From ec9660ac2e46bfb0b55062a5e59e89c953096b78 Mon Sep 17 00:00:00 2001
From: JIN Jie
Date: Tue, 24 Sep 2024 17:13:10 +0800
Subject: [PATCH] feat: Support K8S_WATCH_POLICY env

---
 agent/src/platform/kubernetes/api_watcher.rs | 13 ++--
 agent/src/rpc/synchronizer.rs                |  8 ++-
 agent/src/trident.rs                         | 15 +++--
 agent/src/utils/environment.rs               | 79 +++++++++++++++++++--
 message/agent.proto                          |  7 ++
 message/trident.proto                        |  8 +++
 6 files changed, 115 insertions(+), 15 deletions(-)

diff --git a/agent/src/platform/kubernetes/api_watcher.rs b/agent/src/platform/kubernetes/api_watcher.rs
index 5ebecb27f3a..3f882368132 100644
--- a/agent/src/platform/kubernetes/api_watcher.rs
+++ b/agent/src/platform/kubernetes/api_watcher.rs
@@ -48,7 +48,7 @@ use crate::{
     rpc::Session,
     trident::AgentId,
     utils::{
-        environment::{running_in_container, running_in_only_watch_k8s_mode},
+        environment::{running_in_container, KubeWatchPolicy},
         stats,
     },
 };
@@ -198,9 +198,14 @@ impl ApiWatcher {
             return;
         }
 
-        if (!self.context.config.load().kubernetes_api_enabled && !running_in_only_watch_k8s_mode())
-            || !running_in_container()
-        {
+        let wp = KubeWatchPolicy::get();
+        debug!("kubernetes watch policy is {wp:?}");
+        let enabled = match wp {
+            KubeWatchPolicy::Normal => self.context.config.load().kubernetes_api_enabled,
+            KubeWatchPolicy::WatchOnly => true,
+            KubeWatchPolicy::WatchDisabled => false,
+        };
+        if !enabled || !running_in_container() {
             return;
         }
 
diff --git a/agent/src/rpc/synchronizer.rs b/agent/src/rpc/synchronizer.rs
index 7930cf4a572..3dc5cbeef25 100644
--- a/agent/src/rpc/synchronizer.rs
+++ b/agent/src/rpc/synchronizer.rs
@@ -72,7 +72,7 @@ use crate::utils::{
     command::get_hostname,
     environment::{
         get_executable_path, is_tt_pod, running_in_container, running_in_k8s,
-        running_in_only_watch_k8s_mode,
+        running_in_only_watch_k8s_mode, KubeWatchPolicy,
     },
     stats,
 };
@@ -813,6 +813,9 @@ impl Synchronizer {
             kubernetes_cluster_id: Some(static_config.kubernetes_cluster_id.clone()),
             kubernetes_cluster_name: static_config.kubernetes_cluster_name.clone(),
             kubernetes_force_watch: Some(running_in_only_watch_k8s_mode()),
+            kubernetes_watch_policy: Some(
+                tp::KubernetesWatchPolicy::from(KubeWatchPolicy::get()).into(),
+            ),
             agent_unique_identifier: Some(
                 tp::AgentIdentifier::from_str_name(
                     static_config.agent_unique_identifier.as_str_name(),
@@ -1901,6 +1904,9 @@ impl Synchronizer {
             kubernetes_cluster_id: Some(static_config.kubernetes_cluster_id.clone()),
             kubernetes_cluster_name: static_config.kubernetes_cluster_name.clone(),
             kubernetes_force_watch: Some(running_in_only_watch_k8s_mode()),
+            kubernetes_watch_policy: Some(
+                ap::KubernetesWatchPolicy::from(KubeWatchPolicy::get()).into(),
+            ),
             agent_unique_identifier: Some(AgentIdentifier::from(
                 static_config.agent_unique_identifier,
             ) as i32),
diff --git a/agent/src/trident.rs b/agent/src/trident.rs
index a9b11c8d9ae..9acef41ba97 100644
--- a/agent/src/trident.rs
+++ b/agent/src/trident.rs
@@ -39,11 +39,6 @@ use log::{debug, info, warn};
 use tokio::runtime::{Builder, Runtime};
 use tokio::sync::broadcast;
 
-#[cfg(target_os = "linux")]
-use crate::platform::{
-    kubernetes::{GenericPoller, Poller, SidecarPoller},
-    ApiWatcher, LibvirtXmlExtractor,
-};
 use crate::{
     collector::{
         flow_aggr::FlowAggrThread, quadruple_generator::QuadrupleGeneratorThread, CollectorThread,
@@ -109,6 +104,14 @@ use crate::{
     platform::SocketSynchronizer,
     utils::{environment::core_file_check, lru::Lru},
 };
+#[cfg(target_os = "linux")]
+use crate::{
+    platform::{
+        kubernetes::{GenericPoller, Poller, SidecarPoller},
+        ApiWatcher, LibvirtXmlExtractor,
+    },
+    utils::environment::{IN_CONTAINER, K8S_WATCH_POLICY},
+};
 
 use packet_sequence_block::BoxedPacketSequenceBlock;
 use pcap_assembler::{BoxedPcapBatch, PcapAssembler};
@@ -1417,7 +1420,7 @@ impl WatcherComponents {
         runtime: Arc<Runtime>,
     ) -> Result<Self> {
         let candidate_config = &config_handler.candidate_config;
-        info!("With ONLY_WATCH_K8S_RESOURCE and IN_CONTAINER environment variables set, the agent will only watch K8s resource");
+        info!("This agent will only watch K8s resource because IN_CONTAINER={} and K8S_WATCH_POLICY={}", env::var(IN_CONTAINER).unwrap_or_default(), env::var(K8S_WATCH_POLICY).unwrap_or_default());
         Ok(WatcherComponents {
             running: AtomicBool::new(false),
             capture_mode: candidate_config.capture_mode,
diff --git a/agent/src/utils/environment.rs b/agent/src/utils/environment.rs
index 6c7d54230a5..94398cc3b34 100644
--- a/agent/src/utils/environment.rs
+++ b/agent/src/utils/environment.rs
@@ -37,7 +37,10 @@ use crate::{
 };
 
 use public::{
-    proto::agent::{AgentType, Exception},
+    proto::{
+        agent::{AgentType, Exception, KubernetesWatchPolicy},
+        trident::KubernetesWatchPolicy as OldKubernetesWatchPolicy,
+    },
     utils::net::{
         addr_list, get_mac_by_ip, get_route_src_ip_and_mac, is_global, link_by_name, link_list,
         LinkFlags, MacAddr,
@@ -55,15 +58,18 @@ pub use self::windows::*;
 
 pub type Checker = Box<dyn Fn() -> Result<()>>;
 
-const IN_CONTAINER: &str = "IN_CONTAINER";
+pub const IN_CONTAINER: &str = "IN_CONTAINER";
 // K8S environment node ip environment variable
 const K8S_NODE_IP_FOR_DEEPFLOW: &str = "K8S_NODE_IP_FOR_DEEPFLOW";
 const ENV_INTERFACE_NAME: &str = "CTRL_NETWORK_INTERFACE";
 const K8S_POD_IP_FOR_DEEPFLOW: &str = "K8S_POD_IP_FOR_DEEPFLOW";
 const K8S_NODE_NAME_FOR_DEEPFLOW: &str = "K8S_NODE_NAME_FOR_DEEPFLOW";
-const ONLY_WATCH_K8S_RESOURCE: &str = "ONLY_WATCH_K8S_RESOURCE";
+pub const K8S_WATCH_POLICY: &str = "K8S_WATCH_POLICY";
 const K8S_NAMESPACE_FOR_DEEPFLOW: &str = "K8S_NAMESPACE_FOR_DEEPFLOW";
 
+// no longer used
+const ONLY_WATCH_K8S_RESOURCE: &str = "ONLY_WATCH_K8S_RESOURCE";
+
 const DNS_HOST_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
 const DNS_HOST_IPV6: IpAddr = IpAddr::V6(Ipv6Addr::new(0x240c, 0, 0, 0, 0, 0, 0, 0x6666));
 
@@ -250,8 +256,73 @@ pub fn get_env() -> String {
         .join(" ")
 }
 
+/// How this agent interacts with the Kubernetes API, selected by the
+/// `K8S_WATCH_POLICY` environment variable.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum KubeWatchPolicy {
+    /// Variable unset or set to an unrecognized value.
+    Normal,
+    /// `K8S_WATCH_POLICY=watch-only`.
+    WatchOnly,
+    /// `K8S_WATCH_POLICY=watch-disabled`.
+    WatchDisabled,
+}
+
+// Initialized once per process so that every thread observes the same policy.
+// A thread-local cell would re-read the environment (and repeat the
+// ONLY_WATCH_K8S_RESOURCE sleep-and-exit path) on each thread calling `get()`.
+static KUBE_WATCH_POLICY: std::sync::OnceLock<KubeWatchPolicy> = std::sync::OnceLock::new();
+
+impl KubeWatchPolicy {
+    /// Returns the policy, reading the environment on first use only.
+    pub fn get() -> Self {
+        *KUBE_WATCH_POLICY.get_or_init(Self::from_env)
+    }
+
+    /// Parses the policy from the environment without caching.
+    ///
+    /// If the removed `ONLY_WATCH_K8S_RESOURCE` variable is still set, logs an
+    /// error, sleeps for 60 seconds, logs again and calls `notify_exit(-1)`.
+    pub fn from_env() -> Self {
+        // ONLY_WATCH_K8S_RESOURCE no longer supported
+        if env::var_os(ONLY_WATCH_K8S_RESOURCE).is_some() {
+            error!("Environment variable ONLY_WATCH_K8S_RESOURCE is no longer supported, use K8S_WATCH_POLICY instead!");
+            thread::sleep(Duration::from_secs(60));
+            error!("Environment variable ONLY_WATCH_K8S_RESOURCE is no longer supported, use K8S_WATCH_POLICY instead!");
+            crate::utils::notify_exit(-1);
+            return KubeWatchPolicy::Normal;
+        }
+
+        match env::var(K8S_WATCH_POLICY).as_deref() {
+            Ok("watch-only") => Self::WatchOnly,
+            Ok("watch-disabled") => Self::WatchDisabled,
+            _ => Self::Normal,
+        }
+    }
+}
+
+impl From<KubeWatchPolicy> for KubernetesWatchPolicy {
+    fn from(p: KubeWatchPolicy) -> Self {
+        match p {
+            KubeWatchPolicy::Normal => Self::KwpNormal,
+            KubeWatchPolicy::WatchOnly => Self::KwpWatchOnly,
+            KubeWatchPolicy::WatchDisabled => Self::KwpWatchDisabled,
+        }
+    }
+}
+
+impl From<KubeWatchPolicy> for OldKubernetesWatchPolicy {
+    fn from(p: KubeWatchPolicy) -> Self {
+        match p {
+            KubeWatchPolicy::Normal => Self::KwpNormal,
+            KubeWatchPolicy::WatchOnly => Self::KwpWatchOnly,
+            KubeWatchPolicy::WatchDisabled => Self::KwpWatchDisabled,
+        }
+    }
+}
+
 pub fn running_in_only_watch_k8s_mode() -> bool {
-    running_in_container() && env::var_os(ONLY_WATCH_K8S_RESOURCE).is_some()
+    running_in_container() && KubeWatchPolicy::get() == KubeWatchPolicy::WatchOnly
 }
 
 pub fn get_k8s_namespace() -> String {
diff --git a/message/agent.proto b/message/agent.proto
index ad121d91b4a..a3b569590a5 100644
--- a/message/agent.proto
+++ b/message/agent.proto
@@ -80,6 +80,12 @@ enum Exception {
     // 2^32及以上由控制器使用,顺序从后往前
 }
 
+enum KubernetesWatchPolicy {
+    KWP_NORMAL = 0;
+    KWP_WATCH_ONLY = 1;
+    KWP_WATCH_DISABLED = 2;
+}
+
 message SyncRequest {
     optional uint32 boot_time = 1;
     optional bool config_accepted = 2 [default = true];
@@ -110,6 +116,7 @@ message SyncRequest {
     optional string os = 35;
     optional string kernel_version = 36;
 
+    optional KubernetesWatchPolicy kubernetes_watch_policy = 41;
     optional string kubernetes_cluster_id = 45; // 仅对容器类型的 agent 有意义
     optional string kubernetes_cluster_name = 46; // 仅对容器类型的 agent 有意义
 }
diff --git a/message/trident.proto b/message/trident.proto
index d5da9e1fa8f..012e32674c3 100644
--- a/message/trident.proto
+++ b/message/trident.proto
@@ -79,6 +79,12 @@ message TsdbReportInfo {
     optional string pcap_data_mount_path = 4;
 }
 
+enum KubernetesWatchPolicy {
+    KWP_NORMAL = 0;
+    KWP_WATCH_ONLY = 1;
+    KWP_WATCH_DISABLED = 2;
+}
+
 message SyncRequest {
     optional uint32 boot_time = 1;
     optional bool config_accepted = 2 [default = true];
@@ -111,6 +117,8 @@ message SyncRequest {
     optional string os = 35;
     optional string kernel_version = 36;
 
+    optional KubernetesWatchPolicy kubernetes_watch_policy = 41;
+
     optional TsdbReportInfo tsdb_report_info = 43; // 仅对数据节点有意义
     optional string kubernetes_cluster_id = 45; // 仅对容器类型的采集器有意义