Skip to content

Commit

Permalink
change future destroy back to future reset in flux dtl recv
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeseungYeom committed Feb 28, 2024
1 parent 24406a0 commit 7dc3bf3
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 7 deletions.
4 changes: 3 additions & 1 deletion docs/demos/ecp_feb_2023/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
DYAD_LIB_PATH = $(DYAD_INSTALL_PREFIX)/lib
DYAD_INCLUDE_PATH = $(DYAD_INSTALL_PREFIX)/include
#UCX_PATH = /p/gpfs1/ice4hpc/install
#UCXLIBS = -L$(UCX_PATH)/lib -Wl,-rpath=$(UCX_PATH)/lib -lucs -lucp

CFLAGS_LOC = -g -std=c11 -DDYAD_HAS_CONFIG=1 $(CFLAGS)
CPPFLAGS_LOC = -g -O3 -I. $(CPPFLAGS)
CXXFLAGS_LOC = -g -std=c++11 -DDYAD_HAS_CONFIG=1 -I$(DYAD_INCLUDE_PATH) $(CXXFLAGS)
CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS)
CXXLIBS_LOC = -L$(DYAD_LIB_PATH) -Wl,-rpath=$(DYAD_LIB_PATH) -ldyad_fstream $(CXXLIBS) $(UCXLIBS)

all: c_prod c_cons cpp_prod cpp_cons

Expand Down
5 changes: 3 additions & 2 deletions src/dyad/core/dyad_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,15 @@ get_done:;
#ifdef DYAD_ENABLE_UCX_RMA
ctx->dtl_handle->get_buffer(ctx, 0, (void**)file_data);
ssize_t read_len = 0l;
memcpy (&read_len, *file_data, sizeof(ssize_t));
memcpy (&read_len, *file_data, sizeof(read_len));
if (read_len < 0l) {
*file_len = 0ul;
DYAD_LOG_DEBUG (ctx, "Not able to read from %s file", mdata->fpath);
rc = DYAD_RC_BADFIO;
} else {
*file_len = (size_t) read_len;
}
*file_data = ((char*)*file_data) + sizeof(ssize_t);
*file_data = ((char*)*file_data) + sizeof(read_len);
DYAD_LOG_INFO (ctx, "Read %zd bytes from %s file", *file_len, mdata->fpath);
#endif
DYAD_LOG_INFO (ctx, "Destroy the Flux future for the RPC\n");
Expand Down
2 changes: 1 addition & 1 deletion src/dyad/dtl/flux_dtl.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ dyad_rc_t dyad_dtl_flux_recv (const dyad_ctx_t* ctx, void** buf, size_t* buflen)
dyad_rc = DYAD_RC_OK;
finish_recv:
if (dtl_handle->f != NULL)
flux_future_destroy (dtl_handle->f);
flux_future_reset (dtl_handle->f);
DYAD_C_FUNCTION_UPDATE_INT ("tmp_buflen", tmp_buflen);
DYAD_C_FUNCTION_END();
return dyad_rc;
Expand Down
3 changes: 1 addition & 2 deletions src/dyad/modules/dyad.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ dyad_fetch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg,
goto fetch_error_wo_flock;
}
} else {
dyad_release_flock (mod_ctx->ctx, fd, &shared_lock);
close (fd);
goto fetch_error;
}
DYAD_LOG_DEBUG(mod_ctx->ctx, "Close RPC message stream with an ENODATA (%d) message", ENODATA);
if (flux_respond_error (h, msg, ENODATA, NULL) < 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/dyad/stream/dyad_stream_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void dyad_stream_core::init (const bool reinit)
m_ctx = m_ctx_mutable = dyad_ctx_get ();
log_info ("Stream core is initialized by env variables.");
} else {
log_info ("Steam core skips initialization as it has already been initialized.");
log_info ("Stream core skips initialization as it has already been initialized.");
}

// TODO figure out if we want to error if init fails
Expand Down

0 comments on commit 7dc3bf3

Please sign in to comment.