Skip to content

Commit

Permalink
upgrade aio to 1.0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackerl committed Jul 19, 2023
1 parent 40cbfe0 commit f6dad5c
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 211 deletions.
63 changes: 31 additions & 32 deletions rasp/golang-ebpf/client/smith_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,36 @@
#include <aio/net/stream.h>
#include <zero/log.h>

using namespace std::chrono_literals;

constexpr auto SOCKET_PATH = "/var/run/smith_agent.sock";

std::shared_ptr<zero::async::promise::Promise<void>>
transfer(
const std::shared_ptr<aio::Context> &context,
const std::array<std::shared_ptr<aio::IChannel<SmithMessage>>, 2> &channels
const zero::ptr::RefPtr<aio::ISender<SmithMessage>> &sender,
const zero::ptr::RefPtr<aio::IReceiver<SmithMessage>> &receiver
) {
return aio::net::connect(context, SOCKET_PATH)->then([=](const std::shared_ptr<aio::ev::IBuffer> &buffer) {
return aio::net::stream::connect(
context,
SOCKET_PATH
)->then([=](const zero::ptr::RefPtr<aio::ev::IBuffer> &buffer) {
return zero::async::promise::all(
zero::async::promise::loop<void>([=](const auto &loop) {
buffer->read(4)->then([=](const std::vector<std::byte> &header) {
return buffer->read(ntohl(*(uint32_t *) header.data()));
zero::async::promise::doWhile([=]() {
return buffer->readExactly(4)->then([=](const std::vector<std::byte> &header) {
return buffer->readExactly(ntohl(*(uint32_t *) header.data()));
})->then([=](const std::vector<std::byte> &msg) {
LOG_INFO("message: %.*s", msg.size(), msg.data());

try {
channels[0]->sendNoWait(nlohmann::json::parse(msg).get<SmithMessage>());
sender->trySend(nlohmann::json::parse(msg).get<SmithMessage>());
} catch (const nlohmann::json::exception &e) {
LOG_ERROR("exception: %s", e.what());
}

P_CONTINUE(loop);
}, [=](const zero::async::promise::Reason &reason) {
P_BREAK_E(loop, reason);
});
}),
zero::async::promise::loop<void>([=](const auto &loop) {
channels[1]->receive()->then([=](const SmithMessage &message) {
zero::async::promise::doWhile([=]() {
return receiver->receive()->then([=](const SmithMessage &message) {
std::string msg = nlohmann::json(message).dump(
-1,
' ',
Expand All @@ -40,40 +42,37 @@ transfer(

uint32_t length = htonl(msg.length());

buffer->write({(const std::byte *) &length, sizeof(uint32_t)});
buffer->submit({(const std::byte *) &length, sizeof(uint32_t)});
buffer->submit({(const std::byte *) msg.data(), msg.size()});

if (buffer->write(msg) > 1024 * 1024)
if (buffer->pending() > 1024 * 1024)
return buffer->drain();

return zero::async::promise::resolve<void>();
})->then([=]() {
P_CONTINUE(loop);
}, [=](const zero::async::promise::Reason &reason) {
P_BREAK_E(loop, reason);
});
})
);
})->fail([](const zero::async::promise::Reason &reason) {
LOG_WARNING("transfer finished: %s", reason.message.c_str());
});
}

std::array<std::shared_ptr<aio::IChannel<SmithMessage>>, 2> startClient(const std::shared_ptr<aio::Context> &context) {
std::array<std::shared_ptr<aio::IChannel<SmithMessage>>, 2> channels = {
std::make_shared<aio::Channel<SmithMessage, 100>>(context),
std::make_shared<aio::Channel<SmithMessage, 100>>(context)
std::pair<zero::ptr::RefPtr<aio::IReceiver<SmithMessage>>, zero::ptr::RefPtr<aio::ISender<SmithMessage>>>
startClient(const std::shared_ptr<aio::Context> &context) {
zero::ptr::RefPtr<aio::IChannel<SmithMessage>> channels[2] = {
zero::ptr::makeRef<aio::Channel<SmithMessage, 100>>(context),
zero::ptr::makeRef<aio::Channel<SmithMessage, 100>>(context)
};

zero::async::promise::loop<void>([=](const auto &loop) {
transfer(context, channels)->finally([=]() {
LOG_INFO("disconnect");
zero::async::promise::doWhile([=]() {
return transfer(context, channels[0], channels[1])->fail([=](const zero::async::promise::Reason &reason) {
LOG_WARNING(
"transfer finished[code[%d] msg[%s]]",
reason.code,
reason.message.c_str()
);

std::make_shared<aio::ev::Timer>(context)->setTimeout(std::chrono::minutes{1})->then([=] {
LOG_INFO("reconnect");
P_CONTINUE(loop);
});
return zero::ptr::makeRef<aio::ev::Timer>(context)->setTimeout(1min);
});
});

return channels;
return {channels[0], channels[1]};
}
3 changes: 2 additions & 1 deletion rasp/golang-ebpf/client/smith_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "smith_message.h"
#include <aio/channel.h>

std::array<std::shared_ptr<aio::IChannel<SmithMessage>>, 2> startClient(const std::shared_ptr<aio::Context> &context);
std::pair<zero::ptr::RefPtr<aio::IReceiver<SmithMessage>>, zero::ptr::RefPtr<aio::ISender<SmithMessage>>>
startClient(const std::shared_ptr<aio::Context> &context);

#endif //GO_PROBE_EBPF_SMITH_CLIENT_H
58 changes: 24 additions & 34 deletions rasp/golang-ebpf/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ bool filter(const Trace &trace, const std::map<std::tuple<int, int>, Filter> &fi
}

void onEvent(probe_event *event, void *ctx) {
auto &[skeleton, instances, channel] = *(std::tuple<probe_bpf *, std::map<pid_t, Instance> &, std::shared_ptr<aio::IChannel<SmithMessage>>> *) ctx;
auto &[skeleton, instances, channel] = *(std::tuple<probe_bpf *, std::map<pid_t, Instance> &, zero::ptr::RefPtr<aio::ISender<SmithMessage>>> *) ctx;

auto it = instances.find(event->pid);

Expand Down Expand Up @@ -177,7 +177,7 @@ void onEvent(probe_event *event, void *ctx) {
#endif
#endif

channel->sendNoWait({it->first, it->second.version, it->second.processInfo, TRACE, trace});
channel->trySend({it->first, it->second.version, it->second.processInfo, TRACE, trace});
}

std::optional<int> getAPIOffset(const elf::Reader &reader, uint64_t address) {
Expand Down Expand Up @@ -217,12 +217,12 @@ std::optional<int> getAPIOffset(const elf::Reader &reader, uint64_t address) {
return offset;
}

std::shared_ptr<aio::IChannel<pid_t>> inputChannel(const std::shared_ptr<aio::Context> &context) {
std::shared_ptr channel = std::make_shared<aio::Channel<pid_t, 10>>(context);
std::shared_ptr buffer = std::make_shared<aio::ev::Buffer>(bufferevent_socket_new(context->base(), STDIN_FILENO, 0));
zero::ptr::RefPtr<aio::IReceiver<pid_t>> inputChannel(const std::shared_ptr<aio::Context> &context) {
zero::ptr::RefPtr<aio::IChannel<pid_t>> channel = zero::ptr::makeRef<aio::Channel<pid_t, 10>>(context);
zero::ptr::RefPtr<aio::ev::IBuffer> buffer = aio::ev::newBuffer(context, STDIN_FILENO, false);

zero::async::promise::loop<void>([=](const auto &loop) {
buffer->readLine(EVBUFFER_EOL_ANY)->then([=](const std::string &line) {
zero::async::promise::doWhile([=]() {
return buffer->readLine(aio::ev::EOL::ANY)->then([=](const std::string &line) {
std::optional<pid_t> pid = zero::strings::toNumber<pid_t>(line);

if (!pid) {
Expand All @@ -231,13 +231,11 @@ std::shared_ptr<aio::IChannel<pid_t>> inputChannel(const std::shared_ptr<aio::Co
}

return channel->send(*pid);
})->then([=]() {
P_CONTINUE(loop);
}, [=](const zero::async::promise::Reason &reason) {
LOG_ERROR("read stdin failed: %s", reason.message.c_str());
channel->close();
P_BREAK(loop);
});
})->fail([=](const zero::async::promise::Reason &reason) {
LOG_ERROR("read stdin failed: %s", reason.message.c_str());
})->finally([=]() {
channel->close();
});

return channel;
Expand Down Expand Up @@ -424,18 +422,16 @@ int main() {
std::shared_ptr<aio::Context> context = aio::newContext();
std::map<pid_t, Instance> instances;

zero::async::promise::loop<void>([skeleton, &instances, channel = inputChannel(context)](const auto &loop) {
channel->receive()->then([skeleton, loop, &instances](pid_t pid) {
zero::async::promise::doWhile([skeleton, &instances, receiver = inputChannel(context)]() {
return receiver->receive()->then([skeleton, &instances](pid_t pid) {
if (instances.find(pid) != instances.end()) {
LOG_WARNING("ignore process %d", pid);
P_CONTINUE(loop);
return;
}

std::optional<Instance> instance = attach(skeleton, pid);

if (!instance) {
P_CONTINUE(loop);
std::cout << pid << ":failed" << std::endl;
return;
}
Expand All @@ -444,15 +440,12 @@ int main() {

std::fill_n(instance->quotas[0], sizeof(instance->quotas) / sizeof(**instance->quotas), DEFAULT_QUOTAS);
instances.insert({pid, std::move(*instance)});

P_CONTINUE(loop);
}, [=](const zero::async::promise::Reason &reason) {
LOG_ERROR("receive failed: %s", reason.message.c_str());
P_BREAK(loop);
});
})->fail([=](const zero::async::promise::Reason &reason) {
LOG_ERROR("receive failed: %s", reason.message.c_str());
});

std::make_shared<aio::ev::Timer>(context)->setInterval(1min, [&]() {
zero::ptr::makeRef<aio::ev::Timer>(context)->setInterval(1min, [&]() {
auto it = instances.begin();

while (it != instances.end()) {
Expand Down Expand Up @@ -496,10 +489,10 @@ int main() {
return true;
});

std::array<std::shared_ptr<aio::IChannel<SmithMessage>>, 2> channels = startClient(context);
const auto [receiver, sender] = startClient(context);

zero::async::promise::loop<void>([channels, &instances](const auto &loop) {
channels[0]->receive()->then([loop, &instances](const SmithMessage &message) {
zero::async::promise::doWhile([receiver = receiver, &instances]() {
return receiver->receive()->then([&instances](const SmithMessage &message) {
auto it = instances.find(message.pid);

if (it == instances.end()) {
Expand Down Expand Up @@ -553,18 +546,15 @@ int main() {
default:
break;
}

P_CONTINUE(loop);
})->fail([=](const zero::async::promise::Reason &reason) {
LOG_ERROR("receive failed: %s", reason.message.c_str());
P_BREAK(loop);
});
})->fail([=](const zero::async::promise::Reason &reason) {
LOG_ERROR("receive failed: %s", reason.message.c_str());
});

std::tuple<probe_bpf *, std::map<pid_t, Instance> &, std::shared_ptr<aio::IChannel<SmithMessage>>> ctx = {
std::tuple<probe_bpf *, std::map<pid_t, Instance> &, zero::ptr::RefPtr<aio::ISender<SmithMessage>>> ctx = {
skeleton,
instances,
channels[1]
sender
};

#ifdef USE_RING_BUFFER
Expand Down Expand Up @@ -611,7 +601,7 @@ int main() {
}

for (size_t i = 0; i < perf_buffer__buffer_cnt(pb); i++) {
std::make_shared<aio::ev::Event>(context, perf_buffer__buffer_fd(pb, i))->onPersist(
zero::ptr::makeRef<aio::ev::Event>(context, perf_buffer__buffer_fd(pb, i))->onPersist(
EV_READ,
[=](short what) {
perf_buffer__consume_buffer(pb, i);
Expand Down
2 changes: 1 addition & 1 deletion rasp/golang-ebpf/vcpkg.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{
"name": "aio",
"default-features": false,
"version>=": "1.0.3"
"version>=": "1.0.5"
},
{
"name": "curl",
Expand Down
63 changes: 29 additions & 34 deletions rasp/golang/client/smith_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,29 @@ constexpr auto MESSAGE_DIRECTORY = "/var/run/elkeid_rasp";
std::shared_ptr<zero::async::promise::Promise<void>>
transfer(
const std::shared_ptr<aio::Context> &context,
const std::shared_ptr<aio::ISender<SmithMessage>> &sender,
const std::shared_ptr<aio::IReceiver<SmithMessage>> &receiver
const zero::ptr::RefPtr<aio::ISender<SmithMessage>> &sender,
const zero::ptr::RefPtr<aio::IReceiver<SmithMessage>> &receiver
) {
return aio::net::connect(context, SOCKET_PATH)->then([=](const std::shared_ptr<aio::ev::IBuffer> &buffer) {
return aio::net::stream::connect(
context,
SOCKET_PATH
)->then([=](const zero::ptr::RefPtr<aio::ev::IBuffer> &buffer) {
return zero::async::promise::all(
zero::async::promise::loop<void>([=](const auto &loop) {
buffer->read(4)->then([=](const std::vector<std::byte> &header) {
return buffer->read(ntohl(*(uint32_t *) header.data()));
zero::async::promise::doWhile([=]() {
return buffer->readExactly(4)->then([=](const std::vector<std::byte> &header) {
return buffer->readExactly(ntohl(*(uint32_t *) header.data()));
})->then([=](const std::vector<std::byte> &msg) {
LOG_INFO("message: %.*s", msg.size(), msg.data());

try {
sender->sendNoWait(nlohmann::json::parse(msg).get<SmithMessage>());
sender->trySend(nlohmann::json::parse(msg).get<SmithMessage>());
} catch (const nlohmann::json::exception &e) {
LOG_ERROR("exception: %s", e.what());
}

P_CONTINUE(loop);
}, [=](const zero::async::promise::Reason &reason) {
P_BREAK_E(loop, reason);
});
}),
zero::async::promise::loop<void>([=](const auto &loop) {
receiver->receive()->then([=](const SmithMessage &message) {
zero::async::promise::doWhile([=]() {
return receiver->receive()->then([=](const SmithMessage &message) {
std::string msg = nlohmann::json(message).dump(
-1,
' ',
Expand All @@ -45,43 +44,39 @@ transfer(

uint32_t length = htonl(msg.length());

buffer->write({(const std::byte *) &length, sizeof(uint32_t)});
buffer->submit({(const std::byte *) &length, sizeof(uint32_t)});
buffer->submit({(const std::byte *) msg.data(), msg.size()});

if (buffer->write(msg) > 1024 * 1024)
if (buffer->pending() > 1024 * 1024)
return buffer->drain();

return zero::async::promise::resolve<void>();
})->then([=]() {
P_CONTINUE(loop);
}, [=](const zero::async::promise::Reason &reason) {
P_BREAK_E(loop, reason);
});
})
);
})->fail([](const zero::async::promise::Reason &reason) {
LOG_WARNING("transfer finished: %s", reason.message.c_str());
});
}

std::pair<std::shared_ptr<aio::IReceiver<SmithMessage>>, std::shared_ptr<aio::ISender<SmithMessage>>>
std::pair<zero::ptr::RefPtr<aio::IReceiver<SmithMessage>>, zero::ptr::RefPtr<aio::ISender<SmithMessage>>>
startClient(const std::shared_ptr<aio::Context> &context) {
std::shared_ptr<aio::IChannel<SmithMessage>> channels[2] = {
std::make_shared<aio::Channel<SmithMessage, 100>>(context),
std::make_shared<aio::Channel<SmithMessage, 100>>(context)
zero::ptr::RefPtr<aio::IChannel<SmithMessage>> channels[2] = {
zero::ptr::makeRef<aio::Channel<SmithMessage, 100>>(context),
zero::ptr::makeRef<aio::Channel<SmithMessage, 100>>(context)
};

zero::async::promise::loop<void>([=](const auto &loop) {
transfer(context, channels[0], channels[1])->finally([=]() {
LOG_INFO("disconnect");
zero::async::promise::doWhile([=]() {
return transfer(context, channels[0], channels[1])->fail([=](const zero::async::promise::Reason &reason) {
LOG_WARNING(
"transfer finished[code[%d] msg[%s]]",
reason.code,
reason.message.c_str()
);

std::make_shared<aio::ev::Timer>(context)->setTimeout(1min)->then([=] {
LOG_INFO("reconnect");
P_CONTINUE(loop);
});
return zero::ptr::makeRef<aio::ev::Timer>(context)->setTimeout(1min);
});
});

std::make_shared<aio::ev::Timer>(context)->setInterval(
zero::ptr::makeRef<aio::ev::Timer>(context)->setInterval(
5min,
[
channel = channels[0],
Expand All @@ -96,7 +91,7 @@ startClient(const std::shared_ptr<aio::Context> &context) {

try {
for (const auto &message: nlohmann::json::parse(stream).get<std::list<SmithMessage>>())
channel->sendNoWait(message);
channel->trySend(message);
} catch (const nlohmann::json::exception &e) {
LOG_ERROR("exception: %s", e.what());
}
Expand Down
2 changes: 1 addition & 1 deletion rasp/golang/client/smith_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "smith_message.h"
#include <aio/channel.h>

std::pair<std::shared_ptr<aio::IReceiver<SmithMessage>>, std::shared_ptr<aio::ISender<SmithMessage>>>
std::pair<zero::ptr::RefPtr<aio::IReceiver<SmithMessage>>, zero::ptr::RefPtr<aio::ISender<SmithMessage>>>
startClient(const std::shared_ptr<aio::Context> &context);

#endif //GO_PROBE_SMITH_CLIENT_H
Loading

0 comments on commit f6dad5c

Please sign in to comment.