Skip to content

Commit

Permalink
feat: add process listener
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed Sep 24, 2024
1 parent 7f42010 commit c4ced9b
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 21 deletions.
60 changes: 60 additions & 0 deletions agent/src/ebpf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,66 @@ extern "C" {
}
}

pub fn set_feature_uprobe_golang(pids: &Vec<u32>) {
unsafe {
set_feature_pids(
FEATURE_UPROBE_GOLANG,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_uprobe_golang_symbol(pids: &Vec<u32>) {
unsafe {
set_feature_pids(
FEATURE_UPROBE_GOLANG_SYMBOL,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_uprobe_tls(pids: &Vec<u32>) {
unsafe {
set_feature_pids(
FEATURE_UPROBE_OPENSSL,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_on_cpu(pids: &Vec<u32>) {
unsafe {
set_feature_pids(
FEATURE_PROFILE_ONCPU,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_off_cpu(pids: &Vec<u32>) {
unsafe {
set_feature_pids(
FEATURE_PROFILE_OFFCPU,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

pub fn set_feature_memory(pids: &Vec<u32>) {
unsafe {
set_feature_pids(
FEATURE_PROFILE_MEMORY,
pids.as_ptr() as *const i32,
pids.len() as i32,
);
}
}

#[no_mangle]
extern "C" fn rust_info_wrapper(msg: *const libc::c_char) {
unsafe {
Expand Down
44 changes: 35 additions & 9 deletions agent/src/ebpf_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use crate::flow_generator::{flow_map::Config, AppProto, FlowMap};
use crate::integration_collector::Profile;
use crate::policy::PolicyGetter;
use crate::rpc::get_timestamp;
use crate::utils::stats;
use crate::utils::{process::ProcessListener, stats};

use public::{
buffer::BatchedBox,
Expand Down Expand Up @@ -463,6 +463,7 @@ pub struct EbpfCollector {
counter: Arc<EbpfCounter>,

exception_handler: ExceptionHandler,
process_listener: Arc<ProcessListener>,
}

static mut SWITCH: bool = false;
Expand Down Expand Up @@ -614,10 +615,11 @@ impl EbpfCollector {
ebpf_profile_sender: DebugSender<Profile>,
policy_getter: PolicyGetter,
time_diff: Arc<AtomicI64>,
process_listener: &ProcessListener,
) -> Result<ConfigHandle> {
// ebpf和ebpf collector通信配置初始化
unsafe {
let handle = Self::ebpf_core_init(config);
let handle = Self::ebpf_core_init(process_listener, config);
// initialize communication between core and ebpf collector
SWITCH = false;
SENDER = Some(sender);
Expand All @@ -631,18 +633,24 @@ impl EbpfCollector {
}
}

unsafe fn ebpf_core_init(config: &EbpfConfig) -> Result<ConfigHandle> {
unsafe fn ebpf_core_init(
process_listener: &ProcessListener,
config: &EbpfConfig,
) -> Result<ConfigHandle> {
// ebpf core modules init
#[allow(unused_mut)]
let mut handle = ConfigHandle::default();
if config.ebpf.socket.uprobe.golang.enabled {
let feature = "ebpf.socket.uprobe.golang";
process_listener.register(feature, ebpf::set_feature_uprobe_golang);

let uprobe_proc_regexp = config
.process_matcher
.iter()
.find(|p| {
p.enabled_features
.iter()
.find(|f| f.eq_ignore_ascii_case("ebpf.socket.uprobe.golang"))
.find(|f| f.eq_ignore_ascii_case(feature))
.is_some()
})
.map(|p| p.match_regex.to_owned())
Expand All @@ -663,13 +671,16 @@ impl EbpfCollector {
}

if config.ebpf.socket.uprobe.tls.enabled {
let feature = "ebpf.socket.uprobe.tls";
process_listener.register(feature, ebpf::set_feature_uprobe_tls);

let uprobe_proc_regexp = config
.process_matcher
.iter()
.find(|p| {
p.enabled_features
.iter()
.find(|f| f.eq_ignore_ascii_case("ebpf.socket.uprobe.tls"))
.find(|f| f.eq_ignore_ascii_case(feature))
.is_some()
})
.map(|p| p.match_regex.to_owned())
Expand All @@ -690,13 +701,16 @@ impl EbpfCollector {
}

if config.symbol_table.golang_specific.enabled {
let feature = "input.proc.symbol_table.golang_specific";
process_listener.register(feature, ebpf::set_feature_uprobe_golang_symbol);

let uprobe_proc_regexp = config
.process_matcher
.iter()
.find(|p| {
p.enabled_features
.iter()
.find(|f| f.eq_ignore_ascii_case("input.proc.symbol_table.golang_specific"))
.find(|f| f.eq_ignore_ascii_case(feature))
.is_some()
})
.map(|p| p.match_regex.to_owned())
Expand Down Expand Up @@ -899,13 +913,16 @@ impl EbpfCollector {
}

if !on_cpu.disabled {
let feature = "ebpf.profile.on_cpu";
process_listener.register(feature, ebpf::set_feature_on_cpu);

let on_cpu_regexp = config
.process_matcher
.iter()
.find(|p| {
p.enabled_features
.iter()
.find(|f| f.eq_ignore_ascii_case("ebpf.profile.on_cpu"))
.find(|f| f.eq_ignore_ascii_case(feature))
.is_some()
})
.map(|p| p.match_regex.to_owned())
Expand All @@ -924,18 +941,21 @@ impl EbpfCollector {

#[cfg(feature = "extended_profile")]
{
let feature = "ebpf.profile.off_cpu";
let off_cpu_regexp = config
.process_matcher
.iter()
.find(|p| {
p.enabled_features
.iter()
.find(|f| f.eq_ignore_ascii_case("ebpf.profile.off_cpu"))
.find(|f| f.eq_ignore_ascii_case(feature))
.is_some()
})
.map(|p| p.match_regex.to_owned())
.unwrap_or_default();
if !off_cpu.disabled {
process_listener.register(feature, ebpf::set_feature_off_cpu);

ebpf::set_feature_regex(
ebpf::FEATURE_PROFILE_ONCPU,
CString::new(off_cpu_regexp.as_bytes())
Expand All @@ -949,6 +969,9 @@ impl EbpfCollector {
}

if !memory.disabled {
let feature = "ebpf.profile.memory";
process_listener.register(feature, ebpf::set_feature_memory);

let memory_cpu_regexp = config
.process_matcher
.iter()
Expand Down Expand Up @@ -1046,6 +1069,7 @@ impl EbpfCollector {
queue_debugger: &QueueDebugger,
stats_collector: Arc<stats::Collector>,
exception_handler: ExceptionHandler,
process_listener: &Arc<ProcessListener>,
) -> Result<Box<Self>> {
let ebpf_config = config.load();
if ebpf_config.ebpf.disabled {
Expand All @@ -1071,6 +1095,7 @@ impl EbpfCollector {
ebpf_profile_sender,
policy_getter,
time_diff.clone(),
process_listener,
)?;
Self::ebpf_on_config_change(ebpf::CAP_LEN_MAX);

Expand All @@ -1097,6 +1122,7 @@ impl EbpfCollector {
get_token_failed: AtomicU64::new(0),
}),
exception_handler,
process_listener: process_listener.clone(),
}))
}

Expand Down Expand Up @@ -1143,7 +1169,7 @@ impl EbpfCollector {
as *mut memory_profile::MemoryContext,
));
}
if let Ok(handle) = Self::ebpf_core_init(config) {
if let Ok(handle) = Self::ebpf_core_init(&self.process_listener, config) {
self.config_handle = handle;
} else {
warn!("ebpf start_continuous_profiler error.");
Expand Down
10 changes: 10 additions & 0 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ use crate::{
guard::Guard,
logger::{LogLevelWriter, LogWriterAdapter, RemoteLogWriter},
npb_bandwidth_watcher::NpbBandwidthWatcher,
process::ProcessListener,
stats::{self, Countable, QueueStats, RefCountable},
},
};
Expand Down Expand Up @@ -1571,6 +1572,7 @@ pub struct AgentComponents {
pub tap_interfaces: Vec<Link>,
pub bpf_options: Arc<Mutex<BpfOptions>>,
pub last_dispatcher_component_id: usize,
pub process_listener: Arc<ProcessListener>,

max_memory: u64,
capture_mode: PacketCaptureType,
Expand Down Expand Up @@ -2398,6 +2400,8 @@ impl AgentComponents {
None,
);

let process_listener = Arc::new(ProcessListener::new(&vec![]));

let ebpf_dispatcher_id = dispatcher_components.len();
#[cfg(any(target_os = "linux", target_os = "android"))]
let mut ebpf_dispatcher_component = None;
Expand Down Expand Up @@ -2474,6 +2478,7 @@ impl AgentComponents {
&queue_debugger,
stats_collector.clone(),
exception_handler.clone(),
&process_listener,
) {
Ok(ebpf_collector) => {
synchronizer
Expand Down Expand Up @@ -2745,6 +2750,7 @@ impl AgentComponents {
tap_interfaces,
last_dispatcher_component_id: otel_dispatcher_id,
bpf_options,
process_listener,
})
}

Expand Down Expand Up @@ -2817,6 +2823,7 @@ impl AgentComponents {

self.npb_bandwidth_watcher.start();
self.npb_arp_table.start();
self.process_listener.start();
info!("Started agent components.");
}

Expand Down Expand Up @@ -2894,6 +2901,9 @@ impl AgentComponents {
if let Some(h) = self.stats_collector.notify_stop() {
join_handles.push(h);
}
if let Some(h) = self.process_listener.notify_stop() {
join_handles.push(h);
}

for handle in join_handles {
if !handle.is_finished() {
Expand Down
Loading

0 comments on commit c4ced9b

Please sign in to comment.