Skip to content

Commit

Permalink
Added fault-tolerance features to RMQ broker (#34).
Browse files Browse the repository at this point in the history
- Broker will restart if underlying connection is faulty

- Broker will send unacknowledged from previous connection (this could result in messages being received twice or more if errors happen with the wrong timing)

- Fixed a Static Initialization Order Fiasco (actually destruction fiasco) in ResourceManager and AMS DB

Signed-off-by: Loic Pottier <[email protected]>
  • Loading branch information
lpottier committed Feb 12, 2024
1 parent fac0442 commit 68cfa91
Show file tree
Hide file tree
Showing 18 changed files with 796 additions and 422 deletions.
2 changes: 2 additions & 0 deletions examples/app/eos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class EOS
outputs[3]);
}

virtual ~EOS() = default;

virtual void Eval(const int length,
const FPType *density,
const FPType *energy,
Expand Down
39 changes: 29 additions & 10 deletions src/AMSlib/AMS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
#include <vector>

#include "wf/resource_manager.hpp"
#include "wf/basedb.hpp"
#include "wf/workflow.hpp"
#include "wf/debug.h"

struct AMSWrap {
std::vector<std::pair<AMSDType, void *>> executors;
Expand Down Expand Up @@ -39,15 +41,11 @@ void _AMSExecute(AMSExecutor executor,
int outputDim,
MPI_Comm Comm = 0)
{
static std::once_flag flag;
std::call_once(flag, [&]() { ams::ResourceManager::init(); });

uint64_t index = reinterpret_cast<uint64_t>(executor);

if (index >= _amsWrap.executors.size())
throw std::runtime_error("AMS Executor identifier does not exist\n");

auto currExec = _amsWrap.executors[index];

if (currExec.first == AMSDType::Double) {
ams::AMSWorkflow<double> *dWF =
reinterpret_cast<ams::AMSWorkflow<double> *>(currExec.second);
Expand Down Expand Up @@ -80,6 +78,12 @@ extern "C" {

AMSExecutor AMSCreateExecutor(const AMSConfig config)
{
static std::once_flag flag;
std::call_once(flag, [&]() {
auto& rm = ams::ResourceManager::getInstance();
rm.init();
});

if (config.dType == Double) {
ams::AMSWorkflow<double> *dWF =
new ams::AMSWorkflow<double>(config.cBack,
Expand All @@ -94,7 +98,6 @@ AMSExecutor AMSCreateExecutor(const AMSConfig config)
config.pId,
config.wSize,
config.ePolicy);

_amsWrap.executors.push_back(
std::make_pair(config.dType, static_cast<void *>(dWF)));
return reinterpret_cast<AMSExecutor>(_amsWrap.executors.size() - 1L);
Expand All @@ -114,7 +117,6 @@ AMSExecutor AMSCreateExecutor(const AMSConfig config)
config.ePolicy);
_amsWrap.executors.push_back(
std::make_pair(config.dType, static_cast<void *>(sWF)));

return reinterpret_cast<AMSExecutor>(_amsWrap.executors.size() - 1L);
} else {
throw std::invalid_argument("Data type is not supported by AMSLib!");
Expand All @@ -139,6 +141,22 @@ void AMSExecute(AMSExecutor executor,
outputDim);
}

void AMSDestroyExecutor(AMSExecutor executor) {
uint64_t index = reinterpret_cast<uint64_t>(executor);
if (index >= _amsWrap.executors.size())
throw std::runtime_error("AMS Executor identifier does not exist\n");
auto currExec = _amsWrap.executors[index];

if (currExec.first == AMSDType::Double) {
delete reinterpret_cast<ams::AMSWorkflow<double> *>(currExec.second);
} else if (currExec.first == AMSDType::Single) {
delete reinterpret_cast<ams::AMSWorkflow<float> *>(currExec.second);
} else {
throw std::invalid_argument("Data type is not supported by AMSLib!");
return;
}
}

#ifdef __ENABLE_MPI__
void AMSDistributedExecute(AMSExecutor executor,
MPI_Comm Comm,
Expand All @@ -160,15 +178,16 @@ void AMSDistributedExecute(AMSExecutor executor,
}
#endif


const char *AMSGetAllocatorName(AMSResourceType device)
{
return std::move(ams::ResourceManager::getAllocatorName(device)).c_str();
auto& rm = ams::ResourceManager::getInstance();
return std::move(rm.getAllocatorName(device)).c_str();
}

void AMSSetAllocator(AMSResourceType resource, const char *alloc_name)
{
ams::ResourceManager::setAllocator(std::string(alloc_name), resource);
auto& rm = ams::ResourceManager::getInstance();
rm.setAllocator(std::string(alloc_name), resource);
}

#ifdef __cplusplus
Expand Down
19 changes: 11 additions & 8 deletions src/AMSlib/ml/hdcache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ class HDCache
TypeValue *lin_data =
data_handler::linearize_features(cache_location, ndata, inputs);
_add(ndata, lin_data);
ams::ResourceManager::deallocate(lin_data, cache_location);
auto& rm = ams::ResourceManager::getInstance();
rm.deallocate(lin_data, cache_location);
}

//! -----------------------------------------------------------------------
Expand All @@ -336,7 +337,8 @@ class HDCache
TypeValue *lin_data =
data_handler::linearize_features(cache_location, ndata, inputs);
_train(ndata, lin_data);
ams::ResourceManager::deallocate(lin_data, cache_location);
auto& rm = ams::ResourceManager::getInstance();
rm.deallocate(lin_data, cache_location);
}

//! ------------------------------------------------------------------------
Expand Down Expand Up @@ -395,7 +397,8 @@ class HDCache
TypeValue *lin_data =
data_handler::linearize_features(cache_location, ndata, inputs);
_evaluate(ndata, lin_data, is_acceptable);
ams::ResourceManager::deallocate(lin_data, cache_location);
auto& rm = ams::ResourceManager::getInstance();
rm.deallocate(lin_data, cache_location);
DBG(UQModule, "Done with evalution of uq");
}

Expand Down Expand Up @@ -471,12 +474,12 @@ class HDCache

const size_t knbrs = static_cast<size_t>(m_knbrs);
static const TypeValue ook = 1.0 / TypeValue(knbrs);

auto& rm = ams::ResourceManager::getInstance();
TypeValue *kdists =
ams::ResourceManager::allocate<TypeValue>(ndata * knbrs,
rm.allocate<TypeValue>(ndata * knbrs,
cache_location);
TypeIndex *kidxs =
ams::ResourceManager::allocate<TypeIndex>(ndata * knbrs,
rm.allocate<TypeIndex>(ndata * knbrs,
cache_location);

// query faiss
Expand Down Expand Up @@ -523,8 +526,8 @@ class HDCache
kdists, is_acceptable, ndata, knbrs, acceptable_error);
}

ams::ResourceManager::deallocate(kdists, cache_location);
ams::ResourceManager::deallocate(kidxs, cache_location);
rm.deallocate(kdists, cache_location);
rm.deallocate(kidxs, cache_location);
}

//! evaluate cache uncertainty when (data type != TypeValue)
Expand Down
5 changes: 3 additions & 2 deletions src/AMSlib/ml/uq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ class UQ
const size_t ndims = outputs.size();
std::vector<FPTypeValue *> outputs_stdev(ndims);
// TODO: Enable device-side allocation and predicate calculation.
auto& rm = ams::ResourceManager::getInstance();
for (int dim = 0; dim < ndims; ++dim)
outputs_stdev[dim] =
ams::ResourceManager::allocate<FPTypeValue>(totalElements,
rm.allocate<FPTypeValue>(totalElements,
AMSResourceType::HOST);

CALIPER(CALI_MARK_BEGIN("SURROGATE");)
Expand Down Expand Up @@ -110,7 +111,7 @@ class UQ
}

for (int dim = 0; dim < ndims; ++dim)
ams::ResourceManager::deallocate(outputs_stdev[dim],
rm.deallocate(outputs_stdev[dim],
AMSResourceType::HOST);
CALIPER(CALI_MARK_END("DELTAUQ");)
} else if (uqPolicy == AMSUQPolicy::FAISS_Mean ||
Expand Down
Loading

0 comments on commit 68cfa91

Please sign in to comment.