Skip to content

Commit

Permalink
Implement listening on activation environment changes
Browse files Browse the repository at this point in the history
  • Loading branch information
q66 committed Oct 3, 2024
1 parent 116633b commit b5bf19d
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 11 deletions.
34 changes: 34 additions & 0 deletions src/control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ bool control_conn_t::process_packet()
return process_setenv();
case cp_cmd::GETALLENV:
return process_getallenv();
case cp_cmd::LISTENENV:
return process_listenenv();
case cp_cmd::SETTRIGGER:
return process_set_trigger();
case cp_cmd::CATLOG:
Expand Down Expand Up @@ -1141,6 +1143,17 @@ bool control_conn_t::process_getallenv()
return true;
}

bool control_conn_t::process_listenenv()
{
// 1 byte packet type, nothing else
rbuf.consume(1);

main_env.add_listener(this);

char ack_rep[] = { (char)cp_rply::ACK };
return queue_packet(ack_rep, 1);
}

bool control_conn_t::process_set_trigger()
{
// 1 byte packet type
Expand Down Expand Up @@ -1468,6 +1481,26 @@ void control_conn_t::service_event(service_record *service, service_event_t even
}
}

void control_conn_t::environ_event(environment *env, std::string const &var_and_val) noexcept
{
// packet type (byte) + packet length (byte) + data size + data
constexpr int pktsize = 2 + sizeof(envvar_len_t);
envvar_len_t ln = var_and_val.size() + 1;
auto *ptr = var_and_val.data();

try {
std::vector<char> pkt;
pkt.reserve(pktsize + ln);
pkt.push_back((char)cp_info::ENVEVENT);
pkt.push_back(pktsize);
pkt.insert(pkt.end(), (char *)&ln, ((char *)&ln) + sizeof(envvar_len_t));
pkt.insert(pkt.end(), ptr, ptr + ln);
queue_packet(std::move(pkt));
} catch (std::bad_alloc &exc) {
do_oom_close();
}
}

bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
{
bool was_empty = outbuf.empty();
Expand Down Expand Up @@ -1671,6 +1704,7 @@ control_conn_t::~control_conn_t() noexcept
for (auto p : service_key_map) {
p.first->remove_listener(this);
}
main_env.remove_listener(this);

active_control_conns--;
}
135 changes: 126 additions & 9 deletions src/dinit-monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <vector>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include <cstring>
#include <csignal>
Expand Down Expand Up @@ -32,10 +33,13 @@ struct stringview {
static std::vector<stringview> split_command(const char *cmd);
static bool load_service(int socknum, cpbuffer_t &rbuffer, const char *name, handle_t *handle,
service_state_t *state);
static void request_environ(int socknum, cpbuffer_t &rbuffer);
static size_t get_allenv(int socknum, cpbuffer_t &rbuffer);

// dummy handler, so we can wait for children
static void sigchld_handler(int) { }
void issue_command(const char* service_name, const char* event_str,std::vector<stringview> &command_parts);
static void issue_command(const char* service_name, const char* event_str, std::vector<stringview> &command_parts, bool is_env = false);
static size_t read_and_issue(int socknum, cpbuffer_t &rbuffer, size_t dsz, const std::unordered_set<std::string> &varset, std::string &enval, std::vector<stringview> &command_parts);

int dinit_monitor_main(int argc, char **argv)
{
Expand All @@ -44,6 +48,7 @@ int dinit_monitor_main(int argc, char **argv)
const char *control_socket_path = nullptr;
bool user_dinit = (getuid() != 0); // communicate with user daemon
bool issue_init = false; // request initial service state
bool use_environ = false; // listening on activation environment changes
const char *str_started = "started";
const char *str_stopped = "stopped";
const char *str_failed = "failed";
Expand All @@ -61,6 +66,9 @@ int dinit_monitor_main(int argc, char **argv)
std::cout << "Dinit version " << DINIT_VERSION << ".\n";
return 0;
}
else if (strcmp(argv[i], "--env") == 0 || strcmp(argv[i], "-e") == 0) {
use_environ = true;
}
else if (strcmp(argv[i], "--system") == 0 || strcmp(argv[i], "-s") == 0) {
user_dinit = false;
}
Expand Down Expand Up @@ -102,7 +110,7 @@ int dinit_monitor_main(int argc, char **argv)
}
str_failed = argv[i];
}
else if (strcmp(argv[i], "-c") == 0 || strcmp(argv[i], "--command")) {
else if (strcmp(argv[i], "-c") == 0 || strcmp(argv[i], "--command") == 0) {
++i;
if (i == argc) {
std::cerr << "dinit-monitor: --command/-c should be followed by command\n";
Expand All @@ -120,10 +128,11 @@ int dinit_monitor_main(int argc, char **argv)
std::cout << "dinit-monitor: monitor Dinit services\n"
"\n"
"Usage:\n"
" dinit-monitor [options] <service-names...>\n"
" dinit-monitor [options] <service-names|environ-names...>\n"
"\n"
"Options:\n"
" --help : show this help\n"
" -e, --env : monitor activation environment changes\n"
" -s, --system : monitor system daemon (default if run as root)\n"
" -u, --user : monitor user daemon\n"
" -i, --initial : also execute command for initial service state\n"
Expand All @@ -140,7 +149,7 @@ int dinit_monitor_main(int argc, char **argv)
return 1;
}

if (services.empty()) {
if (services.empty() && !use_environ) {
std::cerr << "dinit-monitor: specify at least one service name\n";
return 1;
}
Expand Down Expand Up @@ -186,12 +195,20 @@ int dinit_monitor_main(int argc, char **argv)

// Load all services
std::unordered_map<handle_t, const char *> service_map;
std::unordered_set<std::string> environ_set;
std::vector<std::pair<const char *, service_state_t>> service_init_state;
std::string env_value;

for (const char *service_name : services) {

handle_t hndl;
service_state_t state;

if (use_environ) {
environ_set.emplace(service_name);
continue;
}

if (!load_service(socknum, rbuffer, service_name, &hndl, &state)) {
std::cerr << "dinit-monitor: cannot load service: " << service_name << "\n";
return 1;
Expand All @@ -201,8 +218,19 @@ int dinit_monitor_main(int argc, char **argv)
service_init_state.push_back(std::make_pair(service_name, state));
}

// Issue initial status commands if requested
if (issue_init) {
if (use_environ) {
// Request listening on environ events
request_environ(socknum, rbuffer);
if (issue_init) {
// Get the whole block and see if it's already set
auto envsz = get_allenv(socknum, rbuffer);
while (envsz > 0) {
envsz = read_and_issue(socknum, rbuffer, envsz, environ_set, env_value, command_parts);
}
}
}
else if (issue_init) {
// Issue initial status commands if requested
for (auto state : service_init_state ) {
if (state.second == service_state_t::STARTED) {
issue_command(state.first, str_started, command_parts);
Expand All @@ -221,7 +249,14 @@ int dinit_monitor_main(int argc, char **argv)
int pktlen = (unsigned char) rbuffer[1];
fill_buffer_to(rbuffer, socknum, pktlen);

if (rbuffer[0] == (char)cp_info::SERVICEEVENT) {
if (use_environ && rbuffer[0] == (char)cp_info::ENVEVENT) {
envvar_len_t envln;
rbuffer.extract((char *) &envln, 2, sizeof(envln));
rbuffer.consume(pktlen);
// this will return 0, we don't want to consume after this
pktlen = read_and_issue(socknum, rbuffer, envln, environ_set, env_value, command_parts);
}
else if (!use_environ && rbuffer[0] == (char)cp_info::SERVICEEVENT) {
handle_t ev_handle;
rbuffer.extract((char *) &ev_handle, 2, sizeof(ev_handle));
service_event_t event = static_cast<service_event_t>(rbuffer[2 + sizeof(ev_handle)]);
Expand Down Expand Up @@ -298,7 +333,7 @@ int dinit_monitor_main(int argc, char **argv)
}


void issue_command(const char* service_name, const char* event_str, std::vector<stringview> &command_parts) {
static void issue_command(const char* service_name, const char* event_str, std::vector<stringview> &command_parts, bool is_env) {
std::vector<std::string> final_cmd_parts;
std::vector<const char *> final_cmd_parts_cstr;

Expand All @@ -316,7 +351,7 @@ void issue_command(const char* service_name, const char* event_str, std::vector<
if (cmd_part.str[i] == 'n') {
cmd_part_str.append(service_name);
}
else if (cmd_part.str[i] == 's') {
else if (cmd_part.str[i] == is_env ? 'v' : 's') {
cmd_part_str.append(event_str);
}
else {
Expand Down Expand Up @@ -480,3 +515,85 @@ static bool load_service(int socknum, cpbuffer_t &rbuffer, const char *name, han

return true;
}

static void request_environ(int socknum, cpbuffer_t &rbuffer)
{
char c = (char)cp_cmd::LISTENENV;
write_all_x(socknum, &c, 1);

wait_for_reply(rbuffer, socknum);

cp_rply reply_pkt_h = (cp_rply)rbuffer[0];
if (reply_pkt_h != cp_rply::ACK) {
throw dinit_protocol_error();
}
rbuffer.consume(1);
}

static size_t get_allenv(int socknum, cpbuffer_t &rbuffer)
{
char buf[2] = { (char)cp_cmd::GETALLENV, 0 };
write_all_x(socknum, buf, 2);

wait_for_reply(rbuffer, socknum);

cp_rply reply_pkt_h = (cp_rply)rbuffer[0];
if (reply_pkt_h != cp_rply::ALLENV) {
throw dinit_protocol_error();
}

// 1-byte packet header, then size_t
constexpr size_t allenv_hdr_size = 1 + sizeof(size_t);
rbuffer.fill_to(socknum, allenv_hdr_size);

size_t dsize;
rbuffer.extract(&dsize, 1, sizeof(dsize));
rbuffer.consume(allenv_hdr_size);

return dsize;
}

static void issue_var(std::string &envar, const std::unordered_set<std::string> &varset, std::vector<stringview> &command_parts)
{
auto eq = envar.find('=');
if (eq == envar.npos) {
/* malformed */
throw dinit_protocol_error();
}
auto *sp = &envar[0];
sp[eq] = '\0';
if (varset.empty() || varset.find(sp) != varset.end()) {
issue_command(sp, &sp[eq + 1], command_parts, true);
}
}

static size_t read_and_issue(int socknum, cpbuffer_t &rbuffer, size_t dsz, const std::unordered_set<std::string> &varset, std::string &enval, std::vector<stringview> &command_parts)
{
enval.clear();
while (dsz > 0) {
auto colen = rbuffer.get_contiguous_length(rbuffer.get_ptr(0));
auto chlen = std::min((size_t)colen, dsz);
for (unsigned i = 0; i < chlen; ++i) {
if (rbuffer[i] != '\0') {
continue;
}
enval.append(rbuffer.get_ptr(0), rbuffer.get_ptr(0) + i);
rbuffer.consume(i + 1);
issue_var(enval, varset, command_parts);
return dsz - i - 1;
}
// copy what we have so far and fill some more
enval.append(rbuffer.get_ptr(0), rbuffer.get_ptr(0) + chlen);
rbuffer.consume(chlen);
dsz -= chlen;
if (dsz == 0) {
// didn't find null terminator, malformed
throw dinit_protocol_error();
}
if (rbuffer.get_length() == 0) {
fill_some(rbuffer, socknum);
}
}
// unreachable
throw dinit_protocol_error();
}
7 changes: 6 additions & 1 deletion src/includes/control-cmds.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// 3 - dinit 0.17.1 (adds QUERYSERVICEDSCDIR)
// 4 - dinit 0.18.0 (adds CLOSEHANDLE, GETALLENV)
// 5 - (unreleased) (process status now represented as ([int]si_code + [int]si_status) rather than
// a single integer; SERVICEEVENT5 sent alongside SERVICEEVENT)
// a single integer; SERVICEEVENT5 sent alongside SERVICEEVENT; adds LISTENENV, ENVEVENT)

// Requests:
enum class cp_cmd : dinit_cptypes::cp_cmd_t {
Expand Down Expand Up @@ -87,6 +87,9 @@ enum class cp_cmd : dinit_cptypes::cp_cmd_t {

// Query status of an individual service (5+)
SERVICESTATUS5 = 26,

// Start listening to environment events
LISTENENV = 27,
};

// Replies:
Expand Down Expand Up @@ -175,6 +178,8 @@ enum class cp_info : dinit_cptypes::cp_info_t {
SERVICEEVENT = 100,
// Service event for protocol version 5+ - 4 byte handle, 1 byte event code, proc_status_t status
SERVICEEVENT5 = 101,
// Environment event; 2 bytes length + env string
ENVEVENT = 102,
};

#endif
9 changes: 8 additions & 1 deletion src/includes/control.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <dinit.h>
#include <dinit-log.h>
#include <dinit-env.h>
#include <control-cmds.h>
#include <service-listener.h>
#include <cpbuffer.h>
Expand Down Expand Up @@ -81,7 +82,7 @@ class control_conn_watcher : public eventloop_t::bidi_fd_watcher_impl<control_co
}
};

class control_conn_t : private service_listener
class control_conn_t : private service_listener, private env_listener
{
friend rearm control_conn_cb(eventloop_t *loop, control_conn_watcher *watcher, int revents);
friend class control_conn_t_test;
Expand Down Expand Up @@ -190,6 +191,9 @@ class control_conn_t : private service_listener
// Get the complete environment
bool process_getallenv();

// Listen to environment events
bool process_listenenv();

// Notify that data is ready to be read from the socket. Returns true if the connection should
// be closed.
bool data_ready() noexcept;
Expand Down Expand Up @@ -226,6 +230,9 @@ class control_conn_t : private service_listener
// Note that this can potentially be called during packet processing (upon issuing
// service start/stop orders etc).
void service_event(service_record *service, service_event_t event) noexcept final override;

// Process environment event broadcast.
void environ_event(environment *env, std::string const &var_and_val) noexcept final override;

public:
control_conn_t(eventloop_t &loop, service_set * services_p, int fd)
Expand Down
Loading

0 comments on commit b5bf19d

Please sign in to comment.