Skip to content

Commit

Permalink
feat: Support K8S_WATCH_POLICY env
Browse files Browse the repository at this point in the history
  • Loading branch information
rvql authored and sharang committed Sep 24, 2024
1 parent f0ebbca commit ec9660a
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 15 deletions.
13 changes: 9 additions & 4 deletions agent/src/platform/kubernetes/api_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::{
rpc::Session,
trident::AgentId,
utils::{
environment::{running_in_container, running_in_only_watch_k8s_mode},
environment::{running_in_container, KubeWatchPolicy},
stats,
},
};
Expand Down Expand Up @@ -198,9 +198,14 @@ impl ApiWatcher {
return;
}

if (!self.context.config.load().kubernetes_api_enabled && !running_in_only_watch_k8s_mode())
|| !running_in_container()
{
let wp = KubeWatchPolicy::get();
debug!("kubernetes watch policy is {wp:?}");
let enabled = match wp {
KubeWatchPolicy::Normal => self.context.config.load().kubernetes_api_enabled,
KubeWatchPolicy::WatchOnly => true,
KubeWatchPolicy::WatchDisabled => false,
};
if !enabled || !running_in_container() {
return;
}

Expand Down
8 changes: 7 additions & 1 deletion agent/src/rpc/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use crate::utils::{
command::get_hostname,
environment::{
get_executable_path, is_tt_pod, running_in_container, running_in_k8s,
running_in_only_watch_k8s_mode,
running_in_only_watch_k8s_mode, KubeWatchPolicy,
},
stats,
};
Expand Down Expand Up @@ -813,6 +813,9 @@ impl Synchronizer {
kubernetes_cluster_id: Some(static_config.kubernetes_cluster_id.clone()),
kubernetes_cluster_name: static_config.kubernetes_cluster_name.clone(),
kubernetes_force_watch: Some(running_in_only_watch_k8s_mode()),
kubernetes_watch_policy: Some(
tp::KubernetesWatchPolicy::from(KubeWatchPolicy::get()).into(),
),
agent_unique_identifier: Some(
tp::AgentIdentifier::from_str_name(
static_config.agent_unique_identifier.as_str_name(),
Expand Down Expand Up @@ -1901,6 +1904,9 @@ impl Synchronizer {
kubernetes_cluster_id: Some(static_config.kubernetes_cluster_id.clone()),
kubernetes_cluster_name: static_config.kubernetes_cluster_name.clone(),
kubernetes_force_watch: Some(running_in_only_watch_k8s_mode()),
kubernetes_watch_policy: Some(
ap::KubernetesWatchPolicy::from(KubeWatchPolicy::get()).into(),
),
agent_unique_identifier: Some(AgentIdentifier::from(
static_config.agent_unique_identifier,
) as i32),
Expand Down
15 changes: 9 additions & 6 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ use log::{debug, info, warn};
use tokio::runtime::{Builder, Runtime};
use tokio::sync::broadcast;

#[cfg(target_os = "linux")]
use crate::platform::{
kubernetes::{GenericPoller, Poller, SidecarPoller},
ApiWatcher, LibvirtXmlExtractor,
};
use crate::{
collector::{
flow_aggr::FlowAggrThread, quadruple_generator::QuadrupleGeneratorThread, CollectorThread,
Expand Down Expand Up @@ -109,6 +104,14 @@ use crate::{
platform::SocketSynchronizer,
utils::{environment::core_file_check, lru::Lru},
};
#[cfg(target_os = "linux")]
use crate::{
platform::{
kubernetes::{GenericPoller, Poller, SidecarPoller},
ApiWatcher, LibvirtXmlExtractor,
},
utils::environment::{IN_CONTAINER, K8S_WATCH_POLICY},
};

use packet_sequence_block::BoxedPacketSequenceBlock;
use pcap_assembler::{BoxedPcapBatch, PcapAssembler};
Expand Down Expand Up @@ -1417,7 +1420,7 @@ impl WatcherComponents {
runtime: Arc<Runtime>,
) -> Result<Self> {
let candidate_config = &config_handler.candidate_config;
info!("With ONLY_WATCH_K8S_RESOURCE and IN_CONTAINER environment variables set, the agent will only watch K8s resource");
info!("This agent will only watch K8s resource because IN_CONTAINER={} and K8S_WATCH_POLICY={}", env::var(IN_CONTAINER).unwrap_or_default(), env::var(K8S_WATCH_POLICY).unwrap_or_default());
Ok(WatcherComponents {
running: AtomicBool::new(false),
capture_mode: candidate_config.capture_mode,
Expand Down
71 changes: 67 additions & 4 deletions agent/src/utils/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

use std::{
cell::OnceCell,
env::{self, VarError},
fs,
iter::Iterator,
Expand All @@ -37,7 +38,10 @@ use crate::{
};

use public::{
proto::agent::{AgentType, Exception},
proto::{
agent::{AgentType, Exception, KubernetesWatchPolicy},
trident::KubernetesWatchPolicy as OldKubernetesWatchPolicy,
},
utils::net::{
addr_list, get_mac_by_ip, get_route_src_ip_and_mac, is_global, link_by_name, link_list,
LinkFlags, MacAddr,
Expand All @@ -55,15 +59,18 @@ pub use self::windows::*;

pub type Checker = Box<dyn Fn() -> Result<()>>;

const IN_CONTAINER: &str = "IN_CONTAINER";
pub const IN_CONTAINER: &str = "IN_CONTAINER";
// K8S environment node ip environment variable
const K8S_NODE_IP_FOR_DEEPFLOW: &str = "K8S_NODE_IP_FOR_DEEPFLOW";
const ENV_INTERFACE_NAME: &str = "CTRL_NETWORK_INTERFACE";
const K8S_POD_IP_FOR_DEEPFLOW: &str = "K8S_POD_IP_FOR_DEEPFLOW";
const K8S_NODE_NAME_FOR_DEEPFLOW: &str = "K8S_NODE_NAME_FOR_DEEPFLOW";
const ONLY_WATCH_K8S_RESOURCE: &str = "ONLY_WATCH_K8S_RESOURCE";
pub const K8S_WATCH_POLICY: &str = "K8S_WATCH_POLICY";
const K8S_NAMESPACE_FOR_DEEPFLOW: &str = "K8S_NAMESPACE_FOR_DEEPFLOW";

// no longer used
const ONLY_WATCH_K8S_RESOURCE: &str = "ONLY_WATCH_K8S_RESOURCE";

const DNS_HOST_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
const DNS_HOST_IPV6: IpAddr = IpAddr::V6(Ipv6Addr::new(0x240c, 0, 0, 0, 0, 0, 0, 0x6666));

Expand Down Expand Up @@ -250,8 +257,64 @@ pub fn get_env() -> String {
.join(" ")
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KubeWatchPolicy {
Normal,
WatchOnly,
WatchDisabled,
}

thread_local! {
// initialize once only to avoid inconsistency
// use LazyCell instead of OnceCell after upgrading rust to 1.80 or later
static KUBE_WATCH_POLICY: OnceCell<KubeWatchPolicy> = OnceCell::new();
}

impl KubeWatchPolicy {
pub fn get() -> Self {
KUBE_WATCH_POLICY.with(|p| *p.get_or_init(|| KubeWatchPolicy::from_env()))
}

pub fn from_env() -> Self {
// ONLY_WATCH_K8S_RESOURCE no longer supported
if env::var_os(ONLY_WATCH_K8S_RESOURCE).is_some() {
error!("Environment variable ONLY_WATCH_K8S_RESOURCE is not longer supported, use K8S_WATCH_POLICY instead!");
thread::sleep(Duration::from_secs(60));
error!("Environment variable ONLY_WATCH_K8S_RESOURCE is not longer supported, use K8S_WATCH_POLICY instead!");
crate::utils::notify_exit(-1);
return KubeWatchPolicy::Normal;
}

match env::var(K8S_WATCH_POLICY) {
Ok(policy) if policy == "watch-only" => Self::WatchOnly,
Ok(policy) if policy == "watch-disabled" => Self::WatchDisabled,
_ => Self::Normal,
}
}
}

impl From<KubeWatchPolicy> for KubernetesWatchPolicy {
fn from(p: KubeWatchPolicy) -> Self {
match p {
KubeWatchPolicy::Normal => Self::KwpNormal,
KubeWatchPolicy::WatchOnly => Self::KwpWatchOnly,
KubeWatchPolicy::WatchDisabled => Self::KwpWatchDisabled,
}
}
}

impl From<KubeWatchPolicy> for OldKubernetesWatchPolicy {
fn from(p: KubeWatchPolicy) -> Self {
match p {
KubeWatchPolicy::Normal => Self::KwpNormal,
KubeWatchPolicy::WatchOnly => Self::KwpWatchOnly,
KubeWatchPolicy::WatchDisabled => Self::KwpWatchDisabled,
}
}
}

pub fn running_in_only_watch_k8s_mode() -> bool {
running_in_container() && env::var_os(ONLY_WATCH_K8S_RESOURCE).is_some()
running_in_container() && KubeWatchPolicy::get() == KubeWatchPolicy::WatchOnly
}

pub fn get_k8s_namespace() -> String {
Expand Down
7 changes: 7 additions & 0 deletions message/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ enum Exception {
// 2^32及以上由控制器使用,顺序从后往前
}

enum KubernetesWatchPolicy {
KWP_NORMAL = 0;
KWP_WATCH_ONLY = 1;
KWP_WATCH_DISABLED = 2;
}

message SyncRequest {
optional uint32 boot_time = 1;
optional bool config_accepted = 2 [default = true];
Expand Down Expand Up @@ -110,6 +116,7 @@ message SyncRequest {
optional string os = 35;
optional string kernel_version = 36;

optional KubernetesWatchPolicy kubernetes_watch_policy = 41;
optional string kubernetes_cluster_id = 45; // 仅对容器类型的 agent 有意义
optional string kubernetes_cluster_name = 46; // 仅对容器类型的 agent 有意义
}
Expand Down
8 changes: 8 additions & 0 deletions message/trident.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ message TsdbReportInfo {
optional string pcap_data_mount_path = 4;
}

enum KubernetesWatchPolicy {
KWP_NORMAL = 0;
KWP_WATCH_ONLY = 1;
KWP_WATCH_DISABLED = 2;
}

message SyncRequest {
optional uint32 boot_time = 1;
optional bool config_accepted = 2 [default = true];
Expand Down Expand Up @@ -111,6 +117,8 @@ message SyncRequest {
optional string os = 35;
optional string kernel_version = 36;

optional KubernetesWatchPolicy kubernetes_watch_policy = 41;

optional TsdbReportInfo tsdb_report_info = 43; // 仅对数据节点有意义

optional string kubernetes_cluster_id = 45; // 仅对容器类型的采集器有意义
Expand Down

0 comments on commit ec9660a

Please sign in to comment.