diff --git a/docs/demos/ecp_feb_2023/Makefile b/docs/demos/ecp_feb_2023/Makefile index 9a23caf4..bd089fdd 100644 --- a/docs/demos/ecp_feb_2023/Makefile +++ b/docs/demos/ecp_feb_2023/Makefile @@ -1,10 +1,12 @@ DYAD_LIB_PATH = $(DYAD_INSTALL_PREFIX)/lib DYAD_INCLUDE_PATH = $(DYAD_INSTALL_PREFIX)/include +#UCX_PATH = /p/gpfs1/ice4hpc/install +#UCXLIBS = -L$(UCX_PATH)/lib -Wl,-rpath=$(UCX_PATH)/lib -lucs -lucp CFLAGS_LOC = -g -std=c11 -DDYAD_HAS_CONFIG=1 $(CFLAGS) CPPFLAGS_LOC = -g -O3 -I. $(CPPFLAGS) CXXFLAGS_LOC = -g -std=c++11 -DDYAD_HAS_CONFIG=1 -I$(DYAD_INCLUDE_PATH) $(CXXFLAGS) -CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS) +CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS) $(UCXLIBS) all: c_prod c_cons cpp_prod cpp_cons diff --git a/src/dyad/core/dyad_core.c b/src/dyad/core/dyad_core.c index f7d706fe..df688d05 100644 --- a/src/dyad/core/dyad_core.c +++ b/src/dyad/core/dyad_core.c @@ -456,14 +456,15 @@ get_done:; #ifdef DYAD_ENABLE_UCX_RMA ctx->dtl_handle->get_buffer(ctx, 0, (void**)file_data); ssize_t read_len = 0l; - memcpy (&read_len, *file_data, sizeof(ssize_t)); + memcpy (&read_len, *file_data, sizeof(read_len)); if (read_len < 0l) { *file_len = 0ul; + DYAD_LOG_DEBUG (ctx, "Not able to read from %s file", mdata->fpath); rc = DYAD_RC_BADFIO; } else { *file_len = (size_t) read_len; } - *file_data = ((char*)*file_data) + sizeof(ssize_t); + *file_data = ((char*)*file_data) + sizeof(read_len); DYAD_LOG_INFO (ctx, "Read %zd bytes from %s file", *file_len, mdata->fpath); #endif DYAD_LOG_INFO (ctx, "Destroy the Flux future for the RPC\n"); diff --git a/src/dyad/dtl/flux_dtl.c b/src/dyad/dtl/flux_dtl.c index 638c7634..4083f5cb 100644 --- a/src/dyad/dtl/flux_dtl.c +++ b/src/dyad/dtl/flux_dtl.c @@ -217,7 +217,7 @@ dyad_rc_t dyad_dtl_flux_recv (const dyad_ctx_t* ctx, void** buf, size_t* buflen) dyad_rc = DYAD_RC_OK; finish_recv: if (dtl_handle->f != NULL) - flux_future_destroy (dtl_handle->f); + flux_future_reset (dtl_handle->f); DYAD_C_FUNCTION_UPDATE_INT ("tmp_buflen", tmp_buflen); DYAD_C_FUNCTION_END(); return dyad_rc; diff --git a/src/dyad/modules/dyad.c b/src/dyad/modules/dyad.c index c919e12c..c1e424be 100644 --- a/src/dyad/modules/dyad.c +++ b/src/dyad/modules/dyad.c @@ -235,8 +235,7 @@ dyad_fetch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, goto fetch_error_wo_flock; } } else { - dyad_release_flock (mod_ctx->ctx, fd, &shared_lock); - close (fd); + goto fetch_error; } DYAD_LOG_DEBUG(mod_ctx->ctx, "Close RPC message stream with an ENODATA (%d) message", ENODATA); if (flux_respond_error (h, msg, ENODATA, NULL) < 0) { diff --git a/src/dyad/stream/dyad_stream_core.cpp b/src/dyad/stream/dyad_stream_core.cpp index 4faaa08e..c34f9cc5 100644 --- a/src/dyad/stream/dyad_stream_core.cpp +++ b/src/dyad/stream/dyad_stream_core.cpp @@ -107,7 +107,7 @@ void dyad_stream_core::init (const bool reinit) m_ctx = m_ctx_mutable = dyad_ctx_get (); log_info ("Stream core is initialized by env variables."); } else { - log_info ("Steam core skips initialization as it has already been initialized."); + log_info ("Stream core skips initialization as it has already been initialized."); } // TODO figure out if we want to error if init fails