Merge pull request #1235 from msimberg/stdexec-sanitizers
      Update stdexec integration with `transfer` renames and `ensure_started`/`start_detached` from pika
msimberg committed Sep 12, 2024
2 parents 42b27fa + 2d56c14 commit 3d903bf
Showing 55 changed files with 560 additions and 415 deletions.
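Note on the renames applied throughout the diff below: this PR updates every usage of the old `transfer` family of adaptors to their current stdexec spellings. A minimal sketch of the mapping, assuming the pika/execution.hpp header, a pika version that provides ex::continues_on (as this PR does), and the ex = pika::execution::experimental alias used by the examples in this repository; the function name rename_examples is illustrative only:

#include <pika/execution.hpp>

#include <utility>

namespace ex = pika::execution::experimental;

auto rename_examples()
{
    ex::thread_pool_scheduler sched{};

    // ex::transfer(sched)                ->  ex::continues_on(sched)
    auto a = ex::just(1) | ex::continues_on(sched) | ex::then([](int x) { return x + 1; });

    // ex::transfer_just(sched, 2)        ->  ex::just(2) | ex::continues_on(sched)
    auto b = ex::just(2) | ex::continues_on(sched);

    // ex::transfer_just(sched)           ->  ex::schedule(sched)
    auto c = ex::schedule(sched);

    // ex::transfer_when_all(sched, ...)  ->  ex::when_all(...) | ex::continues_on(sched)
    return ex::when_all(std::move(a), std::move(b), std::move(c)) | ex::continues_on(sched) |
        ex::then([](int x, int y) { return x + y; });
}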
9 changes: 9 additions & 0 deletions cmake/pika_add_config_test.cmake
@@ -481,3 +481,12 @@ function(pika_check_for_stdexec_sender_receiver_concepts)
FILE ${ARGN}
)
endfunction()

# ##################################################################################################
function(pika_check_for_stdexec_continues_on)
pika_add_config_test(
PIKA_WITH_STDEXEC_CONTINUES_ON
SOURCE cmake/tests/stdexec_continues_on.cpp
FILE ${ARGN}
)
endfunction()
14 changes: 13 additions & 1 deletion cmake/pika_add_executable.cmake
@@ -7,7 +7,15 @@

function(pika_add_executable name)
# retrieve arguments
set(options GPU EXCLUDE_FROM_ALL EXCLUDE_FROM_DEFAULT_BUILD INTERNAL_FLAGS NOLIBS UNITY_BUILD)
set(options
GPU
EXCLUDE_FROM_ALL
EXCLUDE_FROM_DEFAULT_BUILD
INTERNAL_FLAGS
NOLIBS
NOPCH
UNITY_BUILD
)
set(one_value_args
INI
FOLDER
@@ -157,6 +165,10 @@ function(pika_add_executable name)
set(_target_flags ${_target_flags} NOLIBS)
endif()

if(${${name}_NOPCH})
set(_target_flags ${_target_flags} NOPCH)
endif()

if(${name}_INTERNAL_FLAGS)
set(_target_flags ${_target_flags} INTERNAL_FLAGS)
endif()
5 changes: 5 additions & 0 deletions cmake/pika_add_library.cmake
@@ -12,6 +12,7 @@ function(pika_add_library name)
INTERNAL_FLAGS
NOLIBS
NOEXPORT
NOPCH
OBJECT
NONAMEPREFIX
UNITY_BUILD
@@ -180,6 +181,10 @@ function(pika_add_library name)
set(_target_flags ${_target_flags} EXPORT)
endif()

if(${${name}_NOPCH})
set(_target_flags ${_target_flags} NOPCH)
endif()

if(${name}_INTERNAL_FLAGS)
set(_target_flags ${_target_flags} INTERNAL_FLAGS)
endif()
2 changes: 2 additions & 0 deletions cmake/pika_perform_cxx_feature_tests.cmake
@@ -55,4 +55,6 @@ function(pika_perform_cxx_feature_tests)
pika_check_for_stdexec_sender_receiver_concepts(
DEFINITIONS PIKA_HAVE_STDEXEC_SENDER_RECEIVER_CONCEPTS
)

pika_check_for_stdexec_continues_on(DEFINITIONS PIKA_HAVE_STDEXEC_CONTINUES_ON)
endfunction()
10 changes: 6 additions & 4 deletions cmake/pika_setup_target.cmake
@@ -21,6 +21,7 @@ function(pika_setup_target target)
INTERNAL_FLAGS
NOLIBS
NONAMEPREFIX
NOPCH
NOTLLKEYWORD
UNITY_BUILD
)
@@ -115,10 +116,11 @@ function(pika_setup_target target)

if(NOT target_NOLIBS)
target_link_libraries(${target} ${__tll_public} pika::pika)
if(PIKA_WITH_PRECOMPILED_HEADERS_INTERNAL)
if("${_type}" STREQUAL "EXECUTABLE")
target_precompile_headers(${target} REUSE_FROM pika_exe_precompiled_headers)
endif()
if(PIKA_WITH_PRECOMPILED_HEADERS_INTERNAL
AND "${_type}" STREQUAL "EXECUTABLE"
AND NOT target_NOPCH
)
target_precompile_headers(${target} REUSE_FROM pika_exe_precompiled_headers)
endif()
endif()

15 changes: 15 additions & 0 deletions cmake/tests/stdexec_continues_on.cpp
@@ -0,0 +1,15 @@
// Copyright (c) 2024 ETH Zurich
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <stdexec/execution.hpp>

int main()
{
// Earlier versions of stdexec call continues_on continue_on. If stdexec has continues_on we do
// nothing special in pika. If stdexec doesn't have continues_on, we assume it has continue_on
// and create an alias from continue_on to continues_on. This test serves to check if continues_on is defined.
using stdexec::continues_on;
}
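The alias strategy described in the comment above could be implemented roughly as follows. This is a hypothetical sketch only (the compat namespace and its placement are assumptions, not pika's actual header); PIKA_HAVE_STDEXEC_CONTINUES_ON is the definition produced by the new config test:

#include <stdexec/execution.hpp>

namespace compat {
#if defined(PIKA_HAVE_STDEXEC_CONTINUES_ON)
    // Recent stdexec already provides continues_on; re-export it as-is.
    using stdexec::continues_on;
#else
    // Older stdexec only provides continue_on; expose it under the new name.
    inline constexpr auto continues_on = stdexec::continue_on;
#endif
}    // namespace compat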
2 changes: 1 addition & 1 deletion examples/1d_stencil/1d_stencil_2.cpp
@@ -95,7 +95,7 @@ struct stepper
{
next[i] =
ex::when_all(current[idx(i, -1, nx)], current[i], current[idx(i, +1, nx)]) |
ex::transfer(sched) | ex::then(Op) | ex::split();
ex::continues_on(sched) | ex::then(Op) | ex::split();
}
}

6 changes: 3 additions & 3 deletions examples/documentation/hello_world_documentation.cpp
@@ -29,7 +29,7 @@ int main(int argc, char* argv[])
ex::thread_pool_scheduler sched{};

// We can schedule work using sched.
auto snd1 = ex::transfer_just(sched, 42) | ex::then([](int x) {
auto snd1 = ex::just(42) | ex::continues_on(sched) | ex::then([](int x) {
fmt::print(std::cout, "Hello from a pika user-level thread (with id {})!\nx = {}\n",
pika::this_thread::get_id(), x);
});
@@ -39,9 +39,9 @@

// We can build arbitrary graphs of work using the split and when_all adaptors.
auto snd2 = ex::just(3.14) | ex::split();
auto snd3 = ex::transfer(snd2, sched) |
auto snd3 = ex::continues_on(snd2, sched) |
ex::then([](double pi) { fmt::print(std::cout, "Is this pi: {}?\n", pi); });
auto snd4 = ex::transfer_when_all(sched, std::move(snd2), ex::just(500.3)) |
auto snd4 = ex::when_all(std::move(snd2), ex::just(500.3)) | ex::continues_on(sched) |
ex::then([](double pi, double r) { return pi * r * r; });
auto result = tt::sync_wait(ex::when_all(std::move(snd3), std::move(snd4)));
fmt::print(std::cout, "The result is {}\n", result);
15 changes: 8 additions & 7 deletions examples/documentation/split_tuple_documentation.cpp
@@ -28,13 +28,14 @@ int main(int argc, char* argv[])
ex::then([](int x) { return std::tuple(x, x * x); }) | ex::split_tuple();

// snd and snd_squared will be ready at the same time, but can be used independently
auto snd_print =
std::move(snd) | ex::transfer(sched) | ex::then([](int x) { fmt::print("x is {}\n", x); });
auto snd_process = std::move(snd_squared) | ex::transfer(sched) | ex::then([](int x_squared) {
fmt::print("Performing expensive operations on x * x\n");
std::this_thread::sleep_for(std::chrono::milliseconds(300));
return x_squared / 2;
});
auto snd_print = std::move(snd) | ex::continues_on(sched) |
ex::then([](int x) { fmt::print("x is {}\n", x); });
auto snd_process =
std::move(snd_squared) | ex::continues_on(sched) | ex::then([](int x_squared) {
fmt::print("Performing expensive operations on x * x\n");
std::this_thread::sleep_for(std::chrono::milliseconds(300));
return x_squared / 2;
});

auto x_squared_processed =
tt::sync_wait(ex::when_all(std::move(snd_print), std::move(snd_process)));
4 changes: 2 additions & 2 deletions examples/documentation/when_all_vector_documentation.cpp
@@ -33,7 +33,7 @@ int main(int argc, char* argv[])
snds.reserve(n);
for (std::size_t i = 0; i < n; ++i)
{
snds.push_back(ex::transfer_just(sched, i) | ex::then(calculate));
snds.push_back(ex::just(i) | ex::continues_on(sched) | ex::then(calculate));
}
auto snds_print =
ex::when_all_vector(std::move(snds)) | ex::then([](std::vector<std::size_t> results) {
@@ -47,7 +47,7 @@
snds_nothing.reserve(n);
for (std::size_t i = 0; i < n; ++i)
{
snds_nothing.push_back(ex::transfer_just(sched, i) |
snds_nothing.push_back(ex::just(i) | ex::continues_on(sched) |
ex::then([](auto i) { fmt::print("{}: {}\n", i, calculate(i)); }));
}
auto snds_nothing_done = ex::when_all_vector(std::move(snds_nothing)) |
2 changes: 1 addition & 1 deletion examples/jacobi_smp/jacobi_nonuniform_pika.cpp
@@ -91,7 +91,7 @@ namespace jacobi_smp {
for (std::size_t dep : deps) { trigger.push_back((*deps_src)[dep]); }

(*deps_dst)[block] = ex::when_all_vector(std::move(trigger)) |
ex::transfer(ex::thread_pool_scheduler{}) |
ex::continues_on(ex::thread_pool_scheduler{}) |
ex::then(pika::util::detail::bind_front(jacobi_kernel_wrap, block_ranges[block],
std::cref(A), std::ref(*dst), std::cref(*src), std::cref(b))) |
ex::split();
2 changes: 1 addition & 1 deletion examples/jacobi_smp/jacobi_pika.cpp
@@ -64,7 +64,7 @@ namespace jacobi_smp {
if (j + 1 < n_block) trigger.push_back((*deps_old)[j + 1]);

(*deps_new)[j] = ex::when_all_vector(std::move(trigger)) |
ex::transfer(ex::thread_pool_scheduler{}) |
ex::continues_on(ex::thread_pool_scheduler{}) |
ex::then(pika::util::detail::bind_front(jacobi_kernel_wrap, range(y, y_end), n,
std::ref(*grid_new), std::cref(*grid_old))) |
ex::split();
26 changes: 13 additions & 13 deletions examples/quickstart/fibonacci.cpp
@@ -42,10 +42,10 @@ ex::unique_any_sender<std::uint64_t> fibonacci_sender_one(std::uint64_t n)
// asynchronously launch the calculation of one of the sub-terms
// attach a continuation to this sender which is called asynchronously on
// its completion and which calculates the other sub-term
return ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) |
return ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) |
ex::let_value([n](std::uint64_t res) {
return ex::when_all(fibonacci_sender_one(n - 2), ex::just(res)) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(add);
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(add);
});
}

@@ -58,7 +58,7 @@ std::uint64_t fibonacci(std::uint64_t n)

// asynchronously launch the creation of one of the sub-terms of the
// execution graph
auto s = ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) | ex::then(fibonacci);
auto s = ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(fibonacci);
std::uint64_t r = fibonacci(n - 2);

return tt::sync_wait(std::move(s)) + r;
@@ -73,12 +73,12 @@ ex::unique_any_sender<std::uint64_t> fibonacci_sender(std::uint64_t n)

// asynchronously launch the creation of one of the sub-terms of the
// execution graph
auto s =
ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) | ex::let_value(fibonacci_sender);
auto s = ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) |
ex::let_value(fibonacci_sender);
auto r = fibonacci_sender(n - 2);

return ex::when_all(std::move(s), std::move(r)) | ex::transfer(ex::thread_pool_scheduler{}) |
ex::then(add);
return ex::when_all(std::move(s), std::move(r)) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(add);
}

/////////////////////////////////////////////////////////////////////////////
@@ -89,14 +89,14 @@ ex::unique_any_sender<std::uint64_t> fibonacci_sender_all(std::uint64_t n)
if (n < threshold) return ex::just(fibonacci_serial(n));

// asynchronously launch the calculation of both of the sub-terms
auto s =
ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) | ex::let_value(fibonacci_sender_all);
auto r =
ex::transfer_just(ex::thread_pool_scheduler{}, n - 2) | ex::let_value(fibonacci_sender_all);
auto s = ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) |
ex::let_value(fibonacci_sender_all);
auto r = ex::just(n - 2) | ex::continues_on(ex::thread_pool_scheduler{}) |
ex::let_value(fibonacci_sender_all);

// create a sender representing the successful calculation of both sub-terms
return ex::when_all(std::move(s), std::move(r)) | ex::transfer(ex::thread_pool_scheduler{}) |
ex::then(add);
return ex::when_all(std::move(s), std::move(r)) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(add);
}

///////////////////////////////////////////////////////////////////////////////
2 changes: 1 addition & 1 deletion examples/quickstart/latch_example.cpp
@@ -35,7 +35,7 @@ int pika_main(pika::program_options::variables_map& vm)
std::vector<ex::unique_any_sender<>> results;
for (std::ptrdiff_t i = 0; i != num_threads; ++i)
{
results.emplace_back(ex::transfer_just(ex::thread_pool_scheduler{}, std::ref(l)) |
results.emplace_back(ex::just(std::ref(l)) | ex::continues_on(ex::thread_pool_scheduler{}) |
ex::then(wait_for_latch) | ex::ensure_started());
}

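The latch example above also uses ex::ensure_started(), which the commit title indicates is now taken from pika rather than stdexec. A brief usage sketch, assuming a running pika runtime and the ex/tt aliases and headers used by the examples in this PR; the function name eager_example and the lambda body are illustrative:

#include <pika/execution.hpp>

#include <utility>

namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;

void eager_example()
{
    // ensure_started eagerly starts the work; the returned sender refers to the
    // already-running operation.
    auto started = ex::schedule(ex::thread_pool_scheduler{}) |
        ex::then([] { /* work that should begin immediately */ }) | ex::ensure_started();

    // ... other work can run here while the eager sender executes ...

    // Wait for the eagerly started work to complete.
    tt::sync_wait(std::move(started));
}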
6 changes: 4 additions & 2 deletions examples/quickstart/pipeline1.cpp
@@ -29,12 +29,14 @@ struct pipeline
std::vector<ex::unique_any_sender<>> tasks;
for (auto s : input)
{
auto sender = ex::transfer_just(ex::thread_pool_scheduler{}, "Error.*", std::move(s)) |
auto sender = ex::just("Error.*", std::move(s)) |
ex::continues_on(ex::thread_pool_scheduler{}) |
ex::let_value([](std::string re, std::string item) -> ex::unique_any_sender<> {
std::regex regex(std::move(re));
if (std::regex_match(item, regex))
{
return ex::transfer_just(ex::thread_pool_scheduler{}, std::move(item)) |
return ex::just(std::move(item)) |
ex::continues_on(ex::thread_pool_scheduler{}) |
ex::then([](std::string s) {
return pika::detail::trim_copy(std::move(s));
}) |
@@ -90,12 +90,11 @@ void matrixMultiply(sMatrixSize& matrix_size, std::size_t device, std::size_t it
whip::malloc(&d_C, size_C * sizeof(T));

// copy A and B to device
auto copies_done = ex::when_all(ex::transfer_just(cuda_sched, d_A, h_A.data(),
size_A * sizeof(T), whip::memcpy_host_to_device) |
cu::then_with_stream(whip::memcpy_async),
ex::transfer_just(
cuda_sched, d_B, h_B.data(), size_B * sizeof(T), whip::memcpy_host_to_device) |
cu::then_with_stream(whip::memcpy_async));
auto copies_done =
ex::when_all(ex::just(d_A, h_A.data(), size_A * sizeof(T), whip::memcpy_host_to_device) |
ex::continues_on(cuda_sched) | cu::then_with_stream(whip::memcpy_async),
ex::just(d_B, h_B.data(), size_B * sizeof(T), whip::memcpy_host_to_device) |
ex::continues_on(cuda_sched) | cu::then_with_stream(whip::memcpy_async));

// print something when copies complete
tt::sync_wait(std::move(copies_done) | ex::then([] {
42 changes: 21 additions & 21 deletions libs/pika/async_cuda/tests/performance/synchronize.cu
@@ -137,24 +137,24 @@ int pika_main(pika::program_options::variables_map& vm)
// then_with_stream calls to force synchronization between the
// kernel launches.
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::transfer(sched) | cu::then_with_stream(f));
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f) |
ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) |
ex::continues_on(sched) | cu::then_with_stream(f));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
@@ -176,7 +176,7 @@ int pika_main(pika::program_options::variables_map& vm)
for (std::size_t i = 0; i != iterations; ++i)
{
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
ex::continues_on(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout << "then_with_stream with transfer: " << elapsed
@@ -198,13 +198,13 @@ int pika_main(pika::program_options::variables_map& vm)
cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
ex::continues_on(ex::thread_pool_scheduler{}));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
ex::continues_on(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout << "then_with_stream with transfer batched: " << elapsed
6 changes: 3 additions & 3 deletions libs/pika/async_cuda/tests/unit/cublas_matmul.cpp
@@ -165,7 +165,7 @@ void matrixMultiply(pika::cuda::experimental::cuda_scheduler& cuda_sched, sMatri
pika::chrono::detail::high_resolution_timer t1;
//
std::cout << "calling CUBLAS...\n";
auto gemm = ex::transfer_just(cuda_sched) |
auto gemm = ex::schedule(cuda_sched) |
cu::then_with_cublas(
[&](cublasHandle_t handle) {
cu::check_cublas_error(cublasSgemm(handle, CUBLAS_OP_N, CUBLAS_OP_N,
@@ -199,7 +199,7 @@ void matrixMultiply(pika::cuda::experimental::cuda_scheduler& cuda_sched, sMatri
ex::unique_any_sender<> gemms_finished = ex::just();
for (std::size_t j = 0; j < iterations; j++)
{
gemms_finished = std::move(gemms_finished) | ex::transfer(cuda_sched) |
gemms_finished = std::move(gemms_finished) | ex::continues_on(cuda_sched) |
cu::then_with_cublas(
[&](cublasHandle_t handle) {
cu::check_cublas_error(cublasSgemm(handle, CUBLAS_OP_N, CUBLAS_OP_N,
@@ -226,7 +226,7 @@ void matrixMultiply(pika::cuda::experimental::cuda_scheduler& cuda_sched, sMatri
});

// when the matrix operations complete, copy the result to the host
auto copy_finished = std::move(gemms_finished_split) | ex::transfer(cuda_sched) |
auto copy_finished = std::move(gemms_finished_split) | ex::continues_on(cuda_sched) |
cu::then_with_stream(pika::util::detail::bind_front(whip::memcpy_async, h_CUBLAS.data(),
d_C, size_C * sizeof(T), whip::memcpy_device_to_host));

