Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: add thread pool backed parallel implementation to "simple" and "breadth_first" schedulers #78

Merged
merged 6 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions bench/bm_case1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ invoke_work(auto &sched) {
using namespace benchmark;
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
sched.work();
sched.run_and_wait();
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
}
Expand Down Expand Up @@ -598,7 +598,7 @@ inline const boost::ut::suite _runtime_tests = [] {
::benchmark::benchmark<1LU>{ test_name }.repeat<N_ITER>(N_SAMPLES) = [&sched]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
sched.work();
sched.run_and_wait();
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
Expand Down Expand Up @@ -629,7 +629,7 @@ inline const boost::ut::suite _simd_tests = [] {
"runtime src->mult(2.0)->mult(0.5)->add(-1)->sink (SIMD)"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
sched.work();
sched.run_and_wait();
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
Expand Down Expand Up @@ -665,7 +665,8 @@ inline const boost::ut::suite _simd_tests = [] {
"runtime src->(mult(2.0)->mult(0.5)->add(-1))^10->sink (SIMD)"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
sched.work();
sched.run_and_wait();
sched.reset();
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
Expand Down Expand Up @@ -693,7 +694,7 @@ inline const boost::ut::suite _sample_by_sample_vs_bulk_access_tests = [] {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
fg::scheduler::simple sched{ std::move(flow_graph) };
sched.work();
sched.run_and_wait();
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
Expand All @@ -719,7 +720,7 @@ inline const boost::ut::suite _sample_by_sample_vs_bulk_access_tests = [] {
::benchmark::benchmark<1LU>{ test_name }.repeat<N_ITER>(N_SAMPLES) = [&sched]() {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
sched.work();
sched.run_and_wait();
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
};
Expand Down
2 changes: 1 addition & 1 deletion bench/bm_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ invoke_work(auto &sched) {
using namespace benchmark;
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
sched.work();
sched.run_and_wait();
expect(eq(test::n_samples_produced, N_SAMPLES)) << "did not produce enough output samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "did not consume enough input samples";
}
Expand Down
70 changes: 48 additions & 22 deletions bench/bm_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace fg = fair::graph;

// Number of repetitions of each benchmark; used as the template argument of `.repeat<N_ITER>(...)`.
inline constexpr std::size_t N_ITER = 10;
// Number of samples each benchmark run is expected to produce and consume.
// 10'000'000 is passed through gr::util::round_up(..., 1024) — presumably rounding up to a
// multiple of 1024; confirm against the gr::util implementation.
inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000'000, 1024);
// Base size for the benchmark graphs: the linear test graphs are built with 2 * N_NODES,
// the bifurcated test graphs with N_NODES.
inline constexpr std::size_t N_NODES = 5;

template<typename T, char op>
class math_op : public fg::node<math_op<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
Expand Down Expand Up @@ -98,34 +99,59 @@ void exec_bm(auto& scheduler, const std::string& test_case) {
using namespace benchmark;
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
scheduler.work();
scheduler.run_and_wait();
expect(eq(test::n_samples_produced, N_SAMPLES)) << fmt::format("did not produce enough output samples for {}", test_case);
expect(ge(test::n_samples_consumed, N_SAMPLES)) << fmt::format("did not consume enough input samples for {}", test_case);
scheduler.reset();
}

[[maybe_unused]] inline const boost::ut::suite _scheduler = [] {
[[maybe_unused]] inline const boost::ut::suite scheduler_tests = [] {
using namespace boost::ut;
using namespace benchmark;

fg::scheduler::simple sched1(test_graph_linear<float>(10));
"linear graph - simple scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched1]() {
exec_bm(sched1, "linear-graph simple-sched");
};

fg::scheduler::breadth_first sched2(test_graph_linear<float>(10));
"linear graph - BFS scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched2]() {
exec_bm(sched2, "linear-graph BFS-sched");
};

fg::scheduler::simple sched3(test_graph_bifurcated<float>(5));
"bifurcated graph - simple scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched3]() {
exec_bm(sched3, "bifurcated-graph simple-sched");
};

fg::scheduler::breadth_first sched4(test_graph_bifurcated<float>(5));
"bifurcated graph - BFS scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched4]() {
exec_bm(sched4, "bifurcated-graph BFS-sched");
};
using thread_pool = fair::thread_pool::BasicThreadPool;
using fg::scheduler::execution_policy::multi_threaded;

auto pool = std::make_shared<thread_pool>("custom-pool", fair::thread_pool::CPU_BOUND, 2, 2);

fg::scheduler::simple sched1(test_graph_linear<float>(2 * N_NODES), pool);
"linear graph - simple scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched1]() {
exec_bm(sched1, "linear-graph simple-sched");
};

fg::scheduler::breadth_first sched2(test_graph_linear<float>(2 * N_NODES), pool);
"linear graph - BFS scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched2]() {
exec_bm(sched2, "linear-graph BFS-sched");
};

fg::scheduler::simple sched3(test_graph_bifurcated<float>(N_NODES), pool);
"bifurcated graph - simple scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched3]() {
exec_bm(sched3, "bifurcated-graph simple-sched");
};

fg::scheduler::breadth_first sched4(test_graph_bifurcated<float>(N_NODES), pool);
"bifurcated graph - BFS scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched4]() {
exec_bm(sched4, "bifurcated-graph BFS-sched");
};

fg::scheduler::simple<multi_threaded> sched1_mt(test_graph_linear<float>(2 * N_NODES), pool);
"linear graph - simple scheduler (multi-threaded)"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched1_mt]() {
exec_bm(sched1_mt, "linear-graph simple-sched (multi-threaded)");
};

fg::scheduler::breadth_first<multi_threaded> sched2_mt(test_graph_linear<float>(2 * N_NODES), pool);
"linear graph - BFS scheduler (multi-threaded)"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched2_mt]() {
exec_bm(sched2_mt, "linear-graph BFS-sched (multi-threaded)");
};

fg::scheduler::simple<multi_threaded> sched3_mt(test_graph_bifurcated<float>(N_NODES), pool);
"bifurcated graph - simple scheduler (multi-threaded)"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched3_mt]() {
exec_bm(sched3_mt, "bifurcated-graph simple-sched (multi-threaded)");
};

fg::scheduler::breadth_first<multi_threaded> sched4_mt(test_graph_bifurcated<float>(N_NODES), pool);
"bifurcated graph - BFS scheduler (multi-threaded)"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched4_mt]() {
exec_bm(sched4_mt, "bifurcated-graph BFS-sched (multi-threaded)");
};
};

int
Expand Down
17 changes: 11 additions & 6 deletions include/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,9 @@ class edge {

class graph {
private:
std::vector<std::function<connection_result_t()>> _connection_definitions;
std::vector<std::unique_ptr<node_model>> _nodes;
std::vector<edge> _edges;
std::vector<std::function<connection_result_t(graph&)>> _connection_definitions;
std::vector<std::unique_ptr<node_model>> _nodes;
std::vector<edge> _edges;

template<typename Node>
std::unique_ptr<node_model> &
Expand Down Expand Up @@ -705,8 +705,8 @@ class graph {
if (!is_node_known(source) || !is_node_known(destination)) {
throw fmt::format("Source {} and/or destination {} do not belong to this graph\n", source.name(), destination.name());
}
self._connection_definitions.push_back([self = &self, source = &source, source_port = &port, destination = &destination, destination_port = &destination_port]() {
return self->connect_impl<src_port_index, dst_port_index>(*source, *source_port, *destination, *destination_port);
self._connection_definitions.push_back([source = &source, source_port = &port, destination = &destination, destination_port = &destination_port](graph &graph) {
return graph.connect_impl<src_port_index, dst_port_index>(*source, *source_port, *destination, *destination_port);
});
return connection_result_t::SUCCESS;
}
Expand Down Expand Up @@ -761,6 +761,11 @@ class graph {
connect(Source &source, Port Source::*member_ptr);

public:
// A graph is move-only: copy construction/assignment are deleted, while default
// construction and move construction/assignment use the compiler-generated versions.
// The deleted copy operations use the canonical `const graph&` signatures so that a copy
// attempt from a const source is also diagnosed as "use of deleted function" rather than
// as a missing overload.
graph(const graph&) = delete;
graph(graph&&) = default;
graph() = default;
graph &operator=(const graph&) = delete;
graph &operator=(graph&&) = default;
/**
* @return a list of all blocks contained in this graph
* N.B. some 'blocks' may be (sub-)graphs themselves
Expand Down Expand Up @@ -846,7 +851,7 @@ class graph {
return result;
}

const std::vector<std::function<connection_result_t()>> &
const std::vector<std::function<connection_result_t(graph&)>> &
connection_definitions() {
return _connection_definitions;
}
Expand Down
Loading