diff --git a/CMakeLists.txt b/CMakeLists.txt
index a46a579..d2d55c4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 2.8.12)
+cmake_minimum_required(VERSION 3.8)
 
 project(flow_graph_test)
 
@@ -10,11 +10,18 @@ add_executable(${PROJECT_NAME}
     simplesequencegraph.cpp
     limitgraph.cpp
     limitsequencegraph.cpp
+    parallelgraph.cpp
 )
 
+# CXX_STANDARD 17 is only understood by CMake 3.8+, hence the bumped minimum
+set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD_REQUIRED ON)
+set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 17)
+
 set(TBB_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/tbb)
 include(${TBB_ROOT}/cmake/TBBBuild.cmake)
 tbb_build(TBB_ROOT ${TBB_ROOT} CONFIG_DIR TBB_DIR MAKE_ARGS stdver=c++11 tbb_cpf=1)
 find_package(TBB REQUIRED tbb_preview)
 target_link_libraries(${PROJECT_NAME} ${TBB_IMPORTED_TARGETS})
+
+find_package(Threads REQUIRED)
+target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})
diff --git a/main.cpp b/main.cpp
index 3a42e97..a929039 100644
--- a/main.cpp
+++ b/main.cpp
@@ -2,6 +2,7 @@
 #include "limitsequencegraph.h"
 #include "simplegraph.h"
 #include "simplesequencegraph.h"
+#include "parallelgraph.h"
 
 int main(int argc, char *argv[])
 {
@@ -14,9 +15,12 @@ int main(int argc, char *argv[])
 //    LimitGraph limitGraph;
 //    limitGraph.exec();
 
-    const size_t maxParallelJobs = 12;
-    LimitSequenceGraph limitSequenceGraph(maxParallelJobs);
-    limitSequenceGraph.exec();
+//    const size_t maxParallelJobs = 12;
+//    LimitSequenceGraph limitSequenceGraph(maxParallelJobs);
+//    limitSequenceGraph.exec();
+
+    ParallelGraph parallelGraph;
+    parallelGraph.exec();
 
     return 0;
 }
diff --git a/parallelgraph.cpp b/parallelgraph.cpp
new file mode 100644
index 0000000..8e627cd
--- /dev/null
+++ b/parallelgraph.cpp
@@ -0,0 +1,134 @@
+#include "parallelgraph.h"
+
+#include <iostream>
+#include <mutex>
+
+ParallelGraph::ParallelGraph(size_t maxParallel /*= 8*/)
+    : m_maxParallel(maxParallel)
+    , m_graph(new graph)
+{
+    auto f = [this] (Message msg) -> Message {
+        // Just waste some cpu cycles and memory - simulate decompressing columns
+        const size_t size = 20000000;
+        msg.data.reset(new float[size]);
+        for (auto i = 0U; i < size; ++i) {
+            msg.data[i] = static_cast<float>(msg.id);
+            msg.data[i]++;
+            msg.data[i]--;
+        }
+
+        // Imagine some work was done here with decompressed data
+        msg.beta = processAsync(msg.id, msg.data);
+
+        return msg;
+    };
+
+    // Do some work on maxParallel threads at once
+    m_computeNode.reset(new function_node<Message, Message>(*m_graph, m_maxParallel, f));
+
+    // Decide whether to continue calculations or discard. Each message must
+    // trigger exactly one decrement of the limiter: either directly via the
+    // discard port here, or after the global computation has finished.
+    auto g = [] (decision_node::input_type input,
+                 decision_node::output_ports_type &outputPorts) {
+        if (input.beta < -2) {
+            // Do global computation
+            std::get<1>(outputPorts).try_put(std::move(input));
+        } else {
+            // Discard
+            std::get<0>(outputPorts).try_put(continue_msg());
+        }
+    };
+
+    m_decisionNode.reset(new decision_node(*m_graph, m_maxParallel, g));
+
+    // Do global computation
+    auto h = [] (Message msg) -> continue_msg {
+        std::cout << "Global computation " << msg.id
+                  << "; beta: " << msg.beta << std::endl;
+
+        // Just waste some cpu cycles and memory - simulate calculations
+        const size_t size = 20000000;
+        for (auto i = 0U; i < size; ++i) {
+            msg.data[i] = static_cast<float>(msg.id);
+            msg.data[i]++;
+            msg.data[i]--;
+        }
+
+        return continue_msg();
+    };
+    // Use the serial policy
+    m_globalComputeNode.reset(new function_node<Message, continue_msg>(*m_graph, serial, h));
+
+    // Limit the number of parallel computations
+    m_limit.reset(new limiter_node<Message>(*m_graph, m_maxParallel));
+
+    // Enforce the correct order, based on the message id
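+    // (A sequencer_node buffers messages that arrive out of order and only
+    // forwards a message once every lower-numbered id has passed through.)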
+    m_ordering.reset(new sequencer_node<Message>(*m_graph, [] (const Message &msg) -> unsigned int {
+        return msg.id;
+    }));
+
+    // Set up the graph topology:
+    //
+    // orderingNode -> limitNode -> decompressionAndSamplingNode (parallel)
+    //                     ^                      |
+    //                     |___discard____decisionNode (parallel)
+    //                     ^                      |
+    //                     |                      | keep
+    //                     |                      |
+    //                     |______________globalCompute (serial)
+    //
+    // Run the decompressionAndSampling node in the correct order, but do not
+    // wait for the most up-to-date data.
+    make_edge(*m_ordering, *m_limit);
+    make_edge(*m_limit, *m_computeNode);
+
+    // Feedback that we can now decompress another column, OR...
+    make_edge(*m_computeNode, *m_decisionNode);
+    make_edge(output_port<0>(*m_decisionNode), m_limit->decrement);
+    // ...do the global computation
+    make_edge(output_port<1>(*m_decisionNode), *m_globalComputeNode);
+
+    // Feedback that we can now decompress another column
+    make_edge(*m_globalComputeNode, m_limit->decrement);
+}
+
+void ParallelGraph::exec()
+{
+    // Push some messages into the top of the graph to be processed -
+    // these represent the column indices
+    for (unsigned int i = 0; i < 1000; ++i) {
+        Message msg { i };
+        m_ordering->try_put(msg);
+    }
+
+    // Wait for the graph to complete
+    m_graph->wait_for_all();
+}
+
+double ParallelGraph::processAsync(const size_t id, const DataPtr &data)
+{
+    double beta;
+    {
+        // Simulate getting shared data
+        std::shared_lock<std::shared_mutex> lock(m_mutex);
+        beta = 0.0;
+    }
+
+    // Just waste some cpu cycles and memory - simulate calculations
+    const size_t size = 20000000;
+    for (auto i = 0U; i < size; ++i) {
+        data[i] = static_cast<float>(id);
+        data[i]++;
+        data[i]--;
+    }
+
+    {
+        // Simulate writing shared data
+        std::unique_lock<std::shared_mutex> lock(m_mutex);
+        beta = m_distribution(m_generator);
+    }
+
+    return beta;
+}
diff --git a/parallelgraph.h b/parallelgraph.h
new file mode 100644
index 0000000..afaf033
--- /dev/null
+++ b/parallelgraph.h
@@ -0,0 +1,46 @@
+#ifndef PARALLELGRAPH_H
+#define PARALLELGRAPH_H
+
+#include "tbb/flow_graph.h"
+
+#include <memory>
+#include <random>
+#include <shared_mutex>
+
+using namespace tbb::flow;
+
+class ParallelGraph
+{
+public:
+    ParallelGraph(size_t maxParallel = 8);
+
+    void exec();
+
+private:
+    using DataPtr = std::shared_ptr<float[]>;
+
+    struct Message {
+        unsigned int id = 0;
+        DataPtr data = nullptr;
+        double beta = 0;
+    };
+
+    size_t m_maxParallel = 8;
+    std::unique_ptr<graph> m_graph;
+    std::unique_ptr<function_node<Message, Message>> m_computeNode;
+    std::unique_ptr<limiter_node<Message>> m_limit;
+    std::unique_ptr<sequencer_node<Message>> m_ordering;
+    std::unique_ptr<function_node<Message, continue_msg>> m_globalComputeNode;
+
+    using decision_node = multifunction_node<Message, std::tuple<continue_msg, Message>>;
+    std::unique_ptr<decision_node> m_decisionNode;
+
+    std::random_device m_randomDevice {};
+    std::mt19937 m_generator {m_randomDevice()};
+    std::normal_distribution<> m_distribution {0, 1};
+
+    mutable std::shared_mutex m_mutex;
+
+    double processAsync(const size_t id, const DataPtr &data);
+};
+
+#endif // PARALLELGRAPH_H
diff --git a/tbb b/tbb
index 3147923..2ace525 160000
--- a/tbb
+++ b/tbb
@@ -1 +1 @@
-Subproject commit 314792356bf75f4a190277536aea543b9b6b310b
+Subproject commit 2ace525889b0c3de9c90da943fac9259220ef35f
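
Note: the limiter/decision wiring added in parallelgraph.cpp is the standard TBB
flow-graph pattern for bounding in-flight work: a limiter_node admits at most N
messages, and every message must eventually send exactly one continue_msg to
limiter_node::decrement, whichever branch it takes. Below is a minimal,
self-contained sketch of that pattern; the node names (work, decide, tail) and
the even/odd filter are invented for illustration, and it assumes the classic
TBB flow-graph API built by this repo's tbb submodule.

    #include "tbb/flow_graph.h"
    #include <iostream>
    #include <tuple>

    using namespace tbb::flow;

    int main()
    {
        graph g;
        const size_t maxParallel = 4;

        // Admits at most maxParallel messages until decrement is signalled
        limiter_node<int> limiter(g, maxParallel);

        // Parallel stage standing in for decompressionAndSamplingNode
        function_node<int, int> work(g, unlimited, [] (int v) { return v; });

        // Port <0> frees a limiter slot (discard); port <1> keeps the message
        using router_t = multifunction_node<int, std::tuple<continue_msg, int>>;
        router_t decide(g, unlimited,
                        [] (const int &v, router_t::output_ports_type &ports) {
            if (v % 2 == 0)
                std::get<1>(ports).try_put(v);               // keep
            else
                std::get<0>(ports).try_put(continue_msg());  // discard
        });

        // Serial tail standing in for the global computation; its
        // continue_msg output frees a limiter slot for kept messages
        function_node<int, continue_msg> tail(g, serial, [] (int v) {
            std::cout << "kept " << v << "\n";
            return continue_msg();
        });

        make_edge(limiter, work);
        make_edge(work, decide);
        make_edge(output_port<0>(decide), limiter.decrement);
        make_edge(output_port<1>(decide), tail);
        make_edge(tail, limiter.decrement);

        for (int i = 0; i < 20; ++i)
            limiter.try_put(i);
        g.wait_for_all();

        return 0;
    }

If a branch ever drops a message without feeding decrement, the limiter's token
count never recovers and the graph stalls once N messages are in flight, which
is why the decision node signals the discard port explicitly on both paths.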