Skip to content

Commit

Permalink
Fixes request waiting for UCX DTL
Browse files (browse the repository at this point in the history)
  • Loading branch information
ilumsden committed Jul 27, 2023
1 parent 0c0d579 commit c27802b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
19 changes: 14 additions & 5 deletions src/dtl/ucx_dtl.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,21 @@ static ucs_status_t dyad_ucx_request_wait(dyad_dtl_ucx_t *dtl_handle,
// is scheduled, but not yet completed.
if (UCS_PTR_IS_PTR(request))
{
FLUX_LOG_ERR (dtl_handle->h, "request is PTR\n");
// Busy-wait (spin) until the request is completed
// The busy-wait shouldn't be costly (performance-wise)
// because the wait should always come directly after other UCX calls
// that minimize the size of the worker's event queue.
// In other words, prior UCX calls should mean that this loop only runs
// a couple of times at most.
while (request->completed != 1)
{
do {
ucp_worker_progress(dtl_handle->ucx_worker);
}
// Get the final status of the communication operation
final_request_status = ucp_request_check_status(request);
// usleep(100);
// Get the final status of the communication operation
final_request_status = ucp_request_check_status(request);
} while (final_request_status == UCS_INPROGRESS);
FLUX_LOG_ERR (dtl_handle->h, "request COMPLETED? %s", request->completed == 1 ? "YES" : "NO");
FLUX_LOG_ERR (dtl_handle->h, "Final request status: %d\n", final_request_status);
// Free and deallocate the request object
ucp_request_free(request);
return final_request_status;
Expand All @@ -92,11 +95,13 @@ static ucs_status_t dyad_ucx_request_wait(dyad_dtl_ucx_t *dtl_handle,
// object for the error.
else if (UCS_PTR_IS_ERR(request))
{
FLUX_LOG_ERR (dtl_handle->h, "request is ERR\n");
return UCS_PTR_STATUS(request);
}
// If 'request' is neither a request handle nor an error, then
// the communication operation immediately completed successfully.
// So, we simply set the status to UCS_OK
FLUX_LOG_ERR (dtl_handle->h, "request is OK\n");
return UCS_OK;
}

Expand Down Expand Up @@ -350,6 +355,10 @@ dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char
FLUX_LOG_INFO (dtl_handle->h, "Decoding consumer UCP address using base64\n");
dtl_handle->addr_len = base64_decoded_length(enc_addr_len);
dtl_handle->consumer_address = (ucp_address_t*) malloc(dtl_handle->addr_len);
if (dtl_handle->consumer_address == NULL) {
FLUX_LOG_ERR (dtl_handle->h, "Could not allocate memory for consumer address");
return DYAD_RC_SYSFAIL;
}
decoded_len = base64_decode_using_maps (&base64_maps_rfc4648,
(char*)dtl_handle->consumer_address, dtl_handle->addr_len,
enc_addr, enc_addr_len);
Expand Down
4 changes: 4 additions & 0 deletions src/modules/dyad.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ static dyad_mod_ctx_t *getctx (flux_t *h)

if (!ctx) {
ctx = (dyad_mod_ctx_t *) malloc (sizeof (*ctx));
if (ctx == NULL) {
FLUX_LOG_ERR (h, "DYAD_MOD: could not allocate memory for context");
goto getctx_error;
}
ctx->h = h;
ctx->debug = false;
ctx->handlers = NULL;
Expand Down

0 comments on commit c27802b

Please sign in to comment.