Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/allocators #18

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 131 additions & 20 deletions examples/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <mfem.hpp>
#include <random>
#include <string>
#include <umpire/strategy/QuickPool.hpp>
#include <unordered_set>
#include <vector>

Expand All @@ -24,6 +25,94 @@

#include "AMS.h"

void printMemory(std::unordered_set<std::string> &allocators)
{
auto &rm = umpire::ResourceManager::getInstance();
for (auto AN : allocators) {
auto alloc = rm.getAllocator(AN);
size_t wm = alloc.getHighWatermark();
size_t cs = alloc.getCurrentSize();
size_t as = alloc.getActualSize();
std::cout << "Allocator '" << AN << "' High WaterMark:" << wm
<< " Current Size:" << cs << " Actual Size:" << as << "\n";
}
}


void createUmpirePool(std::string parent_name, std::string pool_name)
{
std::cout << "Pool Name " << pool_name << "Parent Allocation " << parent_name
<< "\n";
auto &rm = umpire::ResourceManager::getInstance();
auto alloc_resource = rm.makeAllocator<umpire::strategy::QuickPool, true>(
pool_name, rm.getAllocator(parent_name));
}

std::unordered_set<std::string> createMemoryAllocators(
std::string pool,
std::string &physics_host_alloc,
std::string &physics_device_alloc,
std::string &physics_pinned_alloc,
std::string &ams_host_alloc,
std::string &ams_device_alloc,
std::string &ams_pinned_alloc)
{
std::unordered_set<std::string> allocator_names;
if (pool == "default") {
physics_host_alloc = ams_host_alloc = "HOST";
allocator_names.insert(ams_host_alloc);
#ifdef __ENABLE_CUDA__
physics_device_alloc = ams_device_alloc = "DEVICE";
allocator_names.insert(ams_device_alloc);
physics_pinned_alloc = ams_pinned_alloc = "PINNED";
allocator_names.insert(ams_pinned_alloc);
#endif
} else if (pool == "split") {
physics_host_alloc = "phys-host";
createUmpirePool("HOST", "phys-host");
allocator_names.insert(physics_host_alloc);

ams_host_alloc = "ams-host";
createUmpirePool("HOST", ams_host_alloc);
allocator_names.insert(ams_host_alloc);

#ifdef __ENABLE_CUDA__
physics_device_alloc = "phys-device";
createUmpirePool("DEVICE", physics_device_alloc);
allocator_names.insert(physics_device_alloc);

physics_pinned_alloc = "phys-pinned";
createUmpirePool("PINNED", physics_pinned_alloc);
allocator_names.insert(physics_pinned_alloc);

ams_device_alloc = "ams-device";
createUmpirePool("DEVICE", ams_device_alloc);
allocator_names.insert(ams_device_alloc);

ams_pinned_alloc = "ams-pinned";
createUmpirePool("PINNED", ams_pinned_alloc);
allocator_names.insert(ams_pinned_alloc);
#endif
} else if (pool == "same") {
physics_host_alloc = ams_host_alloc = "common-host";
createUmpirePool("HOST", "common-host");
allocator_names.insert(physics_host_alloc);
#ifdef __ENABLE_CUDA__
physics_device_alloc = ams_device_alloc = "common-device";
createUmpirePool("DEVICE", "common-device");
allocator_names.insert(ams_device_alloc);
physics_pinned_alloc = ams_pinned_alloc = "common-pinned";
createUmpirePool("PINNED", "common-pinned");
allocator_names.insert(ams_pinned_alloc);
#endif
} else {
std::cout << "Stategy is " << pool << "\n";
throw std::runtime_error("Pool strategy does not exist\n");
}
return std::move(allocator_names);
}


using TypeValue = double;
using mfem::ForallWrap;

Expand Down Expand Up @@ -96,6 +185,7 @@ int main(int argc, char **argv)
TypeValue avg = 0.5;
TypeValue stdDev = 0.2;
bool reqDB = false;
const char *pool = "default";

#ifdef __ENABLE_DB__
reqDB = true;
Expand Down Expand Up @@ -203,6 +293,14 @@ int main(int argc, char **argv)
args.AddOption(
&verbose, "-v", "--verbose", "-qu", "--quiet", "Print extra stuff");

args.AddOption(&pool,
"-ptype",
"--pool-type",
"How to assign memory pools to AMSlib:\n"
"\t 'default' Use the default Umpire pool\n"
"\t 'split' provide a separate pool to AMSlib\n"
"\t 'same': assign the same with physics to AMS\n");

// -------------------------------------------------------------------------
// parse arguments
// -------------------------------------------------------------------------
Expand Down Expand Up @@ -305,31 +403,44 @@ int main(int argc, char **argv)

std::cerr << "Rank:" << rId << " Threshold " << threshold << "\n";

// -------------------------------------------------------------------------
// setup data allocators
// -------------------------------------------------------------------------
AMSSetupAllocator(AMSResourceType::HOST);
if (use_device) {
AMSSetupAllocator(AMSResourceType::DEVICE);
AMSSetupAllocator(AMSResourceType::PINNED);
AMSSetDefaultAllocator(AMSResourceType::DEVICE);
} else {
AMSSetDefaultAllocator(AMSResourceType::HOST);
}

// -------------------------------------------------------------------------
// setup mfem memory manager
// -------------------------------------------------------------------------
// hardcoded names!
const std::string &alloc_name_host(
AMSGetAllocatorName(AMSResourceType::HOST));
const std::string &alloc_name_device(
AMSGetAllocatorName(AMSResourceType::DEVICE));

mfem::MemoryManager::SetUmpireHostAllocatorName(alloc_name_host.c_str());
std::string physics_host_alloc;
std::string physics_device_alloc;
std::string physics_pinned_alloc;

std::string ams_host_alloc;
std::string ams_device_alloc;
std::string ams_pinned_alloc;

auto allocator_names = createMemoryAllocators(std::string(pool),
physics_host_alloc,
physics_device_alloc,
physics_pinned_alloc,
ams_host_alloc,
ams_device_alloc,
ams_pinned_alloc);


mfem::MemoryManager::SetUmpireHostAllocatorName(physics_host_alloc.c_str());
if (use_device) {
mfem::MemoryManager::SetUmpireDeviceAllocatorName(
alloc_name_device.c_str());
physics_device_alloc.c_str());
}


// When we are not allocating from parent/root umpire allocator
// we need to inform AMS about the pool allocators.
if (strcmp(pool, "default") != 0) {
AMSSetAllocator(AMSResourceType::HOST, ams_host_alloc.c_str());

if (use_device) {
AMSSetAllocator(AMSResourceType::DEVICE, ams_device_alloc.c_str());
AMSSetAllocator(AMSResourceType::PINNED, ams_pinned_alloc.c_str());
}
}

mfem::Device::SetMemoryTypes(mfem::MemoryType::HOST_UMPIRE,
Expand All @@ -340,7 +451,6 @@ int main(int argc, char **argv)
device.Print();
std::cout << std::endl;

//AMSResourceInfo();

// -------------------------------------------------------------------------
// setup indicators
Expand Down Expand Up @@ -399,7 +509,7 @@ int main(int argc, char **argv)
if (eos_name == std::string("ideal_gas")) {
eoses[mat_idx] = new IdealGas(1.6, 1.4);
} else if (eos_name == std::string("constant_host")) {
eoses[mat_idx] = new ConstantEOSOnHost(alloc_name_host.c_str(), 1.0);
eoses[mat_idx] = new ConstantEOSOnHost(physics_host_alloc.c_str(), 1.0);
} else {
std::cerr << "unknown eos `" << eos_name << "'" << std::endl;
return 1;
Expand Down Expand Up @@ -696,6 +806,7 @@ int main(int argc, char **argv)
}
CALIPER(CALI_MARK_END("Cycle");)
MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
printMemory(allocator_names);
koparasy marked this conversation as resolved.
Show resolved Hide resolved
}
#ifdef USE_AMS
delete[] workflow;
Expand Down
60 changes: 24 additions & 36 deletions src/AMS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
*/

#include "AMS.h"

#include <vector>

#include "AMS.h"
#include "wf/resource_manager.hpp"
#include "wf/workflow.hpp"

struct AMSWrap{
struct AMSWrap {
std::vector<std::pair<AMSDType, void *>> executors;
~AMSWrap() {
for ( auto E : executors ){
if ( E.second != nullptr ){
if ( E.first == AMSDType::Double ){
delete reinterpret_cast<ams::AMSWorkflow<double> *> (E.second);
} else{
delete reinterpret_cast<ams::AMSWorkflow<float> *> (E.second);
~AMSWrap()
{
for (auto E : executors) {
if (E.second != nullptr) {
if (E.first == AMSDType::Double) {
delete reinterpret_cast<ams::AMSWorkflow<double> *>(E.second);
} else {
delete reinterpret_cast<ams::AMSWorkflow<float> *>(E.second);
}
}
}
Expand All @@ -36,6 +39,9 @@ void _AMSExecute(AMSExecutor executor,
int outputDim,
MPI_Comm Comm = 0)
{
static std::once_flag flag;
std::call_once(flag, [&]() { ams::ResourceManager::init(); });

uint64_t index = reinterpret_cast<uint64_t>(executor);

if (index >= _amsWrap.executors.size())
Expand Down Expand Up @@ -81,15 +87,16 @@ AMSExecutor AMSCreateExecutor(const AMSConfig config)
config.SPath,
config.DBPath,
config.dbType,
config.device == AMSResourceType::HOST,
config.device,
config.threshold,
config.uqPolicy,
config.nClusters,
config.pId,
config.wSize,
config.ePolicy);

_amsWrap.executors.push_back(std::make_pair(config.dType, static_cast<void *>(dWF)));
_amsWrap.executors.push_back(
std::make_pair(config.dType, static_cast<void *>(dWF)));
return reinterpret_cast<AMSExecutor>(_amsWrap.executors.size() - 1L);
} else if (config.dType == AMSDType::Single) {
ams::AMSWorkflow<float> *sWF =
Expand All @@ -98,14 +105,15 @@ AMSExecutor AMSCreateExecutor(const AMSConfig config)
config.SPath,
config.DBPath,
config.dbType,
config.device == AMSResourceType::HOST,
config.device,
static_cast<float>(config.threshold),
config.uqPolicy,
config.nClusters,
config.pId,
config.wSize,
config.ePolicy);
_amsWrap.executors.push_back(std::make_pair(config.dType, static_cast<void *>(sWF)));
_amsWrap.executors.push_back(
std::make_pair(config.dType, static_cast<void *>(sWF)));

return reinterpret_cast<AMSExecutor>(_amsWrap.executors.size() - 1L);
} else {
Expand Down Expand Up @@ -155,32 +163,12 @@ void AMSDistributedExecute(AMSExecutor executor,

const char *AMSGetAllocatorName(AMSResourceType device)
{
if (device == AMSResourceType::HOST) {
return ams::ResourceManager::getHostAllocatorName();
} else if (device == AMSResourceType::DEVICE) {
return ams::ResourceManager::getDeviceAllocatorName();
}

throw std::runtime_error("requested Device Allocator does not exist");

return nullptr;
}

void AMSSetupAllocator(const AMSResourceType Resource)
{
ams::ResourceManager::setup(Resource);
}

void AMSResourceInfo() { ams::ResourceManager::list_allocators(); }

int AMSGetLocationId(void *ptr)
{
return ams::ResourceManager::getDataAllocationId(ptr);
return std::move(ams::ResourceManager::getAllocatorName(device)).c_str();
}

void AMSSetDefaultAllocator(const AMSResourceType device)
void AMSSetAllocator(AMSResourceType resource, const char *alloc_name)
{
ams::ResourceManager::setDefaultDataAllocator(device);
ams::ResourceManager::setAllocator(std::string(alloc_name), resource);
}

#ifdef __cplusplus
Expand Down
9 changes: 3 additions & 6 deletions src/include/AMS.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ typedef enum { UBALANCED = 0, BALANCED } AMSExecPolicy;
typedef enum { None = 0, CSV, REDIS, HDF5, RMQ } AMSDBType;

typedef enum {
FAISSMean =0,
FAISSMean = 0,
FAISSMax,
DeltaUQ // Not supported
DeltaUQ // Not supported
} AMSUQPolicy;

typedef struct ams_conf {
Expand Down Expand Up @@ -105,11 +105,8 @@ void AMSDestroyExecutor(AMSExecutor executor);
int AMSSetCommunicator(MPI_Comm Comm);
#endif

void AMSSetAllocator(AMSResourceType resource, const char *alloc_name);
const char *AMSGetAllocatorName(AMSResourceType device);
void AMSSetupAllocator(const AMSResourceType device);
void AMSSetDefaultAllocator(const AMSResourceType device);
void AMSResourceInfo();
int AMSGetLocationId(void *ptr);

#ifdef __cplusplus
}
Expand Down
Loading