Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threadpool improvements #298

Merged
merged 3 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build_cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ jobs:
git clone https://github.com/emscripten-core/emsdk.git
cd emsdk
# Download and install the latest SDK tools.
./emsdk install 3.1.30 # latest = 3.1.32
./emsdk install releases-03ecb526947f6a3702a0d083083799fe410d3893-64bit
# Make the "latest" SDK "active" for the current user. (writes .emscripten file)
./emsdk activate 3.1.30
./emsdk activate releases-03ecb526947f6a3702a0d083083799fe410d3893-64bit
# Activate PATH and other environment variables in the current terminal
source ./emsdk_env.sh

Expand All @@ -105,7 +105,7 @@ jobs:
shell: bash
run: |
source ~/emsdk/emsdk_env.sh
emcmake cmake -S . -B ../build -DCMAKE_BUILD_TYPE=${{ matrix.cmake-build-type }} -DENABLE_COVERAGE=${{ matrix.configurations.name == env.REFERENCE_CONFIG && matrix.cmake-build-type == 'Debug'}}
emcmake cmake -S . -B ../build -DCMAKE_BUILD_TYPE=${{ matrix.cmake-build-type }} -DENABLE_TESTING=ON

- name: Build
if: matrix.configurations.compiler != 'emscripten'
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ if (ENABLE_TESTING)
enable_testing()
message("Building Tests.")
if (ENABLE_COVERAGE)
if (UNIX AND NOT APPLE) # Linux
if (UNIX AND NOT APPLE AND NOT EMSCRIPTEN) # Linux
message("Coverage reporting enabled")
include(cmake/CodeCoverage.cmake) # https://github.com/bilke/cmake-modules/blob/master/CodeCoverage.cmake
# (License: BSL-1.0)
Expand Down
2 changes: 1 addition & 1 deletion concepts/core/BasicThreadPool_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ int main() {

opencmw::BasicThreadPool<opencmw::CPU_BOUND> poolWork("CustomCpuPool", 1, 1); // pool for CPU-bound tasks with exactly 1 thread
opencmw::BasicThreadPool<opencmw::IO_BOUND> poolIO("CustomIOPool", 1, 1000); // pool for IO-bound (potentially blocking) tasks with at least 1 and a max of 1000 threads
poolIO.keepAliveDuration() = seconds(10); // keeps idling threads alive for 10 seconds
poolIO.keepAliveDuration = seconds(10); // keeps idling threads alive for 10 seconds
poolIO.waitUntilInitialised(); // wait until the pool is initialised (optional)
assert(poolIO.isInitialised()); // check if the pool is initialised

Expand Down
56 changes: 30 additions & 26 deletions src/core/include/ThreadAffinity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

#if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__))) // UNIX-style OS
#include <unistd.h>
#ifdef _POSIX_VERSION // POSIX compliant
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
#include <pthread.h>
#include <sched.h>
#endif
Expand Down Expand Up @@ -57,10 +57,14 @@ class thread_exception : public std::error_category {
};

template<class type>
#ifdef __EMSCRIPTEN__
concept thread_type = std::is_same_v<type, std::thread>;
#else
concept thread_type = std::is_same_v<type, std::thread> || std::is_same_v<type, std::jthread>;
#endif

namespace detail {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
template<typename Tp, typename... Us>
constexpr decltype(auto) firstElement(Tp && t, Us && ...) noexcept {
return std::forward<Tp>(t);
Expand All @@ -80,21 +84,21 @@ namespace detail {
}
char threadName[THREAD_MAX_NAME_LENGTH];
if (int rc = pthread_getname_np(handle, threadName, THREAD_MAX_NAME_LENGTH); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("getThreadName(thread_type)"));
throw std::system_error(rc, thread_exception(), "getThreadName(thread_type)");
}
return std::string{ threadName, std::min(strlen(threadName), THREAD_MAX_NAME_LENGTH) };
}

inline int getPid() { return getpid(); }
#else
int detail::getPid() {
int getPid() {
return 0;
}
#endif
} // namespace detail

inline std::string getProcessName(const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (std::ifstream in(fmt::format("/proc/{}/comm", pid), std::ios::in); in.is_open()) {
std::string fileContent;
std::getline(in, fileContent, '\n');
Expand All @@ -105,10 +109,10 @@ inline std::string getProcessName(const int pid = detail::getPid()) {
} // namespace detail

inline std::string getThreadName(thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadName(thread_type)"));
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), "getThreadName(thread_type)");
}
return detail::getThreadName(handle);
#else
Expand All @@ -117,7 +121,7 @@ inline std::string getThreadName(thread_type auto &...thread) {
}

inline void setProcessName(const std::string_view &processName, int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
std::ofstream out(fmt::format("/proc/{}/comm", pid), std::ios::out);
if (!out.is_open()) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessName({},{})", processName, pid));
Expand All @@ -128,7 +132,7 @@ inline void setProcessName(const std::string_view &processName, int pid = detail
}

inline void setThreadName(const std::string_view &threadName, thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadName({}, thread_type)", threadName, detail::getThreadName(handle)));
Expand All @@ -140,7 +144,7 @@ inline void setThreadName(const std::string_view &threadName, thread_type auto &
}

namespace detail {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
inline std::vector<bool> getAffinityMask(const cpu_set_t &cpuSet) {
std::vector<bool> bitMask(std::min(sizeof(cpu_set_t), static_cast<size_t>(std::thread::hardware_concurrency())));
for (size_t i = 0; i < bitMask.size(); i++) {
Expand All @@ -167,7 +171,7 @@ inline constexpr cpu_set_t getAffinityMask(const T &threadMap) {
#endif
} // namespace detail

#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
inline std::vector<bool> getThreadAffinity(thread_type auto &...thread) {
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
Expand All @@ -187,15 +191,15 @@ std::vector<bool> getThreadAffinity(thread_type auto &...) {

template<class T>
requires requires(T value) { value[0]; }
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
inline constexpr void setThreadAffinity(const T &threadMap, thread_type auto &...thread) {
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, thread_type)", threadMap.size(), fmt::join(threadMap, ", ")));
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, thread_type)", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", ")));
}
cpu_set_t cpuSet = detail::getAffinityMask(threadMap);
if (int rc = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuSet); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), detail::getThreadName(handle)));
throw std::system_error(rc, thread_exception(), fmt::format("setThreadAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), detail::getThreadName(handle)));
}
}
#else
Expand All @@ -205,7 +209,7 @@ constexpr bool setThreadAffinity(const T &threadMap, thread_type auto &...) {
#endif

inline std::vector<bool> getProcessAffinity(const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessAffinity({}) -- invalid pid", pid));
}
Expand All @@ -222,13 +226,13 @@ inline std::vector<bool> getProcessAffinity(const int pid = detail::getPid()) {
template<class T>
requires requires(T value) { std::get<0>(value); }
inline constexpr bool setProcessAffinity(const T &threadMap, const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), pid));
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid));
}
cpu_set_t cpuSet = detail::getAffinityMask(threadMap);
if (int rc = sched_setaffinity(pid, sizeof(cpu_set_t), &cpuSet); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap, ", "), pid));
throw std::system_error(rc, thread_exception(), fmt::format("setProcessAffinity(std::vector<bool, {}> = {{{}}}, {})", threadMap.size(), fmt::join(threadMap.begin(), threadMap.end(), ", "), pid));
}

return true;
Expand All @@ -251,7 +255,7 @@ struct SchedulingParameter {
namespace detail {
inline Policy getEnumPolicy(const int policy) {
switch (policy) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
case SCHED_FIFO: return Policy::FIFO;
case SCHED_RR: return Policy::ROUND_ROBIN;
case SCHED_OTHER: return Policy::OTHER;
Expand All @@ -263,7 +267,7 @@ inline Policy getEnumPolicy(const int policy) {
} // namespace detail

inline struct SchedulingParameter getProcessSchedulingParameter(const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getProcessSchedulingParameter({}) -- invalid pid", pid));
}
Expand All @@ -274,12 +278,12 @@ inline struct SchedulingParameter getProcessSchedulingParameter(const int pid =
}
return SchedulingParameter{ .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority };
#else
return struct SchedulingParameter {};
return {};
#endif
}

inline void setProcessSchedulingParameter(Policy scheduler, int priority, const int pid = detail::getPid()) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
if (pid <= 0) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setProcessSchedulingParameter({}, {}, {}) -- invalid pid", scheduler, priority, pid));
}
Expand All @@ -298,7 +302,7 @@ inline void setProcessSchedulingParameter(Policy scheduler, int priority, const
}

inline struct SchedulingParameter getThreadSchedulingParameter(thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("getThreadSchedulingParameter(thread_type) -- invalid thread"));
Expand All @@ -308,14 +312,14 @@ inline struct SchedulingParameter getThreadSchedulingParameter(thread_type auto
if (int rc = pthread_getschedparam(handle, &policy, &param); rc != 0) {
throw std::system_error(rc, thread_exception(), fmt::format("getThreadSchedulingParameter({}) - sched_getparam error", detail::getThreadName(handle)));
}
return SchedulingParameter{ .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority };
return { .policy = detail::getEnumPolicy(policy), .priority = param.sched_priority };
#else
return struct SchedulingParameter {};
return {};
#endif
}

inline void setThreadSchedulingParameter(Policy scheduler, int priority, thread_type auto &...thread) {
#ifdef _POSIX_VERSION
#if defined(_POSIX_VERSION) && not defined(__EMSCRIPTEN__)
const pthread_t handle = detail::getPosixHandler(thread...);
if (handle == 0U) {
throw std::system_error(THREAD_UNINITIALISED, thread_exception(), fmt::format("setThreadSchedulingParameter({}, {}, thread_type) -- invalid thread", scheduler, priority));
Expand Down
Loading