Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change future destroy back to future reset in flux dtl recv #113

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/demos/ecp_feb_2023/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
DYAD_LIB_PATH = $(DYAD_INSTALL_PREFIX)/lib
DYAD_INCLUDE_PATH = $(DYAD_INSTALL_PREFIX)/include
#UCX_PATH = /p/gpfs1/ice4hpc/install
#UCXLIBS = -L$(UCX_PATH)/lib -Wl,-rpath=$(UCX_PATH)/lib -lucs -lucp

CFLAGS_LOC = -g -std=c11 -DDYAD_HAS_CONFIG=1 $(CFLAGS)
CPPFLAGS_LOC = -g -O3 -I. $(CPPFLAGS)
CXXFLAGS_LOC = -g -std=c++11 -DDYAD_HAS_CONFIG=1 -I$(DYAD_INCLUDE_PATH) $(CXXFLAGS)
CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS)
CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS) $(UCXLIBS)

all: c_prod c_cons cpp_prod cpp_cons

Expand Down
1 change: 1 addition & 0 deletions src/dyad/core/dyad_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ get_done:;
memcpy (&read_len, *file_data, sizeof (read_len));
if (read_len < 0l) {
*file_len = 0ul;
DYAD_LOG_DEBUG (ctx, "Not able to read from %s file", mdata->fpath);
rc = DYAD_RC_BADFIO;
} else {
*file_len = (size_t) read_len;
Expand Down
2 changes: 1 addition & 1 deletion src/dyad/dtl/flux_dtl.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ dyad_rc_t dyad_dtl_flux_recv (const dyad_ctx_t* ctx, void** buf, size_t* buflen)
dyad_rc = DYAD_RC_OK;
finish_recv:
if (dtl_handle->f != NULL)
flux_future_destroy (dtl_handle->f);
flux_future_reset (dtl_handle->f);
DYAD_C_FUNCTION_UPDATE_INT ("tmp_buflen", tmp_buflen);
DYAD_C_FUNCTION_END();
return dyad_rc;
Expand Down
8 changes: 4 additions & 4 deletions src/dyad/dtl/ucx_dtl.c
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ static inline ucs_status_ptr_t ucx_recv_no_wait (const dyad_ctx_t* ctx,

#ifndef DYAD_ENABLE_UCX_RMA
ucx_recv_no_wait_done:;
#endif // DYAD_ENABLE_UCX_
#endif // DYAD_ENABLE_UCX_RMA
DYAD_C_FUNCTION_END();
return stat_ptr;
}
Expand All @@ -428,7 +428,7 @@ static dyad_rc_t ucx_warmup (const dyad_ctx_t* ctx)
#ifndef DYAD_ENABLE_UCX_RMA
ucs_status_ptr_t recv_stat_ptr = NULL;
size_t recv_buf_len = 0;
#endif // DYAD_ENABLE_UCX_
#endif // DYAD_ENABLE_UCX_RMA
ucs_status_t send_status = UCS_OK;
ucs_status_t recv_status = UCS_OK;
DYAD_LOG_INFO (ctx, "Starting warmup for UCX DTL");
Expand Down Expand Up @@ -470,7 +470,7 @@ static dyad_rc_t ucx_warmup (const dyad_ctx_t* ctx)
DYAD_LOG_INFO (ctx, "Waiting on warmup recv to finish");
recv_status =
dyad_ucx_request_wait (ctx, recv_stat_ptr);
#endif // DYAD_ENABLE_UCX_
#endif // DYAD_ENABLE_UCX_RMA
DYAD_LOG_INFO (ctx, "Waiting on warmup send to finish");
send_status =
dyad_ucx_request_wait (ctx, send_stat_ptr);
Expand All @@ -487,7 +487,7 @@ static dyad_rc_t ucx_warmup (const dyad_ctx_t* ctx)
DYAD_LOG_INFO (ctx, "Communication succeeded (according to UCX)");
#ifndef DYAD_ENABLE_UCX_RMA
assert (recv_buf_len == 1);
#endif // DYAD_ENABLE_UCX_
#endif // DYAD_ENABLE_UCX_RMA
DYAD_LOG_INFO (ctx, "Correct amount of data received in warmup");
free (recv_buf);
rc = DYAD_RC_OK;
Expand Down
3 changes: 1 addition & 2 deletions src/dyad/modules/dyad.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ dyad_fetch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg,
goto fetch_error_wo_flock;
}
} else {
dyad_release_flock (mod_ctx->ctx, fd, &shared_lock);
close (fd);
goto fetch_error;
}
DYAD_LOG_DEBUG(mod_ctx->ctx, "Close RPC message stream with an ENODATA (%d) message", ENODATA);
if (flux_respond_error (h, msg, ENODATA, NULL) < 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/dyad/stream/dyad_stream_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void dyad_stream_core::init (const bool reinit)
m_ctx = m_ctx_mutable = dyad_ctx_get ();
log_info ("Stream core is initialized by env variables.");
} else {
log_info ("Steam core skips initialization as it has already been initialized.");
log_info ("Stream core skips initialization as it has already been initialized.");
}

// TODO figure out if we want to error if init fails
Expand Down
Loading