Skip to content

Commit

Permalink
ThreadPool: backport changes from graph-prototype
Browse files Browse the repository at this point in the history
Backport of changes from graph prototype. See original issue:
fair-acc/gnuradio4#67

Some noteworthy changes:
- emscripten compatibility: std::jthread -> std::thread, ranges usages
- fix race condition on thread shutdown
- fix #243 (starting and stopping threads in ThreadPool)
  • Loading branch information
wirew0rm committed Jun 1, 2023
1 parent ab3db10 commit b51498b
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 70 deletions.
2 changes: 1 addition & 1 deletion concepts/core/BasicThreadPool_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ int main() {

opencmw::BasicThreadPool<opencmw::CPU_BOUND> poolWork("CustomCpuPool", 1, 1); // pool for CPU-bound tasks with exactly 1 thread
opencmw::BasicThreadPool<opencmw::IO_BOUND> poolIO("CustomIOPool", 1, 1000); // pool for IO-bound (potentially blocking) tasks with at least 1 and a max of 1000 threads
poolIO.keepAliveDuration() = seconds(10); // keeps idling threads alive for 10 seconds
poolIO.keepAliveDuration = seconds(10); // keeps idling threads alive for 10 seconds
poolIO.waitUntilInitialised(); // wait until the pool is initialised (optional)
assert(poolIO.isInitialised()); // check if the pool is initialised

Expand Down
56 changes: 30 additions & 26 deletions src/core/include/ThreadAffinity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

#if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__))) // UNIX-style OS
#include <unistd.h>
#ifdef _POSIX_VERSION // POSIX compliant
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
#include <pthread.h>
#include <sched.h>
#endif
Expand Down Expand Up @@ -57,10 +57,14 @@ class thread_exception : public std::error_category {
};

template<class type>
#ifdef __EMSCRIPTEN__
concept thread_type = std::is_same_v<type, std::thread>;
#else
concept thread_type = std::is_same_v<type, std::thread> || std::is_same_v<type, std::jthread>;
#endif

namespace detail {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
template<typename Tp, typename... Us>
constexpr decltype(auto) firstElement(Tp && t, Us && ...) noexcept {
return std::forward<Tp>(t);
Expand All @@ -80,21 +84,21 @@ namespace detail {
}
char threadName[THREAD_MAX_NAME_LENGTH];
if (int rc = pthread_getname_np(handle, threadName, THREAD_MAX_NAME_LENGTH); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("getThreadName(thread_type)"));
throw std::system_error(rc, thread_exception(), "getThreadName(thread_type)");
}
return std::string{ threadName, std::min(strlen(threadName), THREAD_MAX_NAME_LENGTH) };
}

inline int getPid() { return getpid(); }
#else
int detail::getPid() {
int getPid() {
return 0;
}
#endif
} // namespace detail

inline std::string getProcessName(const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (std::ifstream in(fmt::format("/proc/{}/comm", pid), std::ios::in); in.is_open()) {
std::string fileContent;
std::getline(in, fileContent, '\n');
Expand All @@ -105,10 +109,10 @@ inline std::string getProcessName(const int pid = detail::getPid()) {
} // namespace detail

inline std::string getThreadName(thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadName(thread_type)"));
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), "getThreadName(thread_type)");
}
return detail::getThreadName(handle);
#else
Expand All @@ -117,7 +121,7 @@ inline std::string getThreadName(thread_type auto &...thread) {
}

inline void setProcessName(const std::string_view &processName, int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
std::ofstream out(fmt::format("/proc/{}/comm", pid), std::ios::out);
if (!out.is_open()) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessName({},{})", processName, pid));
Expand All @@ -128,7 +132,7 @@ inline void setProcessName(const std::string_view &processName, int pid = detail
}

inline void setThreadName(const std::string_view &threadName, thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadName({}, thread_type)", threadName, detail::getThreadName(handle)));
Expand All @@ -140,7 +144,7 @@ inline void setThreadName(const std::string_view &threadName, thread_type auto &
}

namespace detail {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
inline std::vector<bool> getAffinityMask(const cpu_set_t &cpuSet) {
std::vector<bool> bitMask(std::min(sizeof(cpu_set_t), static_cast<size_t>(std::thread::hardware_concurrency())));
for (size_t i = 0; i < bitMask.size(); i++) {
Expand All @@ -167,7 +171,7 @@ inline constexpr cpu_set_t getAffinityMask(const T &threadMap) {
#endif
} // namespace detail

#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
inline std::vector<bool> getThreadAffinity(thread_type auto &...thread) {
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
Expand All @@ -187,15 +191,15 @@ std::vector<bool> getThreadAffinity(thread_type auto &...) {

template<class T>
requires requires(T value) { value[0]; }
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
inline constexpr void setThreadAffinity(const T &threadMap, thread_type auto &...thread) {
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, thread_type)", threadMap.size(), fmt::join(threadMap, ", ")));
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, thread_type)", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", ")));
}
cpu_set_t cpuSet = detail::getAffinityMask(threadMap);
if (int rc = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuSet); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), detail::getThreadName(handle)));
throw std::system_error(rc, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), detail::getThreadName(handle)));
}
}
#else
Expand All @@ -205,7 +209,7 @@ constexpr bool setThreadAffinity(const T &threadMap, thread_type auto &...) {
#endif

inline std::vector<bool> getProcessAffinity(const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessAffinity({}) -- invalid pid", pid));
}
Expand All @@ -222,13 +226,13 @@ inline std::vector<bool> getProcessAffinity(const int pid = detail::getPid()) {
template<class T>
requires requires(T value) { std::get<0>(value); }
inline constexpr bool setProcessAffinity(const T &threadMap, const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), pid));
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid));
}
cpu_set_t cpuSet = detail::getAffinityMask(threadMap);
if (int rc = sched_setaffinity(pid, sizeof(cpu_set_t), &cpuSet); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), pid));
throw std::system_error(rc, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid));
}

return true;
Expand All @@ -251,7 +255,7 @@ struct SchedulingParameter {
namespace detail {
inline Policy getEnumPolicy(const int policy) {
switch (policy) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
case SCHED_FIFO: return Policy::FIFO;
case SCHED_RR: return Policy::ROUND_ROBIN;
case SCHED_OTHER: return Policy::OTHER;
Expand All @@ -263,7 +267,7 @@ inline Policy getEnumPolicy(const int policy) {
} // namespace detail

inline struct SchedulingParameter getProcessSchedulingParameter(const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessSchedulingParameter({}) -- invalid pid", pid));
}
Expand All @@ -274,12 +278,12 @@ inline struct SchedulingParameter getProcessSchedulingParameter(const int pid =
}
return SchedulingParameter{ .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority };
#else
return struct SchedulingParameter {};
return {};
#endif
}

inline void setProcessSchedulingParameter(Policy scheduler, int priority, const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessSchedulingParameter({}, {}, {}) -- invalid pid", scheduler, priority, pid));
}
Expand All @@ -298,7 +302,7 @@ inline void setProcessSchedulingParameter(Policy scheduler, int priority, const
}

inline struct SchedulingParameter getThreadSchedulingParameter(thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadSchedulingParameter(thread_type) -- invalid thread"));
Expand All @@ -308,14 +312,14 @@ inline struct SchedulingParameter getThreadSchedulingParameter(thread_type auto
if (int rc = pthread_getschedparam(handle, &policy, &param); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("getThreadSchedulingParameter({}) - sched_getparam error", detail::getThreadName(handle)));
}
return SchedulingParameter{ .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority };
return { .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority };
#else
return struct SchedulingParameter {};
return {};
#endif
}

inline void setThreadSchedulingParameter(Policy scheduler, int priority, thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadSchedulingParameter({}, {}, thread_type) -- invalid thread", scheduler, priority));
Expand Down
Loading

0 comments on commit b51498b

Please sign in to comment.