Skip to content

Commit

Permalink
thread_pool: task type as member instead of NTTP
Browse files Browse the repository at this point in the history
simplifies thread pool management
  • Loading branch information
wirew0rm committed Jun 21, 2023
1 parent 61a0e78 commit e882dfb
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 31 deletions.
4 changes: 2 additions & 2 deletions bench/bm_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ void exec_bm(auto& scheduler, const std::string& test_case) {
[[maybe_unused]] inline const boost::ut::suite scheduler_tests = [] {
using namespace boost::ut;
using namespace benchmark;
using thread_pool = fair::thread_pool::BasicThreadPool<fair::thread_pool::CPU_BOUND>;
using thread_pool = fair::thread_pool::BasicThreadPool;
using fg::scheduler::execution_policy::multi_threaded;

auto pool = std::make_shared<thread_pool>("custom-pool", 2, 2);
auto pool = std::make_shared<thread_pool>("custom-pool", fair::thread_pool::CPU_BOUND, 2, 2);

fg::scheduler::simple sched1(test_graph_linear<float>(2 * N_NODES), pool);
"linear graph - simple scheduler"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched1]() {
Expand Down
18 changes: 10 additions & 8 deletions include/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,17 @@ void run_on_pool(std::span<node_model*> job, std::size_t n_batches, std::atomic_
/**
* Trivial loop based scheduler, which iterates over all nodes in definition order in the graph until no node did any processing
*/
template<execution_policy executionPolicy = single_threaded, typename thread_pool_type = thread_pool::BasicThreadPool<thread_pool::CPU_BOUND>>
class simple : public node<simple<executionPolicy, thread_pool_type>>{
template<execution_policy executionPolicy = single_threaded>
class simple : public node<simple<executionPolicy>>{
using node_t = node_model*;
using thread_pool_type = thread_pool::BasicThreadPool;
init_proof _init;
fair::graph::graph _graph;
std::shared_ptr<thread_pool_type> _pool;
std::vector<std::vector<node_t>> _job_lists{};
public:
explicit simple(fair::graph::graph &&graph, std::shared_ptr<thread_pool_type> thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool<thread_pool::CPU_BOUND>>("simple-scheduler-pool"))
: _init{fair::graph::scheduler::init(graph)}, _graph(std::move(graph)), _pool(thread_pool) {
explicit simple(fair::graph::graph &&graph, std::shared_ptr<thread_pool_type> thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool>("simple-scheduler-pool", thread_pool::CPU_BOUND))
: _init{fair::graph::scheduler::init(graph)}, _graph(std::move(graph)), _pool(std::move(thread_pool)) {
// generate job list
const auto n_batches = std::min(static_cast<std::size_t>(_pool->maxThreads()), _graph.blocks().size());
_job_lists.reserve(n_batches);
Expand Down Expand Up @@ -150,17 +151,18 @@ class simple : public node<simple<executionPolicy, thread_pool_type>>{
 * Breadth first traversal scheduler which traverses the graph starting from the source nodes in a breadth first fashion
* detecting cycles and nodes which can be reached from several source nodes.
*/
template<execution_policy executionPolicy = single_threaded, typename thread_pool_type = thread_pool::BasicThreadPool<thread_pool::CPU_BOUND>>
class breadth_first : public node<breadth_first<executionPolicy, thread_pool_type>> {
template<execution_policy executionPolicy = single_threaded>
class breadth_first : public node<breadth_first<executionPolicy>> {
using node_t = node_model*;
using thread_pool_type = thread_pool::BasicThreadPool;
init_proof _init;
fair::graph::graph _graph;
std::vector<node_t> _nodelist;
std::vector<std::vector<node_t>> _job_lists;
std::shared_ptr<thread_pool_type> _pool;
public:
explicit breadth_first(fair::graph::graph &&graph, std::shared_ptr<thread_pool_type> thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool<thread_pool::CPU_BOUND>>("breadth-first-pool"))
: _init{fair::graph::scheduler::init(graph)}, _graph(std::move(graph)), _pool(thread_pool) {
explicit breadth_first(fair::graph::graph &&graph, std::shared_ptr<thread_pool_type> thread_pool = std::make_shared<thread_pool_type>("breadth-first-pool", thread_pool::CPU_BOUND))
: _init{fair::graph::scheduler::init(graph)}, _graph(std::move(graph)), _pool(std::move(thread_pool)) {
std::map<node_t, std::vector<node_t>> _adjacency_list{};
std::vector<node_t> _source_nodes{};
// compute the adjacency list
Expand Down
23 changes: 10 additions & 13 deletions include/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ concept ThreadPool = requires(T t, std::function<void()> &&func) {
* }
* @endcode
*/
template<TaskType taskType>
class BasicThreadPool {
using Task = thread_pool::detail::Task;
using TaskQueue = thread_pool::detail::TaskQueue;
Expand Down Expand Up @@ -351,15 +350,16 @@ class BasicThreadPool {
int _schedulingPriority = 0;

const std::string _poolName;
const TaskType _taskType;
const uint32_t _minThreads;
const uint32_t _maxThreads;

public:
std::chrono::microseconds sleepDuration = std::chrono::milliseconds(1);
std::chrono::milliseconds keepAliveDuration = std::chrono::seconds(10);

BasicThreadPool(const std::string_view &name = generateName(), uint32_t min = std::thread::hardware_concurrency(), uint32_t max = std::thread::hardware_concurrency())
: _poolName(name), _minThreads(min), _maxThreads(max) {
BasicThreadPool(const std::string_view &name = generateName(), const TaskType taskType = TaskType::CPU_BOUND, uint32_t min = std::thread::hardware_concurrency(), uint32_t max = std::thread::hardware_concurrency())
: _poolName(name), _taskType(taskType), _minThreads(min), _maxThreads(max) {
assert(min > 0 && "minimum number of threads must be > 0");
assert(min <= max && "minimum number of threads must be <= maximum number of threads");
for (uint32_t i = 0; i < _minThreads; ++i) {
Expand All @@ -376,9 +376,9 @@ class BasicThreadPool {
}

BasicThreadPool(const BasicThreadPool &) = delete;
BasicThreadPool(BasicThreadPool &&) = default;
BasicThreadPool(BasicThreadPool &&) = delete;
BasicThreadPool &operator=(const BasicThreadPool &) = delete;
BasicThreadPool &operator=(BasicThreadPool &&) = default;
BasicThreadPool &operator=(BasicThreadPool &&) = delete;

[[nodiscard]] std::string poolName() const noexcept { return _poolName; }
[[nodiscard]] uint32_t minThreads() const noexcept { return _minThreads; };
Expand Down Expand Up @@ -435,7 +435,7 @@ class BasicThreadPool {

_taskQueue.push(createTask<taskName, priority, cpuID>(std::forward<decltype(func)>(func), std::forward<decltype(func)>(args)...));
_condition.notify_one();
if constexpr (taskType == TaskType::IO_BOUND) {
if (_taskType == TaskType::IO_BOUND) {
spinWait.spinOnce();
spinWait.spinOnce();
while (_taskQueue.size() > 0) {
Expand Down Expand Up @@ -493,7 +493,7 @@ class BasicThreadPool {
thread::setThreadName(fmt::format("{}#{}", _poolName, threadID), thread);
thread::setThreadSchedulingParameter(_schedulingPolicy, _schedulingPriority, thread);
if (!_affinityMask.empty()) {
if (taskType == TaskType::IO_BOUND) {
if (_taskType == TaskType::IO_BOUND) {
thread::setThreadAffinity(_affinityMask);
return;
}
Expand Down Expand Up @@ -636,12 +636,9 @@ class BasicThreadPool {
} while (running);
}
};
template<TaskType T>
inline std::atomic<uint64_t> BasicThreadPool<T>::_globalPoolId = 0U;
template<TaskType T>
inline std::atomic<uint64_t> BasicThreadPool<T>::_taskID = 0U;
static_assert(ThreadPool<BasicThreadPool<IO_BOUND>>);
static_assert(ThreadPool<BasicThreadPool<CPU_BOUND>>);
inline std::atomic<uint64_t> BasicThreadPool::_globalPoolId = 0U;
inline std::atomic<uint64_t> BasicThreadPool::_taskID = 0U;
static_assert(ThreadPool<BasicThreadPool>);

}

Expand Down
2 changes: 1 addition & 1 deletion test/qa_hier_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ make_graph(std::size_t events_count) {

int
main() {
auto thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool<fair::thread_pool::CPU_BOUND>>("custom pool", 2,2);
auto thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool>("custom pool", fair::thread_pool::CPU_BOUND, 2,2);

fg::scheduler::simple scheduler(make_graph(10), thread_pool);

Expand Down
2 changes: 1 addition & 1 deletion test/qa_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ void check_node_names(std::vector<node_type> joblist, std::set<std::string> set)
const boost::ut::suite SchedulerTests = [] {
using namespace boost::ut;
using namespace fair::graph;
auto thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool<fair::thread_pool::CPU_BOUND>>("custom pool", 2,2);
auto thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool>("custom pool", fair::thread_pool::CPU_BOUND, 2, 2);

"SimpleScheduler_linear"_test = [&thread_pool] {
using scheduler = fair::graph::scheduler::simple<>;
Expand Down
2 changes: 1 addition & 1 deletion test/qa_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ const boost::ut::suite SettingsTests = [] {
expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(block1).to<"in">(block2)));
expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(block2).to<"in">(sink)));

auto thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool<fair::thread_pool::CPU_BOUND>>("custom pool", 2,2);
auto thread_pool = std::make_shared<fair::thread_pool::BasicThreadPool>("custom pool", fair::thread_pool::CPU_BOUND, 2, 2);
fair::graph::scheduler::simple sched{ std::move(flow_graph), thread_pool };
expect(src.settings().auto_update_parameters().contains("sample_rate"));
std::ignore = src.settings().set({ { "sample_rate", 49000.0f } });
Expand Down
10 changes: 5 additions & 5 deletions test/qa_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ const boost::ut::suite ThreadPoolTests = [] {
using namespace boost::ut;

"Basic ThreadPool tests"_test = [] {
expect(nothrow([] { fair::thread_pool::BasicThreadPool<fair::thread_pool::IO_BOUND>(); }));
expect(nothrow([] { fair::thread_pool::BasicThreadPool<fair::thread_pool::CPU_BOUND>(); }));
expect(nothrow([] { fair::thread_pool::BasicThreadPool("test", fair::thread_pool::IO_BOUND); }));
expect(nothrow([] { fair::thread_pool::BasicThreadPool("test2", fair::thread_pool::CPU_BOUND); }));

std::atomic<int> enqueueCount{0};
std::atomic<int> executeCount{0};
fair::thread_pool::BasicThreadPool<fair::thread_pool::IO_BOUND> pool("TestPool", 1, 2);
fair::thread_pool::BasicThreadPool pool("TestPool", fair::thread_pool::IO_BOUND, 1, 2);
expect(nothrow([&] { pool.sleepDuration = std::chrono::milliseconds(1); }));
expect(nothrow([&] { pool.keepAliveDuration = std::chrono::seconds(10); }));
pool.waitUntilInitialised();
Expand Down Expand Up @@ -60,7 +60,7 @@ const boost::ut::suite ThreadPoolTests = [] {
};
"contention tests"_test = [] {
std::atomic<int> counter{0};
fair::thread_pool::BasicThreadPool<fair::thread_pool::IO_BOUND> pool("contention", 1, 4);
fair::thread_pool::BasicThreadPool pool("contention", fair::thread_pool::IO_BOUND, 1, 4);
pool.waitUntilInitialised();
expect(that % pool.isInitialised());
expect(pool.numThreads() == 1_u);
Expand Down Expand Up @@ -99,7 +99,7 @@ const boost::ut::suite ThreadPoolTests = [] {
std::atomic<int> counter{0};

// Pool with min and max thread count
fair::thread_pool::BasicThreadPool<fair::thread_pool::IO_BOUND> pool("count_test", minThreads, maxThreads);
fair::thread_pool::BasicThreadPool pool("count_test", fair::thread_pool::IO_BOUND, minThreads, maxThreads);
pool.keepAliveDuration = std::chrono::milliseconds(10); // default is 10 seconds, reducing for testing
pool.waitUntilInitialised();

Expand Down

0 comments on commit e882dfb

Please sign in to comment.