Skip to content

Commit

Permalink
lib: make ipc slightly less fragile
Browse files Browse the repository at this point in the history
Each process now gets an IPC handle that takes ownership of the ipc socket
and tracks the queue and any associated callbacks.  This gets passed around
to make sure that we're always operating on the right socket and the right
received data, rather than potentially crossing wires if some packets were
still in the queue from a process that's since disappeared.

This fixes an issue seen in the tty tests where we end up closing the socket
out from underneath the next process because another process has been
spawned before our current process is GC'd.

Signed-off-by: Kyle Evans <[email protected]>
(cherry picked from commit c06977b)
  • Loading branch information
kevans91 committed Jan 28, 2024
1 parent b558904 commit 5e0b093
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 126 deletions.
139 changes: 79 additions & 60 deletions lib/core/orch_ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,82 +20,102 @@ struct orch_ipc_msgq {
struct orch_ipc_msgq *next;
};

static struct orch_ipc_register {
struct orch_ipc_register {
orch_ipc_handler *handler;
void *cookie;
} orch_ipc_registration[IPC_LAST - 1];
};

static struct orch_ipc_msgq *head, *tail;
static int sockfd = -1;
struct orch_ipc {
struct orch_ipc_register *callbacks;
struct orch_ipc_msgq *head;
struct orch_ipc_msgq *tail;
int sockfd;
};

static int orch_ipc_drain(void);
static int orch_ipc_pop(struct orch_ipc_msg **);
static int orch_ipc_drain(orch_ipc_t);
static int orch_ipc_pop(orch_ipc_t, struct orch_ipc_msg **);

int
orch_ipc_close(void)
orch_ipc_close(orch_ipc_t ipc)
{
int error;

if (ipc == NULL)
return (0);

error = 0;
if (sockfd != -1) {
shutdown(sockfd, SHUT_WR);
if (ipc->sockfd != -1) {
shutdown(ipc->sockfd, SHUT_WR);

/*
* orch_ipc_drain() should hit EOF then close the socket.
* orch_ipc_drain() should hit EOF then close the socket. This
* will just drain the socket, a follow-up orch_ipc_pop() will
* drain the read queue and invoke callbacks.
*/
while (sockfd != -1 && error == 0) {
orch_ipc_wait(NULL);
while (ipc->sockfd != -1 && error == 0) {
orch_ipc_wait(ipc, NULL);

error = orch_ipc_drain();
error = orch_ipc_drain(ipc);
}

close(ipc->sockfd);
ipc->sockfd = -1;
}

/*
* We may have hit EOF at an inopportune time, just cope with it
* and free the queue.
*/
error = orch_ipc_pop(NULL);
assert(head == NULL);
tail = NULL;
for (size_t i = 0; i < IPC_LAST; i++) {
struct orch_ipc_register *reg = &orch_ipc_registration[i];

reg->handler = NULL;
reg->cookie = NULL;
}
error = orch_ipc_pop(ipc, NULL);
assert(ipc->head == NULL);

free(ipc->callbacks);
free(ipc);

return (error);
}

void
orch_ipc_t
orch_ipc_open(int fd)
{
orch_ipc_t hdl;

hdl = malloc(sizeof(*hdl));
if (hdl == NULL)
return (NULL);

hdl->callbacks = calloc(IPC_LAST - 1, sizeof(*hdl->callbacks));
if (hdl->callbacks == NULL) {
free(hdl);
return (NULL);
}

assert(sockfd == -1);
sockfd = fd;
hdl->head = hdl->tail = NULL;
hdl->sockfd = fd;
return (hdl);
}

bool
orch_ipc_okay(void)
orch_ipc_okay(orch_ipc_t ipc)
{

return (sockfd >= 0);
return (ipc->sockfd >= 0);
}

static int
orch_ipc_drain(void)
orch_ipc_drain(orch_ipc_t ipc)
{
struct orch_ipc_header hdr;
struct orch_ipc_msg *msg;
struct orch_ipc_msgq *msgq;
ssize_t readsz;
size_t off, resid;

if (sockfd == -1)
if (!orch_ipc_okay(ipc))
return (0);

for (;;) {
readsz = read(sockfd, &hdr, sizeof(hdr));
readsz = read(ipc->sockfd, &hdr, sizeof(hdr));
if (readsz == -1) {
if (errno == EAGAIN)
break;
Expand Down Expand Up @@ -133,7 +153,7 @@ orch_ipc_drain(void)
resid = hdr.size - sizeof(hdr);

while (resid != 0) {
readsz = read(sockfd, &msg->data[off], resid);
readsz = read(ipc->sockfd, &msg->data[off], resid);
if (readsz == -1) {
if (errno != EAGAIN) {
free(msg);
Expand All @@ -153,11 +173,11 @@ orch_ipc_drain(void)
resid -= readsz;
}

if (head == NULL) {
head = tail = msgq;
if (ipc->head == NULL) {
ipc->head = ipc->tail = msgq;
} else {
tail->next = msgq;
tail = msgq;
ipc->tail->next = msgq;
ipc->tail = msgq;
}

msg = NULL;
Expand All @@ -167,24 +187,25 @@ orch_ipc_drain(void)
return (0);
eof:

close(sockfd);
sockfd = -1;
close(ipc->sockfd);
ipc->sockfd = -1;

return (0);
}

static int
orch_ipc_pop(struct orch_ipc_msg **omsg)
orch_ipc_pop(orch_ipc_t ipc, struct orch_ipc_msg **omsg)
{
struct orch_ipc_register *reg;
struct orch_ipc_msgq *msgq;
struct orch_ipc_msg *msg;
int error;

error = 0;
while (head != NULL) {
while (ipc->head != NULL) {
/* Dequeue a msg */
msgq = head;
head = head->next;
msgq = ipc->head;
ipc->head = ipc->head->next;

/* Free the container */
msg = msgq->msg;
Expand All @@ -193,11 +214,11 @@ orch_ipc_pop(struct orch_ipc_msg **omsg)
msgq = NULL;

/* Do we have a handler for it? */
reg = &orch_ipc_registration[msg->hdr.tag - 1];
reg = &ipc->callbacks[msg->hdr.tag - 1];
if (reg->handler != NULL) {
int serr;

error = (*reg->handler)(msg, reg->cookie);
error = (*reg->handler)(ipc, msg, reg->cookie);
if (error != 0)
serr = errno;

Expand Down Expand Up @@ -236,43 +257,43 @@ orch_ipc_pop(struct orch_ipc_msg **omsg)
}

int
orch_ipc_recv(struct orch_ipc_msg **omsg)
orch_ipc_recv(orch_ipc_t ipc, struct orch_ipc_msg **omsg)
{
struct orch_ipc_msg *rcvmsg;
int error;

if (orch_ipc_drain() != 0)
if (orch_ipc_drain(ipc) != 0)
return (-1);

rcvmsg = NULL;
error = orch_ipc_pop(&rcvmsg);
error = orch_ipc_pop(ipc, &rcvmsg);
if (error == 0)
*omsg = rcvmsg;
return (error);
}

int
orch_ipc_register(enum orch_ipc_tag tag, orch_ipc_handler *handler,
void *cookie)
orch_ipc_register(orch_ipc_t ipc, enum orch_ipc_tag tag,
orch_ipc_handler *handler, void *cookie)
{
struct orch_ipc_register *reg = &orch_ipc_registration[tag - 1];
struct orch_ipc_register *reg = &ipc->callbacks[tag - 1];

reg->handler = handler;
reg->cookie = cookie;
return (0);
}

int
orch_ipc_send(struct orch_ipc_msg *msg)
orch_ipc_send(orch_ipc_t ipc, struct orch_ipc_msg *msg)
{
ssize_t writesz;
size_t off, resid;

retry:
if (orch_ipc_drain() != 0)
if (orch_ipc_drain(ipc) != 0)
return (-1);

writesz = write(sockfd, &msg->hdr, sizeof(msg->hdr));
writesz = write(ipc->sockfd, &msg->hdr, sizeof(msg->hdr));
if (writesz == -1) {
if (errno != EAGAIN)
return (-1);
Expand All @@ -285,7 +306,7 @@ orch_ipc_send(struct orch_ipc_msg *msg)
off = 0;
resid = msg->hdr.size - sizeof(msg->hdr);
while (resid != 0) {
writesz = write(sockfd, &msg->data[off], resid);
writesz = write(ipc->sockfd, &msg->data[off], resid);
if (writesz == -1) {
if (errno != EAGAIN)
return (-1);
Expand All @@ -299,10 +320,8 @@ orch_ipc_send(struct orch_ipc_msg *msg)
return (0);
}

#include <stdio.h>

int
orch_ipc_wait(bool *eof_seen)
orch_ipc_wait(orch_ipc_t ipc, bool *eof_seen)
{
fd_set rfd;
int error;
Expand All @@ -314,20 +333,20 @@ orch_ipc_wait(bool *eof_seen)
* If we have any messages in the queue, don't bother polling; recv
* will return something.
*/
if (head != NULL)
if (ipc->head != NULL)
return (0);

FD_ZERO(&rfd);
do {
if (sockfd == -1) {
if (ipc->sockfd == -1) {
if (eof_seen != NULL)
*eof_seen = true;
return (0);
}

FD_SET(sockfd, &rfd);
FD_SET(ipc->sockfd, &rfd);

error = select(sockfd + 1, &rfd, NULL, NULL, NULL);
error = select(ipc->sockfd + 1, &rfd, NULL, NULL, NULL);
} while (error == -1 && errno == EINTR);

return (error);
Expand Down
Loading

0 comments on commit 5e0b093

Please sign in to comment.