diff --git a/examples/main.cpp b/examples/main.cpp index e53bfcc4..45148722 100644 --- a/examples/main.cpp +++ b/examples/main.cpp @@ -7,9 +7,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -182,14 +184,18 @@ int run(const char *device_name, dbType = AMSDBType::RMQ; } - AMSUQPolicy uq_policy = (std::strcmp(uq_policy_opt, "max") == 0) - ? AMSUQPolicy::FAISSMax - : AMSUQPolicy::FAISSMean; + AMSUQPolicy uq_policy; - if (uq_policy != AMSUQPolicy::FAISSMax) - uq_policy = ((std::strcmp(uq_policy_opt, "deltauq") == 0)) - ? AMSUQPolicy::DeltaUQ - : AMSUQPolicy::FAISSMean; + if (strcmp(uq_policy_opt, "faiss-max") == 0) + uq_policy = AMSUQPolicy::FAISS_Max; + else if (strcmp(uq_policy_opt, "faiss-mean") == 0) + uq_policy = AMSUQPolicy::FAISS_Mean; + else if (strcmp(uq_policy_opt, "deltauq-max") == 0) + uq_policy = AMSUQPolicy::DeltaUQ_Max; + else if (strcmp(uq_policy_opt, "deltauq-mean") == 0) + uq_policy = AMSUQPolicy::DeltaUQ_Mean; + else + throw std::runtime_error("Invalid UQ policy"); // set up a randomization seed srand(seed + rId); @@ -671,7 +677,7 @@ int main(int argc, char **argv) const char *precision_opt = "double"; AMSDType precision = AMSDType::Double; - const char *uq_policy_opt = "mean"; + const char *uq_policy_opt = "faiss-mean"; int k_nearest = 5; int seed = 0; @@ -795,11 +801,14 @@ int main(int argc, char **argv) "-uq", "--uqtype", "Types of UQ to select from: \n" - "\t 'mean' Uncertainty is computed in comparison against the " + "\t 'faiss-mean' Uncertainty is computed in comparison " + "against the " "mean distance of k-nearest neighbors\n" - "\t 'max': Uncertainty is computed in comparison with the " + "\t 'faiss-max': Uncertainty is computed in comparison with " + "the " "k'st cluster \n" - "\t 'deltauq': Uncertainty through DUQ (not supported)\n"); + "\t 'deltauq-mean': Uncertainty through DUQ using mean\n" + "\t 'deltauq-max': Uncertainty through DUQ using max\n"); args.AddOption( &verbose, "-v", "--verbose", "-qu", "--quiet", "Print extra stuff"); diff --git a/src/include/AMS.h b/src/include/AMS.h index 794c4eef..999de8be 100644 --- a/src/include/AMS.h +++ b/src/include/AMS.h @@ -57,10 +57,14 @@ typedef enum { UBALANCED = 0, BALANCED } AMSExecPolicy; typedef enum { None = 0, CSV, REDIS, HDF5, RMQ } AMSDBType; +// TODO: create a cleaner interface that separates UQ type (FAISS, DeltaUQ) with policy (max, mean). typedef enum { - FAISSMean = 0, - FAISSMax, - DeltaUQ // Not supported + AMSUQPolicy_BEGIN = 0, + FAISS_Mean, + FAISS_Max, + DeltaUQ_Mean, + DeltaUQ_Max, + AMSUQPolicy_END } AMSUQPolicy; typedef struct ams_conf { diff --git a/src/ml/hdcache.hpp b/src/ml/hdcache.hpp index 00e67f2c..81d5f882 100644 --- a/src/ml/hdcache.hpp +++ b/src/ml/hdcache.hpp @@ -69,7 +69,7 @@ class HDCache const bool m_use_random; const int m_knbrs = 0; - const AMSUQPolicy m_policy = AMSUQPolicy::FAISSMean; + const AMSUQPolicy m_policy = AMSUQPolicy::FAISS_Mean; AMSResourceType cache_location; @@ -209,6 +209,11 @@ class HDCache return cache; } + if (uqPolicy != AMSUQPolicy::FAISS_Mean && + uqPolicy != AMSUQPolicy::FAISS_Max) + THROW(std::invalid_argument, + "Invalid UQ policy for hdcache" + std::to_string(uqPolicy)); + DBG(UQModule, "Generating new cache under (%s)", cache_path.c_str()) std::shared_ptr> new_cache = std::shared_ptr>(new HDCache( @@ -224,7 +229,7 @@ class HDCache { static std::string random_path("random"); std::shared_ptr> cache = find_cache( - random_path, resource, AMSUQPolicy::FAISSMean, -1, threshold); + random_path, resource, AMSUQPolicy::FAISS_Mean, -1, threshold); if (cache) { DBG(UQModule, "Returning existing cache under (%s)", random_path.c_str()) return cache; @@ -547,16 +552,13 @@ class HDCache // compute means if (cache_location == AMSResourceType::HOST) { for (size_t i = 0; i < ndata; ++i) { - CFATAL(UQModule, - m_policy == AMSUQPolicy::DeltaUQ, - "DeltaUQ is not supported yet"); - if (m_policy == AMSUQPolicy::FAISSMean) { + if (m_policy == AMSUQPolicy::FAISS_Mean) { TypeValue mean_dist = std::accumulate(kdists + i * knbrs, kdists + (i + 1) * knbrs, 0.) * ook; is_acceptable[i] = mean_dist < acceptable_error; - } else if (m_policy == AMSUQPolicy::FAISSMax) { + } else if (m_policy == AMSUQPolicy::FAISS_Max) { // Take the furtherst cluster as the distance metric TypeValue max_dist = *std::max_element(&kdists[i * knbrs], @@ -566,9 +568,8 @@ class HDCache } } else { CFATAL(UQModule, - (m_policy == AMSUQPolicy::DeltaUQ) || - (m_policy == AMSUQPolicy::FAISSMax), - "DeltaUQ is not supported yet"); + m_policy == AMSUQPolicy::FAISS_Max, + "FAISS Max on device is not supported yet"); ams::Device::computePredicate( kdists, is_acceptable, ndata, knbrs, acceptable_error); diff --git a/src/ml/surrogate.hpp b/src/ml/surrogate.hpp index 240915fa..7a0f6bec 100644 --- a/src/ml/surrogate.hpp +++ b/src/ml/surrogate.hpp @@ -8,7 +8,7 @@ #ifndef __AMS_SURROGATE_HPP__ #define __AMS_SURROGATE_HPP__ - +#include #include #include #include @@ -39,7 +39,7 @@ class SurrogateModel private: const std::string model_path; AMSResourceType model_resource; - + const bool _is_DeltaUQ; #ifdef __ENABLE_TORCH__ // ------------------------------------------------------------------------- @@ -106,6 +106,30 @@ class SurrogateModel } } +PERFFASPECT() + inline void tensorToHostArray(at::Tensor tensor, + long numRows, + long numCols, + TypeInValue** array) + { + // Transpose to get continuous memory and + // perform single memcpy. + tensor = tensor.transpose(1, 0); + if (model_resource == AMSResourceType::HOST) { + for (long j = 0; j < numCols; j++) { + auto tmp = tensor[j].contiguous(); + TypeInValue* ptr = tmp.data_ptr(); + HtoHMemcpy(array[j], ptr, sizeof(TypeInValue) * numRows); + } + } else { + for (long j = 0; j < numCols; j++) { + auto tmp = tensor[j].contiguous(); + TypeInValue* ptr = tmp.data_ptr(); + DtoHMemcpy(array[j], ptr, sizeof(TypeInValue) * numRows); + } + } + } + // ------------------------------------------------------------------------- // loading a surrogate model! // ------------------------------------------------------------------------- @@ -154,13 +178,29 @@ class SurrogateModel size_t num_in, size_t num_out, const TypeInValue** inputs, - TypeInValue** outputs) + TypeInValue** outputs, + TypeInValue** outputs_stdev) { //torch::NoGradGuard no_grad; c10::InferenceMode guard(true); auto input = arrayToTensor(num_elements, num_in, inputs); input.set_requires_grad(false); - at::Tensor output = module.forward({input}).toTensor().detach(); + if (_is_DeltaUQ) { + assert(outputs_stdev && "Expected non-null outputs_stdev"); + // The deltauq surrogate returns a tuple of (outputs, outputs_stdev) + auto output_tuple = module.forward({input}).toTuple(); + at::Tensor output_mean_tensor = output_tuple->elements()[0].toTensor().detach(); + at::Tensor output_stdev_tensor = output_tuple->elements()[1].toTensor().detach(); + tensorToArray(output_mean_tensor, num_elements, num_out, outputs); + tensorToHostArray(output_stdev_tensor, + num_elements, + num_out, + outputs_stdev); + } + else { + at::Tensor output = module.forward({input}).toTensor().detach(); + tensorToArray(output, num_elements, num_out, outputs); + } DBG(Surrogate, "Evaluate surrogate model (%ld, %ld) -> (%ld, %ld)", @@ -168,7 +208,6 @@ class SurrogateModel num_in, num_elements, num_out); - tensorToArray(output, num_elements, num_out, outputs); } #else @@ -190,10 +229,11 @@ class SurrogateModel #endif - SurrogateModel(const char* model_path, AMSResourceType resource = AMSResourceType::HOST) - : model_path(model_path), model_resource(resource) + SurrogateModel(const char* model_path, + AMSResourceType resource = AMSResourceType::HOST, + bool is_DeltaUQ = false) + : model_path(model_path), model_resource(resource), _is_DeltaUQ(is_DeltaUQ) { - if (resource != AMSResourceType::DEVICE) _load(model_path, "cpu"); else @@ -226,7 +266,8 @@ class SurrogateModel static std::shared_ptr> getInstance( const char* model_path, - AMSResourceType resource = AMSResourceType::HOST) + AMSResourceType resource = AMSResourceType::HOST, + bool is_DeltaUQ = false) { auto model = SurrogateModel::instances.find(std::string(model_path)); @@ -238,6 +279,9 @@ class SurrogateModel "Currently we are not supporting loading the same model file on " "different devices."); + if(is_DeltaUQ != torch_model->is_DeltaUQ()) + THROW(std::runtime_error, "Loaded model instance is not DeltaUQ"); + if (!same_type(torch_model->is_double())) throw std::runtime_error( "Requesting model loading of different data types."); @@ -252,7 +296,7 @@ class SurrogateModel DBG(Surrogate, "Generating new model under (%s)", model_path); std::shared_ptr> torch_model = std::shared_ptr>( - new SurrogateModel(model_path, resource)); + new SurrogateModel(model_path, resource, is_DeltaUQ)); instances.insert(std::make_pair(std::string(model_path), torch_model)); return torch_model; }; @@ -268,9 +312,24 @@ class SurrogateModel size_t num_in, size_t num_out, const TypeInValue** inputs, - TypeInValue** outputs) + TypeInValue** outputs, + TypeInValue **outputs_stdev = nullptr) { - _evaluate(num_elements, num_in, num_out, inputs, outputs); + _evaluate(num_elements, num_in, num_out, inputs, outputs, outputs_stdev); + } + + PERFFASPECT() + inline void evaluate(long num_elements, + std::vector inputs, + std::vector outputs, + std::vector outputs_stdev) + { + _evaluate(num_elements, + inputs.size(), + outputs.size(), + static_cast(inputs.data()), + static_cast(outputs.data()), + static_cast(outputs_stdev.data())); } PERFFASPECT() @@ -282,7 +341,8 @@ class SurrogateModel inputs.size(), outputs.size(), static_cast(inputs.data()), - static_cast(outputs.data())); + static_cast(outputs.data()), + nullptr); } #ifdef __ENABLE_TORCH__ @@ -295,6 +355,7 @@ class SurrogateModel } #endif + bool is_DeltaUQ() { return _is_DeltaUQ; } }; template diff --git a/src/ml/uq.hpp b/src/ml/uq.hpp new file mode 100644 index 00000000..31af4e43 --- /dev/null +++ b/src/ml/uq.hpp @@ -0,0 +1,98 @@ +/* + * Copyright 2021-2023 Lawrence Livermore National Security, LLC and other + * AMSLib Project Developers + * + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + */ + +#ifndef __AMS_UQ_HPP__ +#define __AMS_UQ_HPP__ + +#include +#include + +#include "AMS.h" +#include "ml/hdcache.hpp" +#include "ml/surrogate.hpp" +#include "wf/resource_manager.hpp" + +template +class UQ +{ +public: +PERFFASPECT() + static void evaluate( + AMSUQPolicy uqPolicy, + const int totalElements, + std::vector &inputs, + std::vector &outputs, + const std::shared_ptr> &hdcache, + const std::shared_ptr> &surrogate, + bool *p_ml_acceptable) + { + if ((uqPolicy == AMSUQPolicy::DeltaUQ_Mean) || + (uqPolicy == AMSUQPolicy::DeltaUQ_Max)) { + CALIPER(CALI_MARK_BEGIN("DELTAUQ");) + const size_t ndims = outputs.size(); + std::vector outputs_stdev(ndims); + // TODO: Enable device-side allocation and predicate calculation. + for (int dim = 0; dim < ndims; ++dim) + outputs_stdev[dim] = + ams::ResourceManager::allocate(totalElements, + AMSResourceType::HOST); + + CALIPER(CALI_MARK_BEGIN("SURROGATE");) + DBG(Workflow, "Model exists, I am calling DeltaUQ surrogate (for all data)"); + surrogate->evaluate(totalElements, inputs, outputs, outputs_stdev); + CALIPER(CALI_MARK_END("SURROGATE");) + + if (uqPolicy == DeltaUQ_Mean) { + for (size_t i = 0; i < totalElements; ++i) { + // Use double for increased precision, range in the calculation + double mean = 0.0; + for (size_t dim = 0; dim < ndims; ++dim) + mean += outputs_stdev[dim][i]; + mean /= ndims; + p_ml_acceptable[i] = (mean < _threshold); + } + } else if (uqPolicy == DeltaUQ_Max) { + for (size_t i = 0; i < totalElements; ++i) { + bool is_acceptable = true; + for (size_t dim = 0; dim < ndims; ++dim) + if (outputs_stdev[dim][i] >= _threshold) { + is_acceptable = false; + break; + } + + p_ml_acceptable[i] = is_acceptable; + } + } else { + THROW(std::runtime_error, "Invalid UQ policy"); + } + + for (int dim = 0; dim < ndims; ++dim) + ams::ResourceManager::deallocate(outputs_stdev[dim], AMSResourceType::HOST); + CALIPER(CALI_MARK_END("DELTAUQ");) + } else { + CALIPER(CALI_MARK_BEGIN("HDCACHE");) + if (hdcache) hdcache->evaluate(totalElements, inputs, p_ml_acceptable); + CALIPER(CALI_MARK_END("HDCACHE");) + + CALIPER(CALI_MARK_BEGIN("SURROGATE");) + DBG(Workflow, "Model exists, I am calling surrogate (for all data)"); + surrogate->evaluate(totalElements, inputs, outputs); + CALIPER(CALI_MARK_END("SURROGATE");) + } + } + +PERFFASPECT() + static void setThreshold(FPTypeValue threshold) { _threshold = threshold; } + +private: + static FPTypeValue _threshold; +}; + +template +FPTypeValue UQ::_threshold = 0.5; + +#endif diff --git a/src/wf/debug.h b/src/wf/debug.h index e0765a89..2c50e835 100644 --- a/src/wf/debug.h +++ b/src/wf/debug.h @@ -116,6 +116,8 @@ inline uint32_t getVerbosityLevel() actual_size); \ } while (0); +#define THROW(exception, msg) \ + throw exception(std::string(__FILE__) + ":" + std::to_string(__LINE__) + " " + msg) #else // LIBAMS_VERBOSE is disabled #define CWARNING(id, condition, ...) diff --git a/src/wf/device.hpp b/src/wf/device.hpp index 00f6185c..649d0201 100644 --- a/src/wf/device.hpp +++ b/src/wf/device.hpp @@ -11,6 +11,9 @@ #include #include #include +#include + +#include "wf/debug.h" #ifdef __ENABLE_CUDA__ #include "cuda/utilities.cuh" @@ -34,6 +37,13 @@ void computePredicate(float *data, #endif } +PERFFASPECT() +void computePredicateDeltaUQ() +{ + THROW(std::runtime_error, + "Computing DeltaUQ predications on device is not supported yet"); +} + template PERFFASPECT() void linearize(TypeOutValue *output, diff --git a/src/wf/workflow.hpp b/src/wf/workflow.hpp index 4b0c7b8f..aedaa435 100644 --- a/src/wf/workflow.hpp +++ b/src/wf/workflow.hpp @@ -16,6 +16,7 @@ #include #include "AMS.h" +#include "ml/uq.hpp" #include "ml/hdcache.hpp" #include "ml/surrogate.hpp" #include "resource_manager.hpp" @@ -53,7 +54,7 @@ class AMSWorkflow std::shared_ptr> hdcache; /** The metric/type of UQ we will use to select between physics and ml computations **/ - const AMSUQPolicy uqPolicy = AMSUQPolicy::FAISSMean; + const AMSUQPolicy uqPolicy = AMSUQPolicy::FAISS_Mean; /** The Number of clusters we will use to compute FAISS UQ **/ const int nClusters = 10; @@ -186,14 +187,23 @@ class AMSWorkflow rId(_pId), wSize(_wSize), appDataLoc(appDataLoc), + uqPolicy(uqPolicy), ePolicy(policy) { surrogate = nullptr; - if (surrogate_path) - surrogate = - SurrogateModel::getInstance(surrogate_path, appDataLoc); + if (surrogate_path) { + bool is_DeltaUQ = ((uqPolicy == AMSUQPolicy::DeltaUQ_Max || + uqPolicy == AMSUQPolicy::DeltaUQ_Mean) + ? true + : false); + surrogate = SurrogateModel::getInstance( + surrogate_path, + appDataLoc, + is_DeltaUQ); + } + UQ::setThreshold(threshold); // TODO: Fix magic number. 10 represents the number of neighbours I am // looking at. if (uq_path) @@ -313,11 +323,15 @@ class AMSWorkflow // STEP 1: call the hdcache to look at input uncertainties // to decide if making a ML inference makes sense // ------------------------------------------------------------- - if (hdcache) { - CALIPER(CALI_MARK_BEGIN("UQ_MODULE");) - hdcache->evaluate(totalElements, origInputs, p_ml_acceptable); - CALIPER(CALI_MARK_END("UQ_MODULE");) - } + CALIPER(CALI_MARK_BEGIN("UQ_MODULE");) + UQ::evaluate(uqPolicy, + totalElements, + origInputs, + origOutputs, + hdcache, + surrogate, + p_ml_acceptable); + CALIPER(CALI_MARK_END("UQ_MODULE");) DBG(Workflow, "Computed Predicates") @@ -334,14 +348,6 @@ class AMSWorkflow bool *predicate = p_ml_acceptable; - CALIPER(CALI_MARK_BEGIN("SURROGATE");) - // We need to call the model on all data values. - // Because we expect it to be faster. - // I guess we may need to add some policy to do this - DBG(Workflow, "Model exists, I am calling surrogate (for all data)"); - surrogate->evaluate(totalElements, origInputs, origOutputs); - CALIPER(CALI_MARK_END("SURROGATE");) - // ----------------------------------------------------------------- // STEP 3: call physics module only where d_dense_need_phys = true // ----------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 829510d3..8e76b4cd 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -31,12 +31,13 @@ add_test(NAME AMSExampleInferSingle::HOST COMMAND ams_example --precision singl add_test(NAME AMSExampleInferDouble::HOST COMMAND ams_example --precision double -S ${CMAKE_CURRENT_SOURCE_DIR}/debug_model.pt -e 100) endif() +add_test(NAME AMSInferSingleDeltaUQ::HOST COMMAND ams_example --precision single --uqtype deltauq-mean -db ./db -S ${CMAKE_CURRENT_SOURCE_DIR}/tuple-single.torchscript -e 100) if(WITH_FAISS) -ADDTEST(ams_hdcache_mean_double test_hdcache.cpp AMSHDCacheMeanPolicyDouble ${CMAKE_CURRENT_SOURCE_DIR}/faiss_debug.pt "double" 0 10 4.0 4 5) +ADDTEST(ams_hdcache_mean_double test_hdcache.cpp AMSHDCacheMeanPolicyDouble ${CMAKE_CURRENT_SOURCE_DIR}/faiss_debug.pt "double" 1 10 4.0 4 5) # The max case fails on DEVICE. We should be aware abou this when adding support for CI for GPUs -ADDTEST(ams_hdcache_max_double test_hdcache.cpp AMSHDCacheMaxPolicyDouble ${CMAKE_CURRENT_SOURCE_DIR}/faiss_debug.pt "double" 1 10 4.0 4 5) +ADDTEST(ams_hdcache_max_double test_hdcache.cpp AMSHDCacheMaxPolicyDouble ${CMAKE_CURRENT_SOURCE_DIR}/faiss_debug.pt "double" 2 10 4.0 4 5) -ADDTEST(ams_hdcache_mean_single test_hdcache.cpp AMSHDCacheMeanPolicySingle ${CMAKE_CURRENT_SOURCE_DIR}/faiss_debug.pt "single" 0 10 4.0 4 5) +ADDTEST(ams_hdcache_mean_single test_hdcache.cpp AMSHDCacheMeanPolicySingle ${CMAKE_CURRENT_SOURCE_DIR}/faiss_debug.pt "single" 1 10 4.0 4 5) # The max case fails on DEVICE. We should be aware abou this when adding support for CI for GPUs ADDTEST(ams_hdcache_max_single test_hdcache.cpp AMSHDCacheMaxPolicySingle ${CMAKE_CURRENT_SOURCE_DIR}/faiss_debug.pt "single" 1 10 4.0 4 5) if (WITH_TORCH) diff --git a/tests/tuple-single.torchscript b/tests/tuple-single.torchscript new file mode 100644 index 00000000..c40411b4 Binary files /dev/null and b/tests/tuple-single.torchscript differ