Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a bypass scheduler to improve mpi continuations #1195

Open
wants to merge 40 commits into
base: main
Choose a base branch
from

Conversation

biddisco
Copy link
Contributor

I would like this to go through testing ASAP, so I'm pushing it, but it can be can be considered a draft PR for now
Relies on #1151 and #1180

Here, we add a stdexec style scheduler that executes a task/function - it behaves like the main pika scheduling loop - by takinf the function, wrapping it in a task, then switching context to it immediately, so that if the task later tries to suspend, then there are no problems and the task goes ontoo the queues as usual.

It is an error to call this on a pika thread and is only supported by certain modes of the mpi continuations

Copy link

codacy-production bot commented Jun 27, 2024

Coverage summary from Codacy

See diff coverage on Codacy

Coverage variation Diff coverage
+0.45% (target: -1.00%) 82.81% (target: 90.00%)
Coverage variation details
Coverable lines Covered lines Coverage
Common ancestor commit (1e052bc) 18376 13721 74.67%
Head commit (7867f22) 18360 (-16) 13791 (+70) 75.11% (+0.45%)

Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch: <coverage of head commit> - <coverage of common ancestor commit>

Diff coverage details
Coverable lines Covered lines Diff coverage
Pull request (#1195) 64 53 82.81%

Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified: <covered lines added or modified>/<coverable lines added or modified> * 100%

See your quality gate settings    Change summary preferences

Codacy stopped sending the deprecated coverage status on June 5th, 2024. Learn more

Copy link
Contributor

@msimberg msimberg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few initial comments. This adds a lot of complexity and I'll need a bit more time to look at this.

@@ -32,6 +32,8 @@
#include <type_traits>
#include <utility>

#define PIKA_MPI_ENABLE_EARLY_POLL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a proper CMake configuration option, or a runtime configuration option?

handler_method h = get_handler_method(mode);
/// these task modes always trigger continuations on a pika task and can be safely inlined
return (h == handler_method::yield_while) ||
(h == handler_method::suspend_resume) | (h == handler_method::new_task);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(h == handler_method::suspend_resume) | (h == handler_method::new_task);
(h == handler_method::suspend_resume) || (h == handler_method::new_task);

@@ -77,29 +79,47 @@ namespace pika::mpi::experimental {
execution::thread_priority::boost :
execution::thread_priority::normal;

auto dgb = []() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short for degub? Could you guard this with PIKA_DEBUG, otherwise it'll be an unused variable in release builds (with corresponding warnings)? And perhaps give it a slightly longer name?

static print_threshold<Level, bplevel> bps_deb("SBYPASS");
} // namespace pika::debug::detail

namespace pika { namespace execution { namespace experimental {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
namespace pika { namespace execution { namespace experimental {
namespace pika::execution::detail {

if we're not intending to make this a user-facing scheduler (I think I'd prefer that anyway) in the short term?

// TODO: Can we simply dispatch to the default implementation? This is
// disabled with the P2300 reference implementation because we don't
// want to use implementation details of it.
#if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed in March to be PIKA_HAVE_STDEXEC.

Comment on lines 111 to 110
throw std::runtime_error("Bypass scheduler - already on task thread");
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw std::runtime_error("Bypass scheduler - already on task thread");
return;
PIKA_THROW_EXCEPTION(pika::error::invalid_status, "thread_pool_scheduler_queue_bypass::execute", "Already on a pika task");
return;

(with appropriate line wrapping)

if (!threaddata->set_state_tagged(thread_schedule_state::active, task_state,
active_state, std::memory_order_relaxed))
{
throw std::runtime_error("Thread state cannot fail here");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw std::runtime_error("Thread state cannot fail here");
PIKA_THROW_EXCEPTION(pika::error::no_success, "thread_pool_scheduler_queue_bypass::execute", "Setting thread state failed");

thread_state(task_return.first, task_state.state_ex(), task_state.tag() + 1);

// if the threaddata->state still matches active_state update to new state
// (could be stolen/changed by another thread here?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it? I don't think it could be stolen if it's not in any queue...

case thread_schedule_state::pending:
{
data.scheduler_base->schedule_thread(id, thread_schedule_hint(thread_num),
false /*allow_fallback*/, execution::thread_priority::high);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should this be high priority?

// thread sits in thread map until resumed
return;
}
default: throw std::runtime_error("fix this thread state type");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update all of the plain throws to use PIKA_THROW_EXCEPTION with some appropriate error (no_success if nothing else).

break;
case thread_schedule_state::suspended:
{
std::cout << "thread_schedule_state::suspended" << std::endl; //
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove or replace with our logging.

Comment on lines +118 to +117
thread_description desc(
f, (fallback_annotation != nullptr) ? fallback_annotation : "Bypass");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fallback annotation should be non-null. Maybe replace with an assert and

Suggested change
thread_description desc(
f, (fallback_annotation != nullptr) ? fallback_annotation : "Bypass");
thread_description desc(f, fallback_annotation);

?

Comment on lines +15 to +18
namespace pika::threads::detail {
class thread_data; // forward declaration only
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do a forward declaration here instead of including the header?

@@ -56,7 +56,7 @@
namespace pika::debug::detail {
// a debug level of zero disables messages with a level>0
// a debug level of N shows messages with level 1..N
constexpr int debug_level = 0;
constexpr int debug_level = 9;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be disabled?

namespace pika::debug::detail {
// a debug level of zero disables messages with a level>0
// a debug level of N shows messages with level 1..N
constexpr int bplevel = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
constexpr int bplevel = 0;
inline constexpr int bplevel = 0;

The existing mode uses suspend/resume which can be problematic
if threads suspend and block, but in principle we should directly
execute continuations as function calls which this new mode does
Callbacks to sender set_value/error should not be triggered directly
on a polling thread as continuations will be called on the polling
thread. Instead the execution should be tranferred to a pika
thread/task.

Adding a transfer adaptor costs task creation, plus enqueuing/dequeuing
of the task, so the bypass scheduler supports inline execution, but
with the creation of a task wrapper around the callback so that it is
safe to suspend/resume etc and no raw scheduler/polling threads are
running user code.
Bypass scheduler must never be run on top of an existing pika task,
otherwise coroutine self pointer is clobberred (thread local).

Rearrange transfers in transform_mpi to better route inline
continuations through the bypass scheduler and skip it when not needed
We do not need the RAII state restorer object as our thread creation is
always on the executing thread and certain race conditions are not valid

Improve comments and remove unsupported API features
The bypass needs to be added to the cuda polling, but this termporary
fix inserts and extra bypass before issuing mpi requests if the thread
id is invalid.

Also, if the bypass scheduler is invoked on a thread that is already a
task, it skips task creation and becomes a simple pass through
execution.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

2 participants