From 32b852ff3e1b75031f73f1d2ca15a4fe666bb34f Mon Sep 17 00:00:00 2001 From: Jae-Seung Yeom Date: Wed, 28 Feb 2024 15:09:04 -0800 Subject: [PATCH] change future destroy back to future reset in flux dtl recv --- docs/demos/ecp_feb_2023/Makefile | 4 +++- src/dyad/core/dyad_core.c | 1 + src/dyad/dtl/flux_dtl.c | 2 +- src/dyad/dtl/ucx_dtl.c | 8 ++++---- src/dyad/modules/dyad.c | 3 +-- src/dyad/stream/dyad_stream_core.cpp | 2 +- 6 files changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/demos/ecp_feb_2023/Makefile b/docs/demos/ecp_feb_2023/Makefile index 9a23caf4..bd089fdd 100644 --- a/docs/demos/ecp_feb_2023/Makefile +++ b/docs/demos/ecp_feb_2023/Makefile @@ -1,10 +1,12 @@ DYAD_LIB_PATH = $(DYAD_INSTALL_PREFIX)/lib DYAD_INCLUDE_PATH = $(DYAD_INSTALL_PREFIX)/include +#UCX_PATH = /p/gpfs1/ice4hpc/install +#UCXLIBS = -L$(UCX_PATH)/lib -Wl,-rpath=$(UCX_PATH)/lib -lucs -lucp CFLAGS_LOC = -g -std=c11 -DDYAD_HAS_CONFIG=1 $(CFLAGS) CPPFLAGS_LOC = -g -O3 -I. $(CPPFLAGS) CXXFLAGS_LOC = -g -std=c++11 -DDYAD_HAS_CONFIG=1 -I$(DYAD_INCLUDE_PATH) $(CXXFLAGS) -CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS) +CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS) $(UCXLIBS) all: c_prod c_cons cpp_prod cpp_cons diff --git a/src/dyad/core/dyad_core.c b/src/dyad/core/dyad_core.c index a3e97395..c79d5d1f 100644 --- a/src/dyad/core/dyad_core.c +++ b/src/dyad/core/dyad_core.c @@ -459,6 +459,7 @@ get_done:; memcpy (&read_len, *file_data, sizeof (read_len)); if (read_len < 0l) { *file_len = 0ul; + DYAD_LOG_DEBUG (ctx, "Not able to read from %s file", mdata->fpath); rc = DYAD_RC_BADFIO; } else { *file_len = (size_t) read_len; diff --git a/src/dyad/dtl/flux_dtl.c b/src/dyad/dtl/flux_dtl.c index ae6d0b15..80c7aeda 100644 --- a/src/dyad/dtl/flux_dtl.c +++ b/src/dyad/dtl/flux_dtl.c @@ -217,7 +217,7 @@ dyad_rc_t dyad_dtl_flux_recv (const dyad_ctx_t* ctx, void** buf, size_t* buflen) dyad_rc = DYAD_RC_OK; finish_recv: if (dtl_handle->f != NULL) - flux_future_destroy (dtl_handle->f); + flux_future_reset (dtl_handle->f); DYAD_C_FUNCTION_UPDATE_INT ("tmp_buflen", tmp_buflen); DYAD_C_FUNCTION_END(); return dyad_rc; diff --git a/src/dyad/dtl/ucx_dtl.c b/src/dyad/dtl/ucx_dtl.c index 988ba04f..23b28fb4 100644 --- a/src/dyad/dtl/ucx_dtl.c +++ b/src/dyad/dtl/ucx_dtl.c @@ -413,7 +413,7 @@ static inline ucs_status_ptr_t ucx_recv_no_wait (const dyad_ctx_t* ctx, #ifndef DYAD_ENABLE_UCX_RMA ucx_recv_no_wait_done:; -#endif // DYAD_ENABLE_UCX_ +#endif // DYAD_ENABLE_UCX_RMA DYAD_C_FUNCTION_END(); return stat_ptr; } @@ -428,7 +428,7 @@ static dyad_rc_t ucx_warmup (const dyad_ctx_t* ctx) #ifndef DYAD_ENABLE_UCX_RMA ucs_status_ptr_t recv_stat_ptr = NULL; size_t recv_buf_len = 0; -#endif // DYAD_ENABLE_UCX_ +#endif // DYAD_ENABLE_UCX_RMA ucs_status_t send_status = UCS_OK; ucs_status_t recv_status = UCS_OK; DYAD_LOG_INFO (ctx, "Starting warmup for UCX DTL"); @@ -470,7 +470,7 @@ static dyad_rc_t ucx_warmup (const dyad_ctx_t* ctx) DYAD_LOG_INFO (ctx, "Waiting on warmup recv to finish"); recv_status = dyad_ucx_request_wait (ctx, recv_stat_ptr); -#endif // DYAD_ENABLE_UCX_ +#endif // DYAD_ENABLE_UCX_RMA DYAD_LOG_INFO (ctx, "Waiting on warmup send to finish"); send_status = dyad_ucx_request_wait (ctx, send_stat_ptr); @@ -487,7 +487,7 @@ static dyad_rc_t ucx_warmup (const dyad_ctx_t* ctx) DYAD_LOG_INFO (ctx, "Communication succeeded (according to UCX)"); #ifndef DYAD_ENABLE_UCX_RMA assert (recv_buf_len == 1); -#endif // DYAD_ENABLE_UCX_ +#endif // DYAD_ENABLE_UCX_RMA DYAD_LOG_INFO (ctx, "Correct amount of data received in warmup"); free (recv_buf); rc = DYAD_RC_OK; diff --git a/src/dyad/modules/dyad.c b/src/dyad/modules/dyad.c index 1b032405..f82157cc 100644 --- a/src/dyad/modules/dyad.c +++ b/src/dyad/modules/dyad.c @@ -230,8 +230,7 @@ dyad_fetch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, goto fetch_error_wo_flock; } } else { - dyad_release_flock (mod_ctx->ctx, fd, &shared_lock); - close (fd); + goto fetch_error; } DYAD_LOG_DEBUG(mod_ctx->ctx, "Close RPC message stream with an ENODATA (%d) message", ENODATA); if (flux_respond_error (h, msg, ENODATA, NULL) < 0) { diff --git a/src/dyad/stream/dyad_stream_core.cpp b/src/dyad/stream/dyad_stream_core.cpp index e3f74560..900461bf 100644 --- a/src/dyad/stream/dyad_stream_core.cpp +++ b/src/dyad/stream/dyad_stream_core.cpp @@ -108,7 +108,7 @@ void dyad_stream_core::init (const bool reinit) m_ctx = m_ctx_mutable = dyad_ctx_get (); log_info ("Stream core is initialized by env variables."); } else { - log_info ("Steam core skips initialization as it has already been initialized."); + log_info ("Stream core skips initialization as it has already been initialized."); } // TODO figure out if we want to error if init fails