From 2d6d78794d6cb26159133e96a78278373779521f Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Thu, 1 Jun 2023 10:28:07 +0200 Subject: [PATCH] ThreadPool: backport changes from graph-prototype Backport of changes from graph prototype. See original issue: https://github.com/fair-acc/graph-prototype/pull/67 Some noteworthy changes: - emscripten compatibility: std::jthread -> std::thread, ranges usages - fix race condition on thread shutdown - fix #243 (starting and stopping threads in ThreadPool) --- concepts/core/BasicThreadPool_example.cpp | 2 +- src/core/include/ThreadAffinity.hpp | 56 +++++++++-------- src/core/include/ThreadPool.hpp | 76 +++++++++++++++-------- src/core/test/ThreadPool_tests.cpp | 51 ++++++++++++++- 4 files changed, 129 insertions(+), 56 deletions(-) diff --git a/concepts/core/BasicThreadPool_example.cpp b/concepts/core/BasicThreadPool_example.cpp index 7af6ef02..a650e557 100644 --- a/concepts/core/BasicThreadPool_example.cpp +++ b/concepts/core/BasicThreadPool_example.cpp @@ -14,7 +14,7 @@ int main() { opencmw::BasicThreadPool poolWork("CustomCpuPool", 1, 1); // pool for CPU-bound tasks with exactly 1 thread opencmw::BasicThreadPool poolIO("CustomIOPool", 1, 1000); // pool for IO-bound (potentially blocking) tasks with at least 1 and a max of 1000 threads - poolIO.keepAliveDuration() = seconds(10); // keeps idling threads alive for 10 seconds + poolIO.keepAliveDuration = seconds(10); // keeps idling threads alive for 10 seconds poolIO.waitUntilInitialised(); // wait until the pool is initialised (optional) assert(poolIO.isInitialised()); // check if the pool is initialised diff --git a/src/core/include/ThreadAffinity.hpp b/src/core/include/ThreadAffinity.hpp index 0f039e93..90cbbc08 100644 --- a/src/core/include/ThreadAffinity.hpp +++ b/src/core/include/ThreadAffinity.hpp @@ -15,7 +15,7 @@ #if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__))) // UNIX-style OS #include -#ifdef _POSIX_VERSION // POSIX compliant +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) #include #include #endif @@ -57,10 +57,14 @@ class thread_exception : public std::error_category { }; template +#ifdef __EMSCRIPTEN__ +concept thread_type = std::is_same_v; +#else concept thread_type = std::is_same_v || std::is_same_v; +#endif namespace detail { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) template constexpr decltype(auto) firstElement(Tp && t, Us && ...) noexcept { return std::forward(t); @@ -80,21 +84,21 @@ namespace detail { } char threadName[THREAD_MAX_NAME_LENGTH]; if (int rc = pthread_getname_np(handle, threadName, THREAD_MAX_NAME_LENGTH); rc != 0) { - throw std::system_error(rc, thread_exception(), fmt::format("getThreadName(thread_type)")); + throw std::system_error(rc, thread_exception(), "getThreadName(thread_type)"); } return std::string{ threadName, std::min(strlen(threadName), THREAD_MAX_NAME_LENGTH) }; } inline int getPid() { return getpid(); } #else - int detail::getPid() { + int getPid() { return 0; } #endif } // namespace detail inline std::string getProcessName(const int pid = detail::getPid()) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) if (std::ifstream in(fmt::format("/proc/{}/comm", pid), std::ios::in); in.is_open()) { std::string fileContent; std::getline(in, fileContent, '\n'); @@ -105,10 +109,10 @@ inline std::string getProcessName(const int pid = detail::getPid()) { } // namespace detail inline std::string getThreadName(thread_type auto &...thread) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) const pthread_t handle = detail::getPosixHandler(thread...); if (handle == 0U) { - throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadName(thread_type)")); + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), "getThreadName(thread_type)"); } return detail::getThreadName(handle); #else @@ -117,7 +121,7 @@ inline std::string getThreadName(thread_type auto &...thread) { } inline void setProcessName(const std::string_view &processName, int pid = detail::getPid()) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) std::ofstream out(fmt::format("/proc/{}/comm", pid), std::ios::out); if (!out.is_open()) { throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessName({},{})", processName, pid)); @@ -128,7 +132,7 @@ inline void setProcessName(const std::string_view &processName, int pid = detail } inline void setThreadName(const std::string_view &threadName, thread_type auto &...thread) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) const pthread_t handle = detail::getPosixHandler(thread...); if (handle == 0U) { throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadName({}, thread_type)", threadName, detail::getThreadName(handle))); @@ -140,7 +144,7 @@ inline void setThreadName(const std::string_view &threadName, thread_type auto & } namespace detail { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) inline std::vector getAffinityMask(const cpu_set_t &cpuSet) { std::vector bitMask(std::min(sizeof(cpu_set_t), static_cast(std::thread::hardware_concurrency()))); for (size_t i = 0; i < bitMask.size(); i++) { @@ -167,7 +171,7 @@ inline constexpr cpu_set_t getAffinityMask(const T &threadMap) { #endif } // namespace detail -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) inline std::vector getThreadAffinity(thread_type auto &...thread) { const pthread_t handle = detail::getPosixHandler(thread...); if (handle == 0U) { @@ -187,15 +191,15 @@ std::vector getThreadAffinity(thread_type auto &...) { template requires requires(T value) { value[0]; } -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) inline constexpr void setThreadAffinity(const T &threadMap, thread_type auto &...thread) { const pthread_t handle = detail::getPosixHandler(thread...); if (handle == 0U) { - throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadAffinity(std::vector = {{{}}}, thread_type)", threadMap.size(), fmt::join(threadMap, ", "))); + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadAffinity(std::vector = {{{}}}, thread_type)", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "))); } cpu_set_t cpuSet = detail::getAffinityMask(threadMap); if (int rc = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuSet); rc != 0) { - throw std::system_error(rc, thread_exception(), fmt::format("setThreadAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), detail::getThreadName(handle))); + throw std::system_error(rc, thread_exception(), fmt::format("setThreadAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), detail::getThreadName(handle))); } } #else @@ -205,7 +209,7 @@ constexpr bool setThreadAffinity(const T &threadMap, thread_type auto &...) { #endif inline std::vector getProcessAffinity(const int pid = detail::getPid()) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) if (pid <= 0) { throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessAffinity({}) -- invalid pid", pid)); } @@ -222,13 +226,13 @@ inline std::vector getProcessAffinity(const int pid = detail::getPid()) { template requires requires(T value) { std::get<0>(value); } inline constexpr bool setProcessAffinity(const T &threadMap, const int pid = detail::getPid()) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) if (pid <= 0) { - throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), pid)); + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid)); } cpu_set_t cpuSet = detail::getAffinityMask(threadMap); if (int rc = sched_setaffinity(pid, sizeof(cpu_set_t), &cpuSet); rc != 0) { - throw std::system_error(rc, thread_exception(), fmt::format("setProcessAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), pid)); + throw std::system_error(rc, thread_exception(), fmt::format("setProcessAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid)); } return true; @@ -251,7 +255,7 @@ struct SchedulingParameter { namespace detail { inline Policy getEnumPolicy(const int policy) { switch (policy) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) case SCHED_FIFO: return Policy::FIFO; case SCHED_RR: return Policy::ROUND_ROBIN; case SCHED_OTHER: return Policy::OTHER; @@ -263,7 +267,7 @@ inline Policy getEnumPolicy(const int policy) { } // namespace detail inline struct SchedulingParameter getProcessSchedulingParameter(const int pid = detail::getPid()) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) if (pid <= 0) { throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessSchedulingParameter({}) -- invalid pid", pid)); } @@ -274,12 +278,12 @@ inline struct SchedulingParameter getProcessSchedulingParameter(const int pid = } return SchedulingParameter{ .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority }; #else - return struct SchedulingParameter {}; + return {}; #endif } inline void setProcessSchedulingParameter(Policy scheduler, int priority, const int pid = detail::getPid()) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) if (pid <= 0) { throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessSchedulingParameter({}, {}, {}) -- invalid pid", scheduler, priority, pid)); } @@ -298,7 +302,7 @@ inline void setProcessSchedulingParameter(Policy scheduler, int priority, const } inline struct SchedulingParameter getThreadSchedulingParameter(thread_type auto &...thread) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) const pthread_t handle = detail::getPosixHandler(thread...); if (handle == 0U) { throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadSchedulingParameter(thread_type) -- invalid thread")); @@ -308,14 +312,14 @@ inline struct SchedulingParameter getThreadSchedulingParameter(thread_type auto if (int rc = pthread_getschedparam(handle, &policy, ¶m); rc != 0) { throw std::system_error(rc, thread_exception(), fmt::format("getThreadSchedulingParameter({}) - sched_getparam error", detail::getThreadName(handle))); } - return SchedulingParameter{ .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority }; + return { .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority }; #else - return struct SchedulingParameter {}; + return {}; #endif } inline void setThreadSchedulingParameter(Policy scheduler, int priority, thread_type auto &...thread) { -#ifdef _POSIX_VERSION +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) const pthread_t handle = detail::getPosixHandler(thread...); if (handle == 0U) { throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadSchedulingParameter({}, {}, thread_type) -- invalid thread", scheduler, priority)); diff --git a/src/core/include/ThreadPool.hpp b/src/core/include/ThreadPool.hpp index 95b19fc4..6049e874 100644 --- a/src/core/include/ThreadPool.hpp +++ b/src/core/include/ThreadPool.hpp @@ -156,6 +156,8 @@ class TaskQueue { } // namespace thread_pool::detail +class TaskQueue; + enum TaskType { IO_BOUND = 0, CPU_BOUND = 1 @@ -196,7 +198,7 @@ concept ThreadPool = requires(T t, std::function &&func) { * * // pool for IO-bound (potentially blocking) tasks with at least 1 and a max of 1000 threads * opencmw::BasicThreadPool<opencmw::IO_BOUND> poolIO("CustomIOPool", 1, 1000); - * poolIO.keepAliveDuration() = seconds(10); // keeps idling threads alive for 10 seconds (optional) + * poolIO.keepAliveDuration = seconds(10); // keeps idling threads alive for 10 seconds (optional) * poolIO.waitUntilInitialised(); // wait until the pool is initialised (optional) * poolIO.setAffinityMask({ true, true, true, false }); // allows executor threads to run on the first four CPU cores * @@ -232,7 +234,7 @@ class BasicThreadPool { std::mutex _threadListMutex; std::atomic _numThreads = 0U; - std::list _threads; + std::list _threads; std::vector _affinityMask; thread::Policy _schedulingPolicy = thread::Policy::OTHER; @@ -242,10 +244,10 @@ class BasicThreadPool { const uint32_t _minThreads; const uint32_t _maxThreads; - std::chrono::microseconds _sleepDuration = std::chrono::milliseconds(1); - std::chrono::seconds _keepAliveDurationIO = std::chrono::seconds(10); - public: + std::chrono::microseconds sleepDuration = std::chrono::milliseconds(1); + std::chrono::milliseconds keepAliveDuration = std::chrono::seconds(10); + BasicThreadPool(const std::string_view &name = generateName(), uint32_t min = std::thread::hardware_concurrency(), uint32_t max = std::thread::hardware_concurrency()) : _poolName(name), _minThreads(min), _maxThreads(max) { assert(min > 0 && "minimum number of threads must be > 0"); @@ -258,6 +260,9 @@ class BasicThreadPool { ~BasicThreadPool() { _shutdown = true; _condition.notify_all(); + for (auto &t : _threads) { + t.join(); + } } BasicThreadPool(const BasicThreadPool &) = delete; @@ -273,13 +278,14 @@ class BasicThreadPool { [[nodiscard]] std::size_t numTasksRunning() const noexcept { return std::atomic_load_explicit(&_numTasksRunning, std::memory_order_acquire); } [[nodiscard]] std::size_t numTasksQueued() const { return std::atomic_load_explicit(&_numTaskedQueued, std::memory_order_acquire); } [[nodiscard]] std::size_t numTasksRecycled() const { return _recycledTasks.size(); } - std::chrono::microseconds &sleepDuration() noexcept { return _sleepDuration; } - std::chrono::seconds &keepAliveDuration() noexcept { return _keepAliveDurationIO; } [[nodiscard]] bool isInitialised() const { return _initialised.load(std::memory_order::acquire); } void waitUntilInitialised() const { _initialised.wait(false); } void requestShutdown() { _shutdown = true; _condition.notify_all(); + for (auto &t: _threads) { + t.join(); + } } [[nodiscard]] bool isShutdown() const { return _shutdown; } @@ -289,7 +295,7 @@ class BasicThreadPool { void setAffinityMask(const std::vector &threadAffinityMask) { _affinityMask.clear(); - std::ranges::copy(threadAffinityMask, std::back_inserter(_affinityMask)); + std::copy(threadAffinityMask.begin(), threadAffinityMask.end(), std::back_inserter(_affinityMask)); cleanupFinishedThreads(); updateThreadConstraints(); } @@ -312,7 +318,7 @@ class BasicThreadPool { if constexpr (cpuID >= 0) { if (cpuID >= _affinityMask.size() || (cpuID >= 0 && !_affinityMask[cpuID])) { throw std::invalid_argument(fmt::format("requested cpuID {} incompatible with set affinity mask({}): [{}]", - cpuID, _affinityMask.size(), fmt::join(_affinityMask, ", "))); + cpuID, _affinityMask.size(), fmt::join(_affinityMask.begin(), _affinityMask.end(), ", "))); } } _numTaskedQueued.fetch_add(1U); @@ -370,10 +376,10 @@ class BasicThreadPool { std::scoped_lock lock(_threadListMutex); // std::erase_if(_threads, [](auto &thread) { return !thread.joinable(); }); - std::ranges::for_each(_threads, [this, threadID = std::size_t{ 0 }](auto &thread) mutable { updateThreadConstraints(threadID++, thread); }); + std::for_each(_threads.begin(), _threads.end(), [this, threadID = std::size_t{ 0 }](auto &thread) mutable { this->updateThreadConstraints(threadID++, thread); }); } - void updateThreadConstraints(const std::size_t threadID, std::jthread &thread) const { + void updateThreadConstraints(const std::size_t threadID, std::thread &thread) const { thread::setThreadName(fmt::format("{}#{}", _poolName, threadID), thread); thread::setThreadSchedulingParameter(_schedulingPolicy, _schedulingPriority, thread); if (!_affinityMask.empty()) { @@ -382,7 +388,7 @@ class BasicThreadPool { return; } const std::vector affinityMask = distributeThreadAffinityAcrossCores(_affinityMask, threadID); - std::cout << fmt::format("{}#{} affinity mask: {}", _poolName, threadID, fmt::join(affinityMask, ",")) << std::endl; + std::cout << fmt::format("{}#{} affinity mask: {}", _poolName, threadID, fmt::join(affinityMask.begin(), affinityMask.end(), ",")) << std::endl; thread::setThreadAffinity(affinityMask); } } @@ -406,7 +412,7 @@ class BasicThreadPool { void createWorkerThread() { std::scoped_lock lock(_threadListMutex); const std::size_t nThreads = numThreads(); - std::jthread &thread = _threads.emplace_back(&BasicThreadPool::worker, this); + std::thread &thread = _threads.emplace_back(&BasicThreadPool::worker, this); updateThreadConstraints(nThreads + 1, thread); } @@ -465,6 +471,7 @@ class BasicThreadPool { _initialised.notify_all(); } _numThreads.notify_one(); + bool running = true; do { TaskQueue::TaskContainer currentTaskContainer = popTask(); if (!currentTaskContainer.empty()) { @@ -490,27 +497,42 @@ class BasicThreadPool { noop_counter = noop_counter / 2; cleanupFinishedThreads(); - _condition.wait_for(lock, _keepAliveDurationIO, [this] { return numTasksQueued() > 0 || isShutdown(); }); + _condition.wait_for(lock, keepAliveDuration, [this] { return numTasksQueued() > 0 || isShutdown(); }); } + // check if this thread is to be kept timeDiffSinceLastUsed = std::chrono::steady_clock::now() - lastUsed; - } while (!isShutdown() && (numThreads() <= _minThreads || timeDiffSinceLastUsed < _keepAliveDurationIO)); - auto nThread = _numThreads.fetch_sub(1); - _numThreads.notify_all(); - - if (nThread == 1) { - // cleanup - _recycledTasks.clear(); - _taskQueue.clear(); - } + if (isShutdown()) { + auto nThread = _numThreads.fetch_sub(1); + _numThreads.notify_all(); + if (nThread == 1) { // cleanup last thread + _recycledTasks.clear(); + _taskQueue.clear(); + } + running = false; + } else if (timeDiffSinceLastUsed > keepAliveDuration) { // decrease to the minimum of _minThreads in a thread safe way + unsigned long nThreads = numThreads(); + while(nThreads > minThreads()) { // compare and swap loop + if (_numThreads.compare_exchange_weak(nThreads, nThreads - 1, std::memory_order_acq_rel)) { + _numThreads.notify_all(); + if (nThreads == 1) { // cleanup last thread + _recycledTasks.clear(); + _taskQueue.clear(); + } + running = false; + break; + } + } + } + } while (running); } }; template inline std::atomic BasicThreadPool::_globalPoolId = 0U; template inline std::atomic BasicThreadPool::_taskID = 0U; -static_assert(ThreadPool>); -static_assert(ThreadPool>); +static_assert(ThreadPool>); +static_assert(ThreadPool>); -} /* namespace opencmw */ +} -#endif // OPENCMW_CPP_THREADPOOL_HPP +#endif // THREADPOOL_HPP diff --git a/src/core/test/ThreadPool_tests.cpp b/src/core/test/ThreadPool_tests.cpp index c5c97cd8..06f7e60d 100644 --- a/src/core/test/ThreadPool_tests.cpp +++ b/src/core/test/ThreadPool_tests.cpp @@ -10,8 +10,8 @@ TEST_CASE("Basic ThreadPool tests", "[ThreadPool]") { std::atomic enqueueCount{ 0 }; std::atomic executeCount{ 0 }; opencmw::BasicThreadPool pool("TestPool", 1, 2); - REQUIRE_NOTHROW(pool.sleepDuration() = std::chrono::milliseconds(1)); - REQUIRE_NOTHROW(pool.keepAliveDuration() = std::chrono::seconds(10)); + REQUIRE_NOTHROW(pool.sleepDuration = std::chrono::milliseconds(1)); + REQUIRE_NOTHROW(pool.keepAliveDuration = std::chrono::seconds(10)); pool.waitUntilInitialised(); REQUIRE(pool.isInitialised()); std::this_thread::sleep_for(std::chrono::milliseconds(5)); @@ -55,4 +55,51 @@ TEST_CASE("Basic ThreadPool tests", "[ThreadPool]") { counter.wait(1); REQUIRE(counter == 2); } +} + +TEST_CASE("ThreadPool: Thread count tests", "[ThreadPool][MinMaxThreads]") { + struct bounds_def { + std::uint32_t min, max; + }; + std::array bounds{ + bounds_def{1, 1}, + bounds_def{1, 4}, + bounds_def{2, 2}, + bounds_def{2, 8}, + bounds_def{4, 8} + }; + + for (const auto [minThreads, maxThreads]: bounds) { + for (const auto taskCount: {2, 8, 32}) { + fmt::print("## Test with min={} and max={} and taskCount={}\n", minThreads, maxThreads, taskCount); + std::atomic counter{ 0 }; + + // Pool with min and max thread count + opencmw::BasicThreadPool pool("count_test", minThreads, maxThreads); + pool.keepAliveDuration = std::chrono::milliseconds(10); // default is 10 seconds, reducing for testing + pool.waitUntilInitialised(); + + for (int i = 0; i < taskCount; ++i) { + pool.execute([&counter] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::atomic_fetch_add(&counter, 1); + counter.notify_all(); + }); + } + REQUIRE(pool.numThreads() >= minThreads); + // the maximum number of threads is not a hard limit, if there is a burst of execute calls, it will spwawn more than maxThreads trheads. + // REQUIRE(pool.numThreads() == std::min(std::uint32_t(taskCount), maxThreads)); + + for (int i = 0; i < taskCount; ++i) { + counter.wait(i); + REQUIRE(pool.numThreads() >= minThreads); + // REQUIRE(pool.numThreads() <= maxThreads); + } + + // We should have gotten back to minimum + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + REQUIRE(pool.numThreads() == minThreads); + REQUIRE(counter == taskCount); + } + } } \ No newline at end of file