diff --git a/.github/workflows/ci-meson.yml b/.github/workflows/ci-meson.yml index 1ba79096..46cfcad1 100644 --- a/.github/workflows/ci-meson.yml +++ b/.github/workflows/ci-meson.yml @@ -33,7 +33,7 @@ jobs: meson-build-type: [ release, debug ] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 with: fetch-depth: 100 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e549c366..c6c74a7c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,7 +33,7 @@ jobs: cmake-build-type: [ Release, Debug ] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 with: fetch-depth: 100 diff --git a/CMakeLists.txt b/CMakeLists.txt index 673fab0f..46507c3f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,12 +9,15 @@ set_project_warnings(graph-prototype-options) if (EMSCRIPTEN) set(CMAKE_EXECUTABLE_SUFFIX ".js") + target_compile_options(graph-prototype-options INTERFACE + -fwasm-exceptions + -pthread + ) target_link_options(graph-prototype-options INTERFACE "SHELL:-s ALLOW_MEMORY_GROWTH=1" -fwasm-exceptions - ) - target_compile_options(graph-prototype-options INTERFACE - -fwasm-exceptions + -pthread + "SHELL:-s PTHREAD_POOL_SIZE=30" ) endif() diff --git a/include/thread_affinity.hpp b/include/thread_affinity.hpp new file mode 100644 index 00000000..fabd2a5f --- /dev/null +++ b/include/thread_affinity.hpp @@ -0,0 +1,343 @@ +#ifndef THREADAFFINITY_HPP +#define THREADAFFINITY_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__))) // UNIX-style OS +#include +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) +#include +#include +#endif +#endif + +namespace fair::thread_pool::thread { + +constexpr size_t THREAD_MAX_NAME_LENGTH = 16; +constexpr int THREAD_UNINITIALISED = 1; +constexpr int THREAD_ERROR_UNKNOWN = 2; +constexpr int THREAD_VALUE_RANGE = 3; +constexpr int THREAD_INVALID_ARGUMENT = 22; +constexpr int THREAD_ERANGE = 34; + +class thread_exception : public std::error_category { + using std::error_category::error_category; + +public: + constexpr thread_exception() + : std::error_category(){}; + + const char *name() const noexcept override { return "thread_exception"; }; + std::string message(int errorCode) const override { + switch (errorCode) { + case THREAD_UNINITIALISED: + return "thread uninitialised or user does not have the appropriate rights (ie. CAP_SYS_NICE capability)"; + case THREAD_ERROR_UNKNOWN: + return "thread error code 2"; + case THREAD_INVALID_ARGUMENT: + return "invalid argument"; + case THREAD_ERANGE: + return fmt::format("length of the string specified pointed to by name exceeds the allowed limit THREAD_MAX_NAME_LENGTH = '{}'", THREAD_MAX_NAME_LENGTH); + case THREAD_VALUE_RANGE: + return fmt::format("priority out of valid range for scheduling policy", THREAD_MAX_NAME_LENGTH); + default: + return fmt::format("unknown threading error code {}", errorCode); + } + }; +}; + +template +#ifdef __EMSCRIPTEN__ + concept thread_type = std::is_same_v; +#else + concept thread_type = std::is_same_v || std::is_same_v; +#endif + +namespace detail { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + template + constexpr decltype(auto) firstElement(Tp && t, Us && ...) noexcept { + return std::forward(t); + } + + inline constexpr pthread_t getPosixHandler(thread_type auto &...t) noexcept { + if constexpr (sizeof...(t) > 0) { + return firstElement(t...).native_handle(); + } else { + return pthread_self(); + } + } + + inline std::string getThreadName(const pthread_t &handle) { + if (handle == 0U) { + return "uninitialised thread"; + } + char threadName[THREAD_MAX_NAME_LENGTH]; + if (int rc = pthread_getname_np(handle, threadName, THREAD_MAX_NAME_LENGTH); rc != 0) { + throw std::system_error(rc, thread_exception(), "getThreadName(thread_type)"); + } + return std::string{ threadName, std::min(strlen(threadName), THREAD_MAX_NAME_LENGTH) }; + } + + inline int getPid() { return getpid(); } +#else + int getPid() { + return 0; + } +#endif +} // namespace detail + +inline std::string getProcessName(const int pid = detail::getPid()) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + if (std::ifstream in(fmt::format("/proc/{}/comm", pid), std::ios::in); in.is_open()) { + std::string fileContent; + std::getline(in, fileContent, '\n'); + return fileContent; + } +#endif + return "unknown_process"; +} // namespace detail + +inline std::string getThreadName(thread_type auto &...thread) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + const pthread_t handle = detail::getPosixHandler(thread...); + if (handle == 0U) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), "getThreadName(thread_type)"); + } + return detail::getThreadName(handle); +#else + return "unknown thread name"; +#endif +} + +inline void setProcessName(const std::string_view &processName, int pid = detail::getPid()) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + std::ofstream out(fmt::format("/proc/{}/comm", pid), std::ios::out); + if (!out.is_open()) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessName({},{})", processName, pid)); + } + out << std::string{ processName.cbegin(), std::min(15LU, processName.size()) }; + out.close(); +#endif +} + +inline void setThreadName(const std::string_view &threadName, thread_type auto &...thread) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + const pthread_t handle = detail::getPosixHandler(thread...); + if (handle == 0U) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadName({}, thread_type)", threadName, detail::getThreadName(handle))); + } + if (int rc = pthread_setname_np(handle, threadName.data()); rc < 0) { + throw std::system_error(rc, thread_exception(), fmt::format("setThreadName({},{}) - error code '{}'", threadName, detail::getThreadName(handle), rc)); + } +#endif +} + +namespace detail { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) +inline std::vector getAffinityMask(const cpu_set_t &cpuSet) { + std::vector bitMask(std::min(sizeof(cpu_set_t), static_cast(std::thread::hardware_concurrency()))); + for (size_t i = 0; i < bitMask.size(); i++) { + bitMask[i] = CPU_ISSET(i, &cpuSet); + } + return bitMask; +} + +template +requires requires(T value) { value[0]; } +inline constexpr cpu_set_t getAffinityMask(const T &threadMap) { + cpu_set_t cpuSet; + CPU_ZERO(&cpuSet); + size_t nMax = std::min(threadMap.size(), static_cast(std::thread::hardware_concurrency())); + for (size_t i = 0; i < nMax; i++) { + if (threadMap[i]) { + CPU_SET(i, &cpuSet); + } else { + CPU_CLR(i, &cpuSet); + } + } + return cpuSet; +} +#endif +} // namespace detail + +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) +inline std::vector getThreadAffinity(thread_type auto &...thread) { + const pthread_t handle = detail::getPosixHandler(thread...); + if (handle == 0U) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadAffinity(thread_type)")); + } + cpu_set_t cpuSet; + if (int rc = pthread_getaffinity_np(handle, sizeof(cpu_set_t), &cpuSet); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("getThreadAffinity({})", detail::getThreadName(handle))); + } + return detail::getAffinityMask(cpuSet); +} +#else +std::vector getThreadAffinity(thread_type auto &...) { + return std::vector(std::thread::hardware_concurrency()); // cannot set affinity for non-posix threads +} +#endif + +template +requires requires(T value) { value[0]; } +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) +inline constexpr void setThreadAffinity(const T &threadMap, thread_type auto &...thread) { + const pthread_t handle = detail::getPosixHandler(thread...); + if (handle == 0U) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadAffinity(std::vector = {{{}}}, thread_type)", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "))); + } + cpu_set_t cpuSet = detail::getAffinityMask(threadMap); + if (int rc = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuSet); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("setThreadAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), detail::getThreadName(handle))); + } +} +#else +constexpr bool setThreadAffinity(const T &threadMap, thread_type auto &...) { + return false; // cannot set affinity for non-posix threads +} +#endif + +inline std::vector getProcessAffinity(const int pid = detail::getPid()) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + if (pid <= 0) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessAffinity({}) -- invalid pid", pid)); + } + cpu_set_t cpuSet; + if (int rc = sched_getaffinity(pid, sizeof(cpu_set_t), &cpuSet); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("getProcessAffinity(std::bitset<{{}}> = {{}}, thread_type)")); // todo: fix format string + } + return detail::getAffinityMask(cpuSet); +#else + return std::vector(std::thread::hardware_concurrency()); // cannot set affinity for non-posix threads +#endif +} + +template +requires requires(T value) { std::get<0>(value); } +inline constexpr bool setProcessAffinity(const T &threadMap, const int pid = detail::getPid()) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + if (pid <= 0) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid)); + } + cpu_set_t cpuSet = detail::getAffinityMask(threadMap); + if (int rc = sched_setaffinity(pid, sizeof(cpu_set_t), &cpuSet); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("setProcessAffinity(std::vector = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid)); + } + + return true; +#else + return false; // cannot set affinity for non-posix threads +#endif +} +enum Policy { + UNKNOWN = -1, + OTHER = 0, + FIFO = 1, + ROUND_ROBIN = 2 +}; + +struct SchedulingParameter { + Policy policy; // e.g. SCHED_OTHER, SCHED_RR, FSCHED_FIFO + int priority; +}; + +namespace detail { +inline Policy getEnumPolicy(const int policy) { + switch (policy) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + case SCHED_FIFO: return Policy::FIFO; + case SCHED_RR: return Policy::ROUND_ROBIN; + case SCHED_OTHER: return Policy::OTHER; +#endif + default: + return Policy::UNKNOWN; + } +} +} // namespace detail + +inline struct SchedulingParameter getProcessSchedulingParameter(const int pid = detail::getPid()) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + if (pid <= 0) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessSchedulingParameter({}) -- invalid pid", pid)); + } + struct sched_param param; + const int policy = sched_getscheduler(pid); + if (int rc = sched_getparam(pid, ¶m); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("getProcessSchedulingParameter({}) - sched_getparam error", pid)); + } + return SchedulingParameter{ .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority }; +#else + return {}; +#endif +} + +inline void setProcessSchedulingParameter(Policy scheduler, int priority, const int pid = detail::getPid()) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + if (pid <= 0) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessSchedulingParameter({}, {}, {}) -- invalid pid", scheduler, priority, pid)); + } + const int minPriority = sched_get_priority_min(scheduler); + const int maxPriority = sched_get_priority_max(scheduler); + if (priority < minPriority || priority > maxPriority) { + throw std::system_error(THREAD_VALUE_RANGE, thread_exception(), fmt::format("setProcessSchedulingParameter({}, {}, {}) -- requested priority out-of-range [{}, {}]", scheduler, priority, pid, minPriority, maxPriority)); + } + struct sched_param param { + .sched_priority = priority + }; + if (int rc = sched_setscheduler(pid, scheduler, ¶m); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("setProcessSchedulingParameter({}, {}, {}) - sched_setscheduler return code: {}", scheduler, priority, pid, rc)); + } +#endif +} + +inline struct SchedulingParameter getThreadSchedulingParameter(thread_type auto &...thread) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + const pthread_t handle = detail::getPosixHandler(thread...); + if (handle == 0U) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadSchedulingParameter(thread_type) -- invalid thread")); + } + struct sched_param param; + int policy; + if (int rc = pthread_getschedparam(handle, &policy, ¶m); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("getThreadSchedulingParameter({}) - sched_getparam error", detail::getThreadName(handle))); + } + return { .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority }; +#else + return {}; +#endif +} + +inline void setThreadSchedulingParameter(Policy scheduler, int priority, thread_type auto &...thread) { +#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__) + const pthread_t handle = detail::getPosixHandler(thread...); + if (handle == 0U) { + throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadSchedulingParameter({}, {}, thread_type) -- invalid thread", scheduler, priority)); + } + const int minPriority = sched_get_priority_min(scheduler); + const int maxPriority = sched_get_priority_max(scheduler); + if (priority < minPriority || priority > maxPriority) { + throw std::system_error(THREAD_VALUE_RANGE, thread_exception(), fmt::format("setThreadSchedulingParameter({}, {}, {}) -- requested priority out-of-range [{}, {}]", scheduler, priority, detail::getThreadName(handle), minPriority, maxPriority)); + } + struct sched_param param { + .sched_priority = priority + }; + if (int rc = pthread_setschedparam(handle, scheduler, ¶m); rc != 0) { + throw std::system_error(rc, thread_exception(), fmt::format("setThreadSchedulingParameter({}, {}, {}) - pthread_setschedparam return code: {}", scheduler, priority, detail::getThreadName(handle), rc)); + } +#endif +} + +} // namespace opencmw::thread + +#endif // THREADAFFINITY_HPP diff --git a/include/thread_pool.hpp b/include/thread_pool.hpp new file mode 100644 index 00000000..b0542b19 --- /dev/null +++ b/include/thread_pool.hpp @@ -0,0 +1,644 @@ +#ifndef THREADPOOL_HPP +#define THREADPOOL_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace fair::thread_pool { +namespace detail { + +// TODO remove all the below and use std when moved to modules // support code from mpunits for basic_fixed_string +template +constexpr bool equal(InputIt1 first1, InputIt1 last1, InputIt2 first2) +{ + for (; first1 != last1; ++first1, ++first2) { + if (!(*first1 == *first2)) { + return false; + } + } + return true; +} + +template +constexpr auto lexicographical_compare_three_way(I1 f1, I1 l1, I2 f2, I2 l2, Cmp comp) -> decltype(comp(*f1, *f2)) +{ + using ret_t = decltype(comp(*f1, *f2)); + static_assert(std::disjunction_v, std::is_same, + std::is_same>, + "The return type must be a comparison category type."); + + bool exhaust1 = (f1 == l1); + bool exhaust2 = (f2 == l2); + for (; !exhaust1 && !exhaust2; exhaust1 = (++f1 == l1), exhaust2 = (++f2 == l2)) if (auto c = comp(*f1, *f2); c != 0) return c; + + return !exhaust1 ? std::strong_ordering::greater + : !exhaust2 ? std::strong_ordering::less + : std::strong_ordering::equal; +} + +template +constexpr auto lexicographical_compare_three_way(I1 f1, I1 l1, I2 f2, I2 l2) +{ + return lexicographical_compare_three_way(f1, l1, f2, l2, std::compare_three_way()); +} +/** + * @brief A compile-time fixed string + * taken from https://github.com/mpusz/units/blob/master/src/core/include/units/bits/external/fixed_string.h + * + * @tparam CharT Character type to be used by the string + * @tparam N The size of the string + */ +template +struct basic_fixed_string { + CharT data_[N + 1] = {}; + + using iterator = CharT*; + using const_iterator = const CharT*; + + constexpr explicit(false) basic_fixed_string(CharT ch) noexcept { data_[0] = ch; } + + constexpr explicit(false) basic_fixed_string(const CharT (&txt)[N + 1]) noexcept + { + if constexpr (N != 0) + for (std::size_t i = 0; i < N; ++i) data_[i] = txt[i]; + } + + [[nodiscard]] constexpr bool empty() const noexcept { return N == 0; } + [[nodiscard]] constexpr std::size_t size() const noexcept { return N; } + [[nodiscard]] constexpr const CharT* data() const noexcept { return data_; } + [[nodiscard]] constexpr const CharT* c_str() const noexcept { return data(); } + [[nodiscard]] constexpr const CharT& operator[](std::size_t index) const noexcept { return data()[index]; } + [[nodiscard]] constexpr CharT operator[](std::size_t index) noexcept { return data()[index]; } + + [[nodiscard]] constexpr iterator begin() noexcept { return data(); } + [[nodiscard]] constexpr const_iterator begin() const noexcept { return data(); } + [[nodiscard]] constexpr iterator end() noexcept { return data() + size(); } + [[nodiscard]] constexpr const_iterator end() const noexcept { return data() + size(); } + + template + [[nodiscard]] constexpr friend basic_fixed_string operator+( + const basic_fixed_string& lhs, const basic_fixed_string& rhs) noexcept + { + CharT txt[N + N2 + 1] = {}; + + for (size_t i = 0; i != N; ++i) txt[i] = lhs[i]; + for (size_t i = 0; i != N2; ++i) txt[N + i] = rhs[i]; + + return basic_fixed_string(txt); + } + + [[nodiscard]] constexpr bool operator==(const basic_fixed_string& other) const + { + if (size() != other.size()) return false; + return detail::equal(begin(), end(), other.begin()); // TODO std::ranges::equal(*this, other) + } + + template + [[nodiscard]] friend constexpr bool operator==(const basic_fixed_string&, const basic_fixed_string&) + { + return false; + } + + template + [[nodiscard]] friend constexpr auto operator<=>(const basic_fixed_string& lhs, + const basic_fixed_string& rhs) + { + // TODO std::lexicographical_compare_three_way(lhs.begin(), lhs.end(), rhs.begin(), rhs.end()); + return detail::lexicographical_compare_three_way(lhs.begin(), lhs.end(), rhs.begin(), rhs.end()); + } +}; + +template +basic_fixed_string(const CharT (&str)[N]) -> basic_fixed_string; + +template +basic_fixed_string(CharT) -> basic_fixed_string; + +template +using fixed_string = basic_fixed_string; + +/** + * @brief a move-only implementation of std::function by Matthias Kretz, GSI + * TODO(C++23): to be replaced once C++23's STL version is out/available: + * https://en.cppreference.com/w/cpp/utility/functional/move_only_function/move_only_function + */ +class move_only_function { + using FunPtr = std::unique_ptr; + FunPtr _erased_fun = { nullptr, [](void *) {} }; + void (*_call)(void *) = nullptr; + +public: + constexpr move_only_function() = default; + + template + requires(!std::is_reference_v) constexpr move_only_function(F &&fun) + : _erased_fun(new F(std::forward(fun)), [](void *ptr) { delete static_cast(ptr); }), _call([](void *ptr) { (*static_cast(ptr))(); }) {} + + template + requires(!std::is_reference_v) constexpr move_only_function &operator=(F &&fun) { + _erased_fun = FunPtr(new F(std::forward(fun)), [](void *ptr) { delete static_cast(ptr); }); + _call = [](void *ptr) { (*static_cast(ptr))(); }; + return *this; + } + + constexpr void operator()() { + if (_call) { + _call(_erased_fun.get()); + } + } + constexpr void operator()() const { + if (_call) { + _call(_erased_fun.get()); + } + } +}; + +struct Task { + uint64_t id; + move_only_function func; + std::string name{}; // Default value provided to avoid warnings on construction with {.id = ..., .func = ...} + int32_t priority = 0; + int32_t cpuID = -1; + std::weak_ordering operator<=>(const Task &other) const noexcept { return priority <=> other.priority; } + + // We want to reuse objects to avoid reallocations + void reset() noexcept { + *this = Task(); + } +}; + +template +struct conditional_lock : public std::scoped_lock { + using std::scoped_lock::scoped_lock; +}; + +template +struct conditional_lock { + conditional_lock(const Args &...){}; +}; + +class TaskQueue { +public: + using TaskContainer = std::list; + +private: + mutable gr::AtomicMutex<> _mutex; + + TaskContainer _tasks; + + template + using conditional_lock = conditional_lock>; + +public: + TaskQueue() = default; + TaskQueue(const TaskQueue &queue) = delete; + TaskQueue &operator=(const TaskQueue &queue) = delete; + ~TaskQueue() { + clear(); + } + + template + void clear() { + conditional_lock lock(_mutex); + _tasks.clear(); + } + + template + std::size_t size() const { + conditional_lock lock(_mutex); + return _tasks.size(); + } + + template + void push(TaskContainer jobContainer) { + conditional_lock lock(_mutex); + assert(!jobContainer.empty()); + auto &job = jobContainer.front(); + const auto currentJobPriority = job.priority; + + const auto insertPosition = [&] { + if (currentJobPriority == 0) { + return _tasks.end(); + } else { + return std::find_if(_tasks.begin(), _tasks.end(), + [currentJobPriority](const auto &task) { + return task.priority < currentJobPriority; + }); + } + }(); + + _tasks.splice(insertPosition, jobContainer, jobContainer.begin(), jobContainer.end()); + } + + template + TaskContainer pop() { + conditional_lock lock(_mutex); + TaskContainer result; + if (!_tasks.empty()) { + result.splice(result.begin(), _tasks, _tasks.begin(), std::next(_tasks.begin())); + } + return result; + } +}; + +} // namespace thread_pool::detail + +class TaskQueue; + +enum TaskType { + IO_BOUND = 0, + CPU_BOUND = 1 +}; + +template +concept ThreadPool = requires(T t, std::function &&func) { + { t.execute(std::move(func)) } -> std::same_as; +}; + +/** + *

Basic thread pool that uses a fixed-number or optionally grow/shrink between a [min, max] number of threads.

+ * The growth policy is controlled by the TaskType template parameter: + *
    + *
  1. TaskType::IO_BOUND if the task is IO bound, i.e. it is likely to block the thread for a long time, or + *
  2. TaskType::CPU_BOUND if the task is CPU bound, i.e. it is primarily limited by the CPU and memory bandwidth. + *
+ *
+ * For the IO_BOUND policy, unused threads are kept alive for a pre-defined amount of time to be reused and gracefully + * shut down to the minimum number of threads when unused. + *
+ * For the CPU_BOUND policy, the threads are equally spread and pinned across the set CPU affinity. + *
+ * The CPU affinity and OS scheduling policy and priorities are controlled by: + *
    + *
  • setAffinityMask(std::vector<bool> threadAffinityMask);
  • + *
  • setThreadSchedulingPolicy(const thread::Policy schedulingPolicy, const int schedulingPriority)
  • + *
+ * Some user-level examples:
+ * @code + * + * // pool for CPU-bound tasks with exactly 1 thread + * opencmw::BasicThreadPool<opencmw::CPU_BOUND> poolWork("CustomCpuPool", 1, 1); + * // enqueue and add task to list -- w/o return type + * poolWork.execute([] { fmt::print("Hello World from thread '{}'!\n", getThreadName()); }); // here: caller thread-name + * poolWork.execute([](const auto &...args) { fmt::print(fmt::runtime("Hello World from thread '{}'!\n"), args...); }, getThreadName()); // here: executor thread-name + * // [..] + * + * // pool for IO-bound (potentially blocking) tasks with at least 1 and a max of 1000 threads + * opencmw::BasicThreadPool<opencmw::IO_BOUND> poolIO("CustomIOPool", 1, 1000); + * poolIO.keepAliveDuration = seconds(10); // keeps idling threads alive for 10 seconds (optional) + * poolIO.waitUntilInitialised(); // wait until the pool is initialised (optional) + * poolIO.setAffinityMask({ true, true, true, false }); // allows executor threads to run on the first four CPU cores + * + * constexpr auto func1 = [](const auto &...args) { return fmt::format(fmt::runtime("thread '{1}' scheduled task '{0}'!\n"), getThreadName(), args...); }; + * std::future<std::string> result = poolIO.execute<"customTaskName">(func1, getThreadName()); // N.B. the calling thread is owner of the std::future + * + * // execute a task with a name, a priority and single-core affinity (here: 2) + * poolIO.execute<"task name", 20U, 2>([]() { fmt::print("Hello World from custom thread '{}'!\n", getThreadName()); }); + * + * try { + * poolIO.execute<"customName", 20U, 3>([]() { [..] this potentially long-running task is trackable via it's 'customName' thread name [..] }); + * } catch (const std::invalid_argument &e) { + * fmt::print("caught exception: {}\n", e.what()); + * } + * @endcode + */ +template +class BasicThreadPool { + using Task = thread_pool::detail::Task; + using TaskQueue = thread_pool::detail::TaskQueue; + static std::atomic _globalPoolId; + static std::atomic _taskID; + static std::string generateName() { return fmt::format("BasicThreadPool#{}", _globalPoolId.fetch_add(1)); } + + std::atomic _initialised = ATOMIC_FLAG_INIT; + bool _shutdown = false; + + std::condition_variable _condition; + std::atomic _numTaskedQueued = 0U; // cache for _taskQueue.size() + std::atomic _numTasksRunning = 0U; + TaskQueue _taskQueue; + TaskQueue _recycledTasks; + + std::mutex _threadListMutex; + std::atomic _numThreads = 0U; + std::list _threads; + + std::vector _affinityMask; + thread::Policy _schedulingPolicy = thread::Policy::OTHER; + int _schedulingPriority = 0; + + const std::string _poolName; + const uint32_t _minThreads; + const uint32_t _maxThreads; + +public: + std::chrono::microseconds sleepDuration = std::chrono::milliseconds(1); + std::chrono::milliseconds keepAliveDuration = std::chrono::seconds(10); + + BasicThreadPool(const std::string_view &name = generateName(), uint32_t min = std::thread::hardware_concurrency(), uint32_t max = std::thread::hardware_concurrency()) + : _poolName(name), _minThreads(min), _maxThreads(max) { + assert(min > 0 && "minimum number of threads must be > 0"); + assert(min <= max && "minimum number of threads must be <= maximum number of threads"); + for (uint32_t i = 0; i < _minThreads; ++i) { + createWorkerThread(); + } + } + + ~BasicThreadPool() { + _shutdown = true; + _condition.notify_all(); + for (auto &t : _threads) { + t.join(); + } + } + + BasicThreadPool(const BasicThreadPool &) = delete; + BasicThreadPool(BasicThreadPool &&) = delete; + BasicThreadPool &operator=(const BasicThreadPool &) = delete; + BasicThreadPool &operator=(BasicThreadPool &&) = delete; + + [[nodiscard]] std::string poolName() const noexcept { return _poolName; } + [[nodiscard]] uint32_t minThreads() const noexcept { return _minThreads; }; + [[nodiscard]] uint32_t maxThreads() const noexcept { return _maxThreads; }; + + [[nodiscard]] std::size_t numThreads() const noexcept { return std::atomic_load_explicit(&_numThreads, std::memory_order_acquire); } + [[nodiscard]] std::size_t numTasksRunning() const noexcept { return std::atomic_load_explicit(&_numTasksRunning, std::memory_order_acquire); } + [[nodiscard]] std::size_t numTasksQueued() const { return std::atomic_load_explicit(&_numTaskedQueued, std::memory_order_acquire); } + [[nodiscard]] std::size_t numTasksRecycled() const { return _recycledTasks.size(); } + [[nodiscard]] bool isInitialised() const { return _initialised.load(std::memory_order::acquire); } + void waitUntilInitialised() const { _initialised.wait(false); } + void requestShutdown() { + _shutdown = true; + _condition.notify_all(); + for (auto &t: _threads) { + t.join(); + } + } + [[nodiscard]] bool isShutdown() const { return _shutdown; } + + // + + [[nodiscard]] std::vector getAffinityMask() const { return _affinityMask; } + + void setAffinityMask(const std::vector &threadAffinityMask) { + _affinityMask.clear(); + std::copy(threadAffinityMask.begin(), threadAffinityMask.end(), std::back_inserter(_affinityMask)); + cleanupFinishedThreads(); + updateThreadConstraints(); + } + + [[nodiscard]] auto getSchedulingPolicy() const { return _schedulingPolicy; } + + [[nodiscard]] auto getSchedulingPriority() const { return _schedulingPriority; } + + void setThreadSchedulingPolicy(const thread::Policy schedulingPolicy = thread::Policy::OTHER, const int schedulingPriority = 0) { + _schedulingPolicy = schedulingPolicy; + _schedulingPriority = schedulingPriority; + cleanupFinishedThreads(); + updateThreadConstraints(); + } + + // TODO: Do we need support for cancellation? + template> + requires(std::is_same_v) void execute(Callable &&func, Args &&...args) { + static thread_local gr::SpinWait spinWait; + if constexpr (cpuID >= 0) { + if (cpuID >= _affinityMask.size() || (cpuID >= 0 && !_affinityMask[cpuID])) { + throw std::invalid_argument(fmt::format("requested cpuID {} incompatible with set affinity mask({}): [{}]", + cpuID, _affinityMask.size(), fmt::join(_affinityMask.begin(), _affinityMask.end(), ", "))); + } + } + _numTaskedQueued.fetch_add(1U); + + _taskQueue.push(createTask(std::forward(func), std::forward(args)...)); + _condition.notify_one(); + if constexpr (taskType == TaskType::IO_BOUND) { + spinWait.spinOnce(); + spinWait.spinOnce(); + while (_taskQueue.size() > 0) { + if (const auto nThreads = numThreads(); nThreads <= numTasksRunning() && nThreads <= _maxThreads) { + createWorkerThread(); + } + _condition.notify_one(); + spinWait.spinOnce(); + spinWait.spinOnce(); + } + spinWait.reset(); + } + } + + template> + requires(!std::is_same_v) + [[nodiscard]] std::future execute(Callable &&func, Args &&...funcArgs) { + if constexpr (cpuID >= 0) { + if (cpuID >= _affinityMask.size() || (cpuID >= 0 && !_affinityMask[cpuID])) { + throw std::invalid_argument(fmt::format("cpuID {} is out of range [0,{}] or incompatible with set affinity mask [{}]", + cpuID, _affinityMask.size(), _affinityMask)); + } + } + std::promise promise; + auto result = promise.get_future(); + auto lambda = [promise = std::move(promise), func = std::forward(func), ... args = std::forward(funcArgs)]() mutable { + try { + promise.set_value(func(args...)); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }; + execute(std::move(lambda)); + return result; + } + +private: + void cleanupFinishedThreads() { + std::scoped_lock lock(_threadListMutex); + // TODO: + // (C++Ref) A thread that has finished executing code, but has not yet been + // joined is still considered an active thread of execution and is + // therefore joinable. + // std::erase_if(_threads, [](auto &thread) { return !thread.joinable(); }); + } + + void updateThreadConstraints() { + std::scoped_lock lock(_threadListMutex); + // std::erase_if(_threads, [](auto &thread) { return !thread.joinable(); }); + + std::for_each(_threads.begin(), _threads.end(), [this, threadID = std::size_t{ 0 }](auto &thread) mutable { this->updateThreadConstraints(threadID++, thread); }); + } + + void updateThreadConstraints(const std::size_t threadID, std::thread &thread) const { + thread::setThreadName(fmt::format("{}#{}", _poolName, threadID), thread); + thread::setThreadSchedulingParameter(_schedulingPolicy, _schedulingPriority, thread); + if (!_affinityMask.empty()) { + if (taskType == TaskType::IO_BOUND) { + thread::setThreadAffinity(_affinityMask); + return; + } + const std::vector affinityMask = distributeThreadAffinityAcrossCores(_affinityMask, threadID); + std::cout << fmt::format("{}#{} affinity mask: {}", _poolName, threadID, fmt::join(affinityMask.begin(), affinityMask.end(), ",")) << std::endl; + thread::setThreadAffinity(affinityMask); + } + } + + std::vector distributeThreadAffinityAcrossCores(const std::vector &globalAffinityMask, const std::size_t threadID) const { + if (globalAffinityMask.empty()) { + return {}; + } + std::vector affinityMask; + std::size_t coreCount = 0; + for (bool value : globalAffinityMask) { + if (value) { + affinityMask.push_back(coreCount++ % _minThreads == threadID); + } else { + affinityMask.push_back(false); + } + } + return affinityMask; + } + + void createWorkerThread() { + std::scoped_lock lock(_threadListMutex); + const std::size_t nThreads = numThreads(); + std::thread &thread = _threads.emplace_back(&BasicThreadPool::worker, this); + updateThreadConstraints(nThreads + 1, thread); + } + + template + auto createTask(Callable &&func, Args &&...funcArgs) { + const auto getTask = [&recycledTasks = _recycledTasks](Callable &&f, Args &&...args) { + auto extracted = recycledTasks.pop(); + if (extracted.empty()) { + if constexpr (sizeof...(Args) == 0) { + extracted.push_front(Task{ .id = _taskID.fetch_add(1U) + 1U, .func = std::move(f) }); + } else { + extracted.push_front(Task{ .id = _taskID.fetch_add(1U) + 1U, .func = std::move(std::bind_front(std::forward(f), std::forward(args)...)) }); + } + } else { + auto &task = extracted.front(); + task.id = _taskID.fetch_add(1U) + 1U; + if constexpr (sizeof...(Args) == 0) { + task.func = std::move(f); + } else { + task.func = std::move(std::bind_front(std::forward(f), std::forward(args)...)); + } + } + return extracted; + }; + + auto taskContainer = getTask(std::forward(func), std::forward(funcArgs)...); + auto &task = taskContainer.front(); + + if constexpr (!taskName.empty()) { + task.name = taskName.c_str(); + } + task.priority = priority; + task.cpuID = cpuID; + + return taskContainer; + } + + TaskQueue::TaskContainer popTask() { + auto result = _taskQueue.pop(); + if (!result.empty()) { + _numTaskedQueued.fetch_sub(1U); + } + return result; + } + + void worker() { + constexpr uint32_t N_SPIN = 1 << 8; + uint32_t noop_counter = 0; + const auto threadID = _numThreads.fetch_add(1); + std::mutex mutex; + std::unique_lock lock(mutex); + auto lastUsed = std::chrono::steady_clock::now(); + auto timeDiffSinceLastUsed = std::chrono::steady_clock::now() - lastUsed; + if (numThreads() >= _minThreads) { + std::atomic_store_explicit(&_initialised, true, std::memory_order_release); + _initialised.notify_all(); + } + _numThreads.notify_one(); + bool running = true; + do { + TaskQueue::TaskContainer currentTaskContainer = popTask(); + if (!currentTaskContainer.empty()) { + assert(!currentTaskContainer.empty()); + auto ¤tTask = currentTaskContainer.front(); + _numTasksRunning.fetch_add(1); + bool nameSet = !(currentTask.name.empty()); + if (nameSet) { + thread::setThreadName(currentTask.name); + } + currentTask.func(); + // execute dependent children + currentTask.reset(); + _recycledTasks.push(std::move(currentTaskContainer)); + _numTasksRunning.fetch_sub(1); + if (nameSet) { + thread::setThreadName(fmt::format("{}#{}", _poolName, threadID)); + } + lastUsed = std::chrono::steady_clock::now(); + noop_counter = 0; + } else if (++noop_counter > N_SPIN) [[unlikely]] { + // perform some thread maintenance tasks before going to sleep + noop_counter = noop_counter / 2; + cleanupFinishedThreads(); + + _condition.wait_for(lock, keepAliveDuration, [this] { return numTasksQueued() > 0 || isShutdown(); }); + } + // check if this thread is to be kept + timeDiffSinceLastUsed = std::chrono::steady_clock::now() - lastUsed; + if (isShutdown()) { + auto nThread = _numThreads.fetch_sub(1); + _numThreads.notify_all(); + if (nThread == 1) { // cleanup last thread + _recycledTasks.clear(); + _taskQueue.clear(); + } + running = false; + } else if (timeDiffSinceLastUsed > keepAliveDuration) { // decrease to the minimum of _minThreads in a thread safe way + unsigned long nThreads = numThreads(); + while(nThreads > minThreads()) { // compare and swap loop + if (_numThreads.compare_exchange_weak(nThreads, nThreads - 1, std::memory_order_acq_rel)) { + _numThreads.notify_all(); + if (nThreads == 1) { // cleanup last thread + _recycledTasks.clear(); + _taskQueue.clear(); + } + running = false; + break; + } + } + } + } while (running); + } +}; +template +inline std::atomic BasicThreadPool::_globalPoolId = 0U; +template +inline std::atomic BasicThreadPool::_taskID = 0U; +static_assert(ThreadPool>); +static_assert(ThreadPool>); + +} + +#endif // THREADPOOL_HPP diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e3a5cf12..0500cc66 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -17,3 +17,5 @@ add_ut_test(qa_filter) add_ut_test(qa_settings) add_ut_test(qa_tags) add_ut_test(qa_scheduler) +add_ut_test(qa_thread_pool) +add_ut_test(qa_thread_affinity) diff --git a/test/qa_thread_affinity.cpp b/test/qa_thread_affinity.cpp new file mode 100644 index 00000000..a8be86be --- /dev/null +++ b/test/qa_thread_affinity.cpp @@ -0,0 +1,174 @@ +#include +#include +#include + +#if defined(__clang__) && __clang_major__ >= 16 +// clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection +template<> +auto boost::ut::cfg = boost::ut::runner>{}; +#endif + +const boost::ut::suite ThreadAffinityTests = [] { + using namespace boost::ut; + + "thread_exception"_test = [] { + expect(nothrow([]{fair::thread_pool::thread::thread_exception();})); + expect(fair::thread_pool::thread::thread_exception().name() == "thread_exception"_b); + expect(fair::thread_pool::thread::thread_exception().message(-1) == "unknown threading error code -1"_b); + expect(fair::thread_pool::thread::thread_exception().message(-2) == "unknown threading error code -2"_b); + expect(!fair::thread_pool::thread::thread_exception().message(fair::thread_pool::thread::THREAD_UNINITIALISED).starts_with("unknown threading error code")); + expect(!fair::thread_pool::thread::thread_exception().message(fair::thread_pool::thread::THREAD_ERROR_UNKNOWN).starts_with("unknown threading error code")); + expect(!fair::thread_pool::thread::thread_exception().message(fair::thread_pool::thread::THREAD_VALUE_RANGE).starts_with("unknown threading error code")); + expect(!fair::thread_pool::thread::thread_exception().message(fair::thread_pool::thread::THREAD_ERANGE).starts_with("unknown threading error code")); + }; + + "thread_helper"_test = [] { +#if not defined(__EMSCRIPTEN__) + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(SCHED_FIFO) == fair::thread_pool::thread::Policy::FIFO); + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(SCHED_RR) == fair::thread_pool::thread::Policy::ROUND_ROBIN); + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(SCHED_OTHER) == fair::thread_pool::thread::Policy::OTHER); +#endif + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(-1) == fair::thread_pool::thread::Policy::UNKNOWN); + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(-2) == fair::thread_pool::thread::Policy::UNKNOWN); + }; + +#if not defined(__EMSCRIPTEN__) + "basic thread affinity"_test = [] { + using namespace fair::thread_pool; + std::atomic run = true; + const auto dummyAction = [&run]() { while (run) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } }; + std::thread testThread(dummyAction); + + constexpr std::array threadMap = { true, false, false, false }; + thread::setThreadAffinity(threadMap, testThread); + auto affinity = thread::getThreadAffinity(testThread); + bool equal = true; + for (size_t i = 0; i < std::min(threadMap.size(), affinity.size()); i++) { + if (threadMap[i] != affinity[i]) { + equal = false; + } + } + expect(equal) << fmt::format("set {{{}}} affinity map does not match get {{{}}} map", fmt::join(threadMap, ", "), fmt::join(affinity, ", ")); + + // tests w/o thread argument + constexpr std::array threadMapOn = { true, true }; + thread::setThreadAffinity(threadMapOn); + affinity = thread::getThreadAffinity(); + for (size_t i = 0; i < std::min(threadMapOn.size(), affinity.size()); i++) { + if (threadMapOn[i] != affinity[i]) { + equal = false; + } + } + expect(equal) << fmt::format("set {{{}}} affinity map does not match get {{{}}} map", fmt::join(threadMap, ", "), fmt::join(affinity, ", ")); + + std::thread bogusThread; + expect(throws([&]{ thread::getThreadAffinity(bogusThread); })); + expect(throws([&]{ thread::setThreadAffinity(threadMapOn, bogusThread); })); + + run = false; + testThread.join(); + }; + + "basic process affinity"_test = [] { + using namespace fair::thread_pool; + constexpr std::array threadMap = { true, false, false, false }; + thread::setProcessAffinity(threadMap); + auto affinity = thread::getProcessAffinity(); + bool equal = true; + for (size_t i = 0; i < std::min(threadMap.size(), affinity.size()); i++) { + if (threadMap[i] != affinity[i]) { + equal = false; + } + } + expect(equal) << fmt::format("set {{{}}} affinity map does not match get {{{}}} map", fmt::join(threadMap, ", "), fmt::join(affinity, ", ")); + constexpr std::array threadMapOn = { true, true, true, true }; + thread::setProcessAffinity(threadMapOn); + expect(throws([&]{ thread::getProcessAffinity(-1); })); + expect(throws([&]{ thread::setProcessAffinity(threadMapOn, -1); })); + }; + + "ThreadName"_test = [] { + using namespace fair::thread_pool; + expect(!thread::getThreadName().empty()) << "Thread name shouldn't be empty"; + expect(nothrow([]{ thread::setThreadName("testCoreName"); })); + expect(thread::getThreadName() == "testCoreName"_b); + + std::atomic run = true; + const auto dummyAction = [&run]() { while (run) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); } }; + std::thread testThread(dummyAction); + expect(!thread::getThreadName(testThread).empty()) << "Thread Name shouldn't be empty"; + expect(nothrow([&]{ thread::setThreadName("testThreadName", testThread); })); + thread::setThreadName("testThreadName", testThread); + expect(thread::getThreadName(testThread) == "testThreadName"_b); + + std::thread uninitialisedTestThread; + expect(throws([&]{ thread::getThreadName(uninitialisedTestThread); })); + expect(throws([&]{ thread::setThreadName("name", uninitialisedTestThread); })); + run = false; + testThread.join(); + }; + + "ProcessName"_test = [] { + using namespace fair::thread_pool; + expect(!thread::getProcessName().empty()) << "Process name shouldn't be empty"; + expect(that % thread::getProcessName() == thread::getProcessName(thread::detail::getPid())); + + expect(nothrow([]{ thread::setProcessName("TestProcessName"); })); + expect(thread::getProcessName() == "TestProcessName"_b); + }; + + "ProcessSchedulingParameter"_test = [] { + using namespace fair::thread_pool::thread; + struct SchedulingParameter param = getProcessSchedulingParameter(); + expect(that % param.policy == OTHER); + expect(that % param.priority == 0); + + expect(nothrow([]{ setProcessSchedulingParameter(OTHER, 0); })); + expect(throws([]{ setProcessSchedulingParameter(OTHER, 0, -1); })); + expect(throws([]{ setProcessSchedulingParameter(OTHER, 4); })); + expect(throws([]{ setProcessSchedulingParameter(ROUND_ROBIN, 5); })); // missing rights -- because most users do not have CAP_SYS_NICE rights by default -- hard to unit-test + param = getProcessSchedulingParameter(); + expect(that % param.policy == OTHER); + expect(that % param.priority == 0); + + expect(throws([]{ getProcessSchedulingParameter(-1); })); + expect(throws([]{ setProcessSchedulingParameter(ROUND_ROBIN, 5, -1); })); + + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(SCHED_FIFO) == fair::thread_pool::thread::FIFO); + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(SCHED_RR) == fair::thread_pool::thread::ROUND_ROBIN); + expect(that % fair::thread_pool::thread::detail::getEnumPolicy(SCHED_OTHER) == fair::thread_pool::thread::OTHER); + }; + + "ThreadSchedulingParameter"_test = [] { + std::atomic run = true; + const auto dummyAction = [&run]() { while (run) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } }; + std::thread testThread(dummyAction); + std::thread bogusThread; + + using namespace fair::thread_pool::thread; + struct SchedulingParameter param = getThreadSchedulingParameter(testThread); + expect(that % param.policy == OTHER); + expect(that % param.priority == 0); + + setThreadSchedulingParameter(OTHER, 0, testThread); + setThreadSchedulingParameter(OTHER, 0); + expect(throws([&]{ setThreadSchedulingParameter(OTHER, 0, bogusThread); })); + expect(throws([&]{ setThreadSchedulingParameter(OTHER, 4, testThread); })); + expect(throws([&]{ setThreadSchedulingParameter(OTHER, 4); })); + expect(throws([&]{ setThreadSchedulingParameter(ROUND_ROBIN, 5, testThread); })); // missing rights -- because most users do not have CAP_SYS_NICE rights by default -- hard to unit-test + expect(throws([&]{ setThreadSchedulingParameter(ROUND_ROBIN, 5); })); // missing rights -- because most users do not have CAP_SYS_NICE rights by default -- hard to unit-test + param = getThreadSchedulingParameter(testThread); + expect(that % param.policy == OTHER); + + expect(throws([&]{ getThreadSchedulingParameter(bogusThread); })); + expect(throws([&]{ setThreadSchedulingParameter(ROUND_ROBIN, 5, bogusThread); })); + + run = false; + testThread.join(); + }; +#endif +}; + +int +main() { /* tests are statically executed */ +} diff --git a/test/qa_thread_pool.cpp b/test/qa_thread_pool.cpp new file mode 100644 index 00000000..9f5258a9 --- /dev/null +++ b/test/qa_thread_pool.cpp @@ -0,0 +1,134 @@ +#include +#include + +#if defined(__clang__) && __clang_major__ >= 16 +// clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection +template<> +auto boost::ut::cfg = boost::ut::runner>{}; +#endif + +const boost::ut::suite ThreadPoolTests = [] { + using namespace boost::ut; + + "Basic ThreadPool tests"_test = [] { + expect(nothrow([] { fair::thread_pool::BasicThreadPool(); })); + expect(nothrow([] { fair::thread_pool::BasicThreadPool(); })); + + std::atomic enqueueCount{0}; + std::atomic executeCount{0}; + fair::thread_pool::BasicThreadPool pool("TestPool", 1, 2); + expect(nothrow([&] { pool.sleepDuration = std::chrono::milliseconds(1); })); + expect(nothrow([&] { pool.keepAliveDuration = std::chrono::seconds(10); })); + pool.waitUntilInitialised(); + expect(that % pool.isInitialised()); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + expect(pool.poolName() == "TestPool"); + expect(pool.minThreads() == 1_u); + expect(pool.maxThreads() == 2_u); + expect(pool.numThreads() == 1_u); + expect(pool.numTasksRunning() == 0_u); + expect(pool.numTasksQueued() == 0_u); + expect(pool.numTasksRecycled() == 0_u); + pool.execute([&enqueueCount] { + ++enqueueCount; + enqueueCount.notify_all(); + }); + enqueueCount.wait(0); + expect(pool.numThreads() == 1_u); + pool.execute([&executeCount] { + ++executeCount; + executeCount.notify_all(); + }); + executeCount.wait(0); + expect(pool.numThreads() >= 1_u); + expect(enqueueCount.load() == 1_i); + expect(executeCount.load() == 1_i); + + auto ret = pool.execute([] { return 42; }); + expect(ret.get() == 42_i); + + auto taskName = pool.execute<"taskName", 0, -1>([] { return fair::thread_pool::thread::getThreadName(); }); +#ifdef __EMSCRIPTEN__ + expect(taskName.get() == "unknown thread name"_b); +#else + expect(taskName.get() == "taskName"_b); +#endif + + expect(nothrow([&] { pool.setAffinityMask(pool.getAffinityMask()); })); + expect(nothrow( + [&] { pool.setThreadSchedulingPolicy(pool.getSchedulingPolicy(), pool.getSchedulingPriority()); })); + }; + "contention tests"_test = [] { + std::atomic counter{0}; + fair::thread_pool::BasicThreadPool pool("contention", 1, 4); + pool.waitUntilInitialised(); + expect(that % pool.isInitialised()); + expect(pool.numThreads() == 1_u); + pool.execute([&counter] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::atomic_fetch_add(&counter, 1); + counter.notify_all(); + }); + expect(pool.numThreads() == 1_u); + pool.execute([&counter] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::atomic_fetch_add(&counter, 1); + counter.notify_all(); + }); + expect(pool.numThreads() >= 1_u); + counter.wait(0); + counter.wait(1); + expect(counter.load() == 2_i); + }; + + "ThreadPool: Thread count tests"_test = [] { + struct bounds_def { + std::uint32_t min, max; + }; + std::array bounds{ + bounds_def{1, 1}, + bounds_def{1, 4}, + bounds_def{2, 2}, + bounds_def{2, 8}, + bounds_def{4, 8} + }; + + for (const auto [minThreads, maxThreads]: bounds) { + for (const auto taskCount: {2, 8, 32}) { + fmt::print("## Test with min={} and max={} and taskCount={}\n", minThreads, maxThreads, taskCount); + std::atomic counter{0}; + + // Pool with min and max thread count + fair::thread_pool::BasicThreadPool pool("count_test", minThreads, maxThreads); + pool.keepAliveDuration = std::chrono::milliseconds(10); // default is 10 seconds, reducing for testing + pool.waitUntilInitialised(); + + for (int i = 0; i < taskCount; ++i) { + pool.execute([&counter] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::atomic_fetch_add(&counter, 1); + counter.notify_all(); + }); + } + expect(that % pool.numThreads() >= minThreads); + // the maximum number of threads is not a hard limit, if there is a burst of execute calls, it will spwawn more than maxThreads trheads. + //expect(that % pool.numThreads() == std::min(std::uint32_t(taskCount), maxThreads)); + + for (int i = 0; i < taskCount; ++i) { + counter.wait(i); + expect(that % pool.numThreads() >= minThreads); + // expect(that % pool.numThreads() <= maxThreads); // not a hard limit + } + + // We should have gotten back to minimum + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + expect(that % pool.numThreads() == minThreads); + expect(that % counter.load() == taskCount); + } + } + }; +}; + +int +main() { /* tests are statically executed */ +}