Skip to content

Commit

Permalink
Rethrow exceptions in WorkerThreadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
BUYT-1 committed Sep 10, 2024
1 parent 127018e commit 77a60a7
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
5 changes: 4 additions & 1 deletion src/core/util/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ namespace util {
WorkerThreadPool::WorkerThreadPool(std::size_t thread_num)
: barrier_(thread_num, Completion{*this}) {
assert(thread_num > 1);
tasks_.reserve(thread_num);
tasks_.emplace_back(DoWork{}); // main thread task
worker_threads_.reserve(--thread_num);
try {
while (thread_num--) {
worker_threads_.emplace_back(&WorkerThreadPool::Work, this);
worker_threads_.emplace_back(&WorkerThreadPool::Work, this,
std::ref(tasks_.emplace_back(DoWork{})));
}
} catch (std::system_error&) {
Terminate();
Expand Down
33 changes: 26 additions & 7 deletions src/core/util/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
Expand All @@ -29,8 +30,22 @@ class WorkerThreadPool {
}
};

struct DoWork {
void operator()(WorkerThreadPool& pool) {
DESBORDANTE_ASSUME(pool.work_);
try {
pool.work_();
} catch (...) {
pool.barrier_.ArriveAndWait();
throw;
}
pool.barrier_.ArriveAndWait();
}
};

Worker work_;
std::vector<std::jthread> worker_threads_;
std::vector<std::packaged_task<void(WorkerThreadPool&)>> tasks_;
util::Barrier<Completion> barrier_;
std::condition_variable working_var_;
std::mutex working_mutex_;
Expand All @@ -41,25 +56,24 @@ class WorkerThreadPool {
action(is_working_);
}

void Work() {
void Work(std::packaged_task<void(WorkerThreadPool&)>& thread_task) {
while (true) {
{
std::unique_lock<std::mutex> lk{working_mutex_};
working_var_.wait(lk, [this]() { return is_working_; });
}
if (!work_) break;
WorkUntilComplete();
ResetAndWork(thread_task);
}
}

void Terminate() {
SetWork(nullptr);
}

void WorkUntilComplete() {
DESBORDANTE_ASSUME(work_);
work_();
barrier_.ArriveAndWait();
void ResetAndWork(std::packaged_task<void(WorkerThreadPool&)>& thread_task) {
thread_task.reset();
thread_task(*this);
}

public:
Expand Down Expand Up @@ -110,7 +124,12 @@ class WorkerThreadPool {

// Main thread must call this to finish.
void Wait() {
WorkUntilComplete();
std::packaged_task<void(WorkerThreadPool&)>& main_task = tasks_.front();
ResetAndWork(main_task);
// Rethrow exceptions
for (std::packaged_task<void(WorkerThreadPool&)>& task : tasks_) {
task.get_future().get();
}
}

std::size_t ThreadNum() const noexcept {
Expand Down

0 comments on commit 77a60a7

Please sign in to comment.