ParallelGraph #1

Open · wants to merge 6 commits into master
9 changes: 8 additions & 1 deletion CMakeLists.txt
@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 2.8.12)
+cmake_minimum_required(VERSION 3.1.3)

project(flow_graph_test)

@@ -10,11 +10,18 @@ add_executable(${PROJECT_NAME}
 simplesequencegraph.cpp
 limitgraph.cpp
 limitsequencegraph.cpp
+parallelgraph.cpp
 )

 set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD_REQUIRED ON)
 set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 17)

+set(TBB_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/tbb)
+include(${TBB_ROOT}/cmake/TBBBuild.cmake)
+tbb_build(TBB_ROOT ${TBB_ROOT} CONFIG_DIR TBB_DIR MAKE_ARGS stdver=c++11 tbb_cpf=1)
+find_package(TBB REQUIRED tbb_preview)
+
+target_link_libraries(${PROJECT_NAME} ${TBB_IMPORTED_TARGETS})

 find_package(Threads REQUIRED)
 target_link_libraries (${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})
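Note: TBBBuild.cmake and the tbb_preview component come from the tbb submodule, so the submodule must be initialized (for example with git submodule update --init tbb) before CMake can configure; find_package(TBB) then populates TBB_IMPORTED_TARGETS for the target_link_libraries call above.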
10 changes: 7 additions & 3 deletions main.cpp
@@ -2,6 +2,7 @@
#include "limitsequencegraph.h"
#include "simplegraph.h"
#include "simplesequencegraph.h"
#include "parallelgraph.h"

int main(int argc, char *argv[])
{
@@ -14,9 +15,12 @@ int main(int argc, char *argv[])
 // LimitGraph limitGraph;
 // limitGraph.exec();

-const size_t maxParallelJobs = 12;
-LimitSequenceGraph limitSequenceGraph(maxParallelJobs);
-limitSequenceGraph.exec();
+// const size_t maxParallelJobs = 12;
+// LimitSequenceGraph limitSequenceGraph(maxParallelJobs);
+// limitSequenceGraph.exec();
+
+ParallelGraph parallelGraph;
+parallelGraph.exec();

 return 0;
 }
134 changes: 134 additions & 0 deletions parallelgraph.cpp
@@ -0,0 +1,134 @@
#include "parallelgraph.h"

#include <iostream>
#include <mutex>

ParallelGraph::ParallelGraph(size_t maxParallel /*= 8*/)
: m_maxParallel(maxParallel)
, m_graph(new graph)
{
auto f = [this] (Message msg) -> Message {
// Just waste some cpu cycles and memory - simulate decompressing columns
const size_t size = 20000000;
msg.data.reset(new float[size]);
for (auto i = 0U; i < size; ++i) {
msg.data[i] = static_cast<float>(msg.id);
msg.data[i]++;
msg.data[i]--;
}

// Imagine some work was done here with decompressed data
msg.beta = processAsync(msg.id, msg.data);

return msg;
};

// Do some work on maxParallel threads at once
m_computeNode.reset(new function_node<Message, Message>(*m_graph, m_maxParallel, f));

// Decide whether to keep each result for the global computation or discard it
auto g = [] (decision_node::input_type input,
decision_node::output_ports_type &outputPorts) {

if (input.beta < -2) {
// Keep: forward to the global computation, which releases
// the limiter slot once it finishes
std::get<1>(outputPorts).try_put(std::move(input));
} else {
// Discard and release the limiter slot immediately
std::get<0>(outputPorts).try_put(continue_msg());
}
};

m_decisionNode.reset(new decision_node(*m_graph, m_maxParallel, g));

// Do global computation
auto h = [] (Message msg) -> continue_msg {

std::cout << "Global computation " << msg.id
<< "; beta: " << msg.beta << std::endl;

// Just waste some cpu cycles and memory - simulate calculations
const size_t size = 20000000;
for (auto i = 0U; i < size; ++i) {
msg.data[i] = static_cast<float>(msg.id);
msg.data[i]++;
msg.data[i]--;
}

return continue_msg();
};
// Use the serial policy
m_globalComputeNode.reset(new function_node<Message>(*m_graph, serial, h));

// Limit the number of parallel computations
m_limit.reset(new limiter_node<Message>(*m_graph, m_maxParallel));

// Enforce the correct order, based on the message id
m_ordering.reset(new sequencer_node<Message>(*m_graph, [] (const Message& msg) -> unsigned int {
return msg.id;
}));

// Set up the graph topology:
//
// orderingNode -> limitNode -> computeNode (parallel)
//                    ^              |
//                    |__discard__decisionNode (parallel)
//                    ^              |
//                    |              | keep
//                    |              |
//                    |_________globalComputeNode (serial)
//
// Feed the compute node in the correct order, but do not
// wait for the most up-to-date shared data.
make_edge(*m_ordering, *m_limit);
make_edge(*m_limit, *m_computeNode);

make_edge(*m_computeNode, *m_decisionNode);
// Either feed back that another column can be decompressed (discard), ...
make_edge(output_port<0>(*m_decisionNode), m_limit->decrement);
// ... or hand the message on to the global computation
make_edge(output_port<1>(*m_decisionNode), *m_globalComputeNode);

// Feedback that we can now decompress another column
make_edge(*m_globalComputeNode, m_limit->decrement);
}

void ParallelGraph::exec()
{
// Push 1000 messages, each representing a column index, into the top of the graph
for (unsigned int i = 0; i < 1000; ++i) {
Message msg { i };
m_ordering->try_put(msg);
}

// Wait for the graph to complete
m_graph->wait_for_all();
}

double ParallelGraph::processAsync(const size_t id, const DataPtr &data)
{
double beta;
{
// Simulate reading shared data
std::shared_lock lock(m_mutex);
beta = 0.0;
}

// Just waste some cpu cycles and memory - simulate calculations
const size_t size = 20000000;
for (auto i = 0U; i < size; ++i) {
data[i] = static_cast<float>(id);
data[i]++;
data[i]--;
}

{
// Simulate writing shared data
std::unique_lock lock(m_mutex);
beta = m_distribution(m_generator);
}

return beta;
}
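As an aside, the limiter-plus-decrement feedback wired up above is the standard throttling pattern of the classic tbb::flow API used here. A minimal, self-contained sketch of just that pattern (illustrative names, not code from this PR):

#include "tbb/flow_graph.h"
#include <iostream>

int main()
{
    using namespace tbb::flow;
    graph g;

    // At most 4 items are allowed past the limiter at any time
    limiter_node<int> limiter(g, 4);

    // The worker signals completion by emitting a continue_msg
    function_node<int, continue_msg> worker(g, unlimited, [] (int v) -> continue_msg {
        std::cout << "processing " << v << std::endl;
        return continue_msg();
    });

    make_edge(limiter, worker);
    // Each completion decrements the limiter, freeing a slot for the next item
    make_edge(worker, limiter.decrement);

    for (int i = 0; i < 100; ++i)
        limiter.try_put(i);

    g.wait_for_all();
    return 0;
}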
46 changes: 46 additions & 0 deletions parallelgraph.h
@@ -0,0 +1,46 @@
#ifndef PARALLELGRAPH_H
#define PARALLELGRAPH_H

#include "tbb/flow_graph.h"
#include <memory>
#include <shared_mutex>
#include <random>

using namespace tbb::flow;

class ParallelGraph
{
public:
ParallelGraph(size_t maxParallel = 8);

void exec();

private:
using DataPtr = std::shared_ptr<float[]>;

struct Message {
unsigned int id = 0;
DataPtr data = nullptr;
double beta = 0;
};

size_t m_maxParallel = 8;
std::unique_ptr<graph> m_graph;
std::unique_ptr<function_node<Message, Message>> m_computeNode;
std::unique_ptr<limiter_node<Message>> m_limit;
std::unique_ptr<sequencer_node<Message>> m_ordering;
std::unique_ptr<function_node<Message>> m_globalComputeNode;

using decision_node = multifunction_node<Message, tbb::flow::tuple<continue_msg, Message> >;
std::unique_ptr<decision_node> m_decisionNode;

std::random_device m_randomDevice {};
std::mt19937 m_generator {m_randomDevice()};
std::normal_distribution<> m_distribution {0, 1};

mutable std::shared_mutex m_mutex;

double processAsync(const size_t id, const DataPtr &data);
};

#endif // PARALLELGRAPH_H
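For reference, the decision_node alias above is a multifunction_node, whose body can route each incoming message to whichever output port it chooses. A minimal sketch of that routing pattern (illustrative names, not code from this PR):

#include "tbb/flow_graph.h"
#include <iostream>

int main()
{
    using namespace tbb::flow;
    graph g;

    // One input, two typed output ports; the body picks a port per message
    using router_t = multifunction_node<int, tbb::flow::tuple<int, int>>;
    router_t router(g, unlimited,
                    [] (const int &v, router_t::output_ports_type &ports) {
        if (v % 2 == 0)
            std::get<0>(ports).try_put(v);   // route even values to port 0
        else
            std::get<1>(ports).try_put(v);   // route odd values to port 1
    });

    function_node<int, continue_msg> evens(g, serial, [] (int v) -> continue_msg {
        std::cout << "even: " << v << std::endl;
        return continue_msg();
    });
    function_node<int, continue_msg> odds(g, serial, [] (int v) -> continue_msg {
        std::cout << "odd: " << v << std::endl;
        return continue_msg();
    });

    make_edge(output_port<0>(router), evens);
    make_edge(output_port<1>(router), odds);

    for (int i = 0; i < 10; ++i)
        router.try_put(i);

    g.wait_for_all();
    return 0;
}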
2 changes: 1 addition & 1 deletion tbb
Submodule tbb updated from 314792 to 2ace52