diff --git a/cmake/pika_add_config_test.cmake b/cmake/pika_add_config_test.cmake index 93a33baf9..533eeee22 100644 --- a/cmake/pika_add_config_test.cmake +++ b/cmake/pika_add_config_test.cmake @@ -481,3 +481,12 @@ function(pika_check_for_stdexec_sender_receiver_concepts) FILE ${ARGN} ) endfunction() + +# ################################################################################################## +function(pika_check_for_stdexec_continues_on) + pika_add_config_test( + PIKA_WITH_STDEXEC_CONTINUES_ON + SOURCE cmake/tests/stdexec_continues_on.cpp + FILE ${ARGN} + ) +endfunction() diff --git a/cmake/pika_add_executable.cmake b/cmake/pika_add_executable.cmake index 7cd0411e1..291a20f2e 100644 --- a/cmake/pika_add_executable.cmake +++ b/cmake/pika_add_executable.cmake @@ -7,7 +7,15 @@ function(pika_add_executable name) # retrieve arguments - set(options GPU EXCLUDE_FROM_ALL EXCLUDE_FROM_DEFAULT_BUILD INTERNAL_FLAGS NOLIBS UNITY_BUILD) + set(options + GPU + EXCLUDE_FROM_ALL + EXCLUDE_FROM_DEFAULT_BUILD + INTERNAL_FLAGS + NOLIBS + NOPCH + UNITY_BUILD + ) set(one_value_args INI FOLDER @@ -157,6 +165,10 @@ function(pika_add_executable name) set(_target_flags ${_target_flags} NOLIBS) endif() + if(${${name}_NOPCH}) + set(_target_flags ${_target_flags} NOPCH) + endif() + if(${name}_INTERNAL_FLAGS) set(_target_flags ${_target_flags} INTERNAL_FLAGS) endif() diff --git a/cmake/pika_add_library.cmake b/cmake/pika_add_library.cmake index 181376c35..fb17ffb27 100644 --- a/cmake/pika_add_library.cmake +++ b/cmake/pika_add_library.cmake @@ -12,6 +12,7 @@ function(pika_add_library name) INTERNAL_FLAGS NOLIBS NOEXPORT + NOPCH OBJECT NONAMEPREFIX UNITY_BUILD @@ -180,6 +181,10 @@ function(pika_add_library name) set(_target_flags ${_target_flags} EXPORT) endif() + if(${${name}_NOPCH}) + set(_target_flags ${_target_flags} NOPCH) + endif() + if(${name}_INTERNAL_FLAGS) set(_target_flags ${_target_flags} INTERNAL_FLAGS) endif() diff --git a/cmake/pika_perform_cxx_feature_tests.cmake b/cmake/pika_perform_cxx_feature_tests.cmake index e7a3754e0..61eed9975 100644 --- a/cmake/pika_perform_cxx_feature_tests.cmake +++ b/cmake/pika_perform_cxx_feature_tests.cmake @@ -55,4 +55,6 @@ function(pika_perform_cxx_feature_tests) pika_check_for_stdexec_sender_receiver_concepts( DEFINITIONS PIKA_HAVE_STDEXEC_SENDER_RECEIVER_CONCEPTS ) + + pika_check_for_stdexec_continues_on(DEFINITIONS PIKA_HAVE_STDEXEC_CONTINUES_ON) endfunction() diff --git a/cmake/pika_setup_target.cmake b/cmake/pika_setup_target.cmake index f07d86ee3..aa9b73e96 100644 --- a/cmake/pika_setup_target.cmake +++ b/cmake/pika_setup_target.cmake @@ -21,6 +21,7 @@ function(pika_setup_target target) INTERNAL_FLAGS NOLIBS NONAMEPREFIX + NOPCH NOTLLKEYWORD UNITY_BUILD ) @@ -115,10 +116,11 @@ function(pika_setup_target target) if(NOT target_NOLIBS) target_link_libraries(${target} ${__tll_public} pika::pika) - if(PIKA_WITH_PRECOMPILED_HEADERS_INTERNAL) - if("${_type}" STREQUAL "EXECUTABLE") - target_precompile_headers(${target} REUSE_FROM pika_exe_precompiled_headers) - endif() + if(PIKA_WITH_PRECOMPILED_HEADERS_INTERNAL + AND "${_type}" STREQUAL "EXECUTABLE" + AND NOT target_NOPCH + ) + target_precompile_headers(${target} REUSE_FROM pika_exe_precompiled_headers) endif() endif() diff --git a/cmake/tests/stdexec_continues_on.cpp b/cmake/tests/stdexec_continues_on.cpp new file mode 100644 index 000000000..d30e82879 --- /dev/null +++ b/cmake/tests/stdexec_continues_on.cpp @@ -0,0 +1,15 @@ +// Copyright (c) 2024 ETH Zurich +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +int main() +{ + // Earlier versions of stdexec call continues_on continue_on. If stdexec has continues_on we do + // nothing special in pika. If stdexec doesn't have continues_on, we assume it has continue_on + // and create an alias from continue_on to continues_on. This test serves to check if continues_on is defined. + using stdexec::continues_on; +} diff --git a/examples/1d_stencil/1d_stencil_2.cpp b/examples/1d_stencil/1d_stencil_2.cpp index 5f0f35152..969becf2b 100644 --- a/examples/1d_stencil/1d_stencil_2.cpp +++ b/examples/1d_stencil/1d_stencil_2.cpp @@ -95,7 +95,7 @@ struct stepper { next[i] = ex::when_all(current[idx(i, -1, nx)], current[i], current[idx(i, +1, nx)]) | - ex::transfer(sched) | ex::then(Op) | ex::split(); + ex::continues_on(sched) | ex::then(Op) | ex::split(); } } diff --git a/examples/documentation/hello_world_documentation.cpp b/examples/documentation/hello_world_documentation.cpp index 0436d33be..338e0b55b 100644 --- a/examples/documentation/hello_world_documentation.cpp +++ b/examples/documentation/hello_world_documentation.cpp @@ -29,7 +29,7 @@ int main(int argc, char* argv[]) ex::thread_pool_scheduler sched{}; // We can schedule work using sched. - auto snd1 = ex::transfer_just(sched, 42) | ex::then([](int x) { + auto snd1 = ex::just(42) | ex::continues_on(sched) | ex::then([](int x) { fmt::print(std::cout, "Hello from a pika user-level thread (with id {})!\nx = {}\n", pika::this_thread::get_id(), x); }); @@ -39,9 +39,9 @@ int main(int argc, char* argv[]) // We can build arbitrary graphs of work using the split and when_all adaptors. auto snd2 = ex::just(3.14) | ex::split(); - auto snd3 = ex::transfer(snd2, sched) | + auto snd3 = ex::continues_on(snd2, sched) | ex::then([](double pi) { fmt::print(std::cout, "Is this pi: {}?\n", pi); }); - auto snd4 = ex::transfer_when_all(sched, std::move(snd2), ex::just(500.3)) | + auto snd4 = ex::when_all(std::move(snd2), ex::just(500.3)) | ex::continues_on(sched) | ex::then([](double pi, double r) { return pi * r * r; }); auto result = tt::sync_wait(ex::when_all(std::move(snd3), std::move(snd4))); fmt::print(std::cout, "The result is {}\n", result); diff --git a/examples/documentation/split_tuple_documentation.cpp b/examples/documentation/split_tuple_documentation.cpp index c7f9055ec..c663d062f 100644 --- a/examples/documentation/split_tuple_documentation.cpp +++ b/examples/documentation/split_tuple_documentation.cpp @@ -28,13 +28,14 @@ int main(int argc, char* argv[]) ex::then([](int x) { return std::tuple(x, x * x); }) | ex::split_tuple(); // snd and snd_squared will be ready at the same time, but can be used independently - auto snd_print = - std::move(snd) | ex::transfer(sched) | ex::then([](int x) { fmt::print("x is {}\n", x); }); - auto snd_process = std::move(snd_squared) | ex::transfer(sched) | ex::then([](int x_squared) { - fmt::print("Performing expensive operations on x * x\n"); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - return x_squared / 2; - }); + auto snd_print = std::move(snd) | ex::continues_on(sched) | + ex::then([](int x) { fmt::print("x is {}\n", x); }); + auto snd_process = + std::move(snd_squared) | ex::continues_on(sched) | ex::then([](int x_squared) { + fmt::print("Performing expensive operations on x * x\n"); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + return x_squared / 2; + }); auto x_squared_processed = tt::sync_wait(ex::when_all(std::move(snd_print), std::move(snd_process))); diff --git a/examples/documentation/when_all_vector_documentation.cpp b/examples/documentation/when_all_vector_documentation.cpp index 790188768..54b3f0756 100644 --- a/examples/documentation/when_all_vector_documentation.cpp +++ b/examples/documentation/when_all_vector_documentation.cpp @@ -33,7 +33,7 @@ int main(int argc, char* argv[]) snds.reserve(n); for (std::size_t i = 0; i < n; ++i) { - snds.push_back(ex::transfer_just(sched, i) | ex::then(calculate)); + snds.push_back(ex::just(i) | ex::continues_on(sched) | ex::then(calculate)); } auto snds_print = ex::when_all_vector(std::move(snds)) | ex::then([](std::vector results) { @@ -47,7 +47,7 @@ int main(int argc, char* argv[]) snds_nothing.reserve(n); for (std::size_t i = 0; i < n; ++i) { - snds_nothing.push_back(ex::transfer_just(sched, i) | + snds_nothing.push_back(ex::just(i) | ex::continues_on(sched) | ex::then([](auto i) { fmt::print("{}: {}\n", i, calculate(i)); })); } auto snds_nothing_done = ex::when_all_vector(std::move(snds_nothing)) | diff --git a/examples/jacobi_smp/jacobi_nonuniform_pika.cpp b/examples/jacobi_smp/jacobi_nonuniform_pika.cpp index 2c563aa18..23b06ee48 100644 --- a/examples/jacobi_smp/jacobi_nonuniform_pika.cpp +++ b/examples/jacobi_smp/jacobi_nonuniform_pika.cpp @@ -91,7 +91,7 @@ namespace jacobi_smp { for (std::size_t dep : deps) { trigger.push_back((*deps_src)[dep]); } (*deps_dst)[block] = ex::when_all_vector(std::move(trigger)) | - ex::transfer(ex::thread_pool_scheduler{}) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(pika::util::detail::bind_front(jacobi_kernel_wrap, block_ranges[block], std::cref(A), std::ref(*dst), std::cref(*src), std::cref(b))) | ex::split(); diff --git a/examples/jacobi_smp/jacobi_pika.cpp b/examples/jacobi_smp/jacobi_pika.cpp index b7c01c3ba..a1c9a0d43 100644 --- a/examples/jacobi_smp/jacobi_pika.cpp +++ b/examples/jacobi_smp/jacobi_pika.cpp @@ -64,7 +64,7 @@ namespace jacobi_smp { if (j + 1 < n_block) trigger.push_back((*deps_old)[j + 1]); (*deps_new)[j] = ex::when_all_vector(std::move(trigger)) | - ex::transfer(ex::thread_pool_scheduler{}) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(pika::util::detail::bind_front(jacobi_kernel_wrap, range(y, y_end), n, std::ref(*grid_new), std::cref(*grid_old))) | ex::split(); diff --git a/examples/quickstart/fibonacci.cpp b/examples/quickstart/fibonacci.cpp index df950be52..e3d619aa3 100644 --- a/examples/quickstart/fibonacci.cpp +++ b/examples/quickstart/fibonacci.cpp @@ -42,10 +42,10 @@ ex::unique_any_sender fibonacci_sender_one(std::uint64_t n) // asynchronously launch the calculation of one of the sub-terms // attach a continuation to this sender which is called asynchronously on // its completion and which calculates the other sub-term - return ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) | + return ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::let_value([n](std::uint64_t res) { return ex::when_all(fibonacci_sender_one(n - 2), ex::just(res)) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(add); + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(add); }); } @@ -58,7 +58,7 @@ std::uint64_t fibonacci(std::uint64_t n) // asynchronously launch the creation of one of the sub-terms of the // execution graph - auto s = ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) | ex::then(fibonacci); + auto s = ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(fibonacci); std::uint64_t r = fibonacci(n - 2); return tt::sync_wait(std::move(s)) + r; @@ -73,12 +73,12 @@ ex::unique_any_sender fibonacci_sender(std::uint64_t n) // asynchronously launch the creation of one of the sub-terms of the // execution graph - auto s = - ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) | ex::let_value(fibonacci_sender); + auto s = ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::let_value(fibonacci_sender); auto r = fibonacci_sender(n - 2); - return ex::when_all(std::move(s), std::move(r)) | ex::transfer(ex::thread_pool_scheduler{}) | - ex::then(add); + return ex::when_all(std::move(s), std::move(r)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(add); } ///////////////////////////////////////////////////////////////////////////// @@ -89,14 +89,14 @@ ex::unique_any_sender fibonacci_sender_all(std::uint64_t n) if (n < threshold) return ex::just(fibonacci_serial(n)); // asynchronously launch the calculation of both of the sub-terms - auto s = - ex::transfer_just(ex::thread_pool_scheduler{}, n - 1) | ex::let_value(fibonacci_sender_all); - auto r = - ex::transfer_just(ex::thread_pool_scheduler{}, n - 2) | ex::let_value(fibonacci_sender_all); + auto s = ex::just(n - 1) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::let_value(fibonacci_sender_all); + auto r = ex::just(n - 2) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::let_value(fibonacci_sender_all); // create a sender representing the successful calculation of both sub-terms - return ex::when_all(std::move(s), std::move(r)) | ex::transfer(ex::thread_pool_scheduler{}) | - ex::then(add); + return ex::when_all(std::move(s), std::move(r)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(add); } /////////////////////////////////////////////////////////////////////////////// diff --git a/examples/quickstart/latch_example.cpp b/examples/quickstart/latch_example.cpp index ad039620e..16b5c3f0a 100644 --- a/examples/quickstart/latch_example.cpp +++ b/examples/quickstart/latch_example.cpp @@ -35,7 +35,7 @@ int pika_main(pika::program_options::variables_map& vm) std::vector> results; for (std::ptrdiff_t i = 0; i != num_threads; ++i) { - results.emplace_back(ex::transfer_just(ex::thread_pool_scheduler{}, std::ref(l)) | + results.emplace_back(ex::just(std::ref(l)) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(wait_for_latch) | ex::ensure_started()); } diff --git a/examples/quickstart/pipeline1.cpp b/examples/quickstart/pipeline1.cpp index f0579a1d1..5f5f6b33a 100644 --- a/examples/quickstart/pipeline1.cpp +++ b/examples/quickstart/pipeline1.cpp @@ -29,12 +29,14 @@ struct pipeline std::vector> tasks; for (auto s : input) { - auto sender = ex::transfer_just(ex::thread_pool_scheduler{}, "Error.*", std::move(s)) | + auto sender = ex::just("Error.*", std::move(s)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::let_value([](std::string re, std::string item) -> ex::unique_any_sender<> { std::regex regex(std::move(re)); if (std::regex_match(item, regex)) { - return ex::transfer_just(ex::thread_pool_scheduler{}, std::move(item)) | + return ex::just(std::move(item)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then([](std::string s) { return pika::detail::trim_copy(std::move(s)); }) | diff --git a/libs/pika/async_cuda/tests/performance/cuda_scheduler_throughput.cpp b/libs/pika/async_cuda/tests/performance/cuda_scheduler_throughput.cpp index 9fe70b6f9..62c749835 100644 --- a/libs/pika/async_cuda/tests/performance/cuda_scheduler_throughput.cpp +++ b/libs/pika/async_cuda/tests/performance/cuda_scheduler_throughput.cpp @@ -90,12 +90,11 @@ void matrixMultiply(sMatrixSize& matrix_size, std::size_t device, std::size_t it whip::malloc(&d_C, size_C * sizeof(T)); // copy A and B to device - auto copies_done = ex::when_all(ex::transfer_just(cuda_sched, d_A, h_A.data(), - size_A * sizeof(T), whip::memcpy_host_to_device) | - cu::then_with_stream(whip::memcpy_async), - ex::transfer_just( - cuda_sched, d_B, h_B.data(), size_B * sizeof(T), whip::memcpy_host_to_device) | - cu::then_with_stream(whip::memcpy_async)); + auto copies_done = + ex::when_all(ex::just(d_A, h_A.data(), size_A * sizeof(T), whip::memcpy_host_to_device) | + ex::continues_on(cuda_sched) | cu::then_with_stream(whip::memcpy_async), + ex::just(d_B, h_B.data(), size_B * sizeof(T), whip::memcpy_host_to_device) | + ex::continues_on(cuda_sched) | cu::then_with_stream(whip::memcpy_async)); // print something when copies complete tt::sync_wait(std::move(copies_done) | ex::then([] { diff --git a/libs/pika/async_cuda/tests/performance/synchronize.cu b/libs/pika/async_cuda/tests/performance/synchronize.cu index 245db3a7f..f9890df5d 100644 --- a/libs/pika/async_cuda/tests/performance/synchronize.cu +++ b/libs/pika/async_cuda/tests/performance/synchronize.cu @@ -137,24 +137,24 @@ int pika_main(pika::program_options::variables_map& vm) // then_with_stream calls to force synchronization between the // kernel launches. tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(f_empty) | - ex::transfer(sched) | cu::then_with_stream(f)); + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(f_empty) | + ex::continues_on(sched) | cu::then_with_stream(f)); } // Do the remainder one-by-one for (std::size_t i = 0; i < non_batch_iterations; ++i) @@ -176,7 +176,7 @@ int pika_main(pika::program_options::variables_map& vm) for (std::size_t i = 0; i != iterations; ++i) { tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{})); + ex::continues_on(ex::thread_pool_scheduler{})); } double elapsed = timer.elapsed(); std::cout << "then_with_stream with transfer: " << elapsed @@ -198,13 +198,13 @@ int pika_main(pika::program_options::variables_map& vm) cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{})); + ex::continues_on(ex::thread_pool_scheduler{})); } // Do the remainder one-by-one for (std::size_t i = 0; i < non_batch_iterations; ++i) { tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{})); + ex::continues_on(ex::thread_pool_scheduler{})); } double elapsed = timer.elapsed(); std::cout << "then_with_stream with transfer batched: " << elapsed diff --git a/libs/pika/async_cuda/tests/unit/cublas_matmul.cpp b/libs/pika/async_cuda/tests/unit/cublas_matmul.cpp index 7698721df..0fcef4e49 100644 --- a/libs/pika/async_cuda/tests/unit/cublas_matmul.cpp +++ b/libs/pika/async_cuda/tests/unit/cublas_matmul.cpp @@ -165,7 +165,7 @@ void matrixMultiply(pika::cuda::experimental::cuda_scheduler& cuda_sched, sMatri pika::chrono::detail::high_resolution_timer t1; // std::cout << "calling CUBLAS...\n"; - auto gemm = ex::transfer_just(cuda_sched) | + auto gemm = ex::schedule(cuda_sched) | cu::then_with_cublas( [&](cublasHandle_t handle) { cu::check_cublas_error(cublasSgemm(handle, CUBLAS_OP_N, CUBLAS_OP_N, @@ -199,7 +199,7 @@ void matrixMultiply(pika::cuda::experimental::cuda_scheduler& cuda_sched, sMatri ex::unique_any_sender<> gemms_finished = ex::just(); for (std::size_t j = 0; j < iterations; j++) { - gemms_finished = std::move(gemms_finished) | ex::transfer(cuda_sched) | + gemms_finished = std::move(gemms_finished) | ex::continues_on(cuda_sched) | cu::then_with_cublas( [&](cublasHandle_t handle) { cu::check_cublas_error(cublasSgemm(handle, CUBLAS_OP_N, CUBLAS_OP_N, @@ -226,7 +226,7 @@ void matrixMultiply(pika::cuda::experimental::cuda_scheduler& cuda_sched, sMatri }); // when the matrix operations complete, copy the result to the host - auto copy_finished = std::move(gemms_finished_split) | ex::transfer(cuda_sched) | + auto copy_finished = std::move(gemms_finished_split) | ex::continues_on(cuda_sched) | cu::then_with_stream(pika::util::detail::bind_front(whip::memcpy_async, h_CUBLAS.data(), d_C, size_C * sizeof(T), whip::memcpy_device_to_host)); diff --git a/libs/pika/async_cuda/tests/unit/cuda_bulk.cu b/libs/pika/async_cuda/tests/unit/cuda_bulk.cu index 40a71c488..e2590bbf2 100644 --- a/libs/pika/async_cuda/tests/unit/cuda_bulk.cu +++ b/libs/pika/async_cuda/tests/unit/cuda_bulk.cu @@ -67,7 +67,7 @@ int pika_main() return p; }; - auto s = ex::transfer_just(sched, device_ptr, n) | cu::then_with_stream(malloc) | + auto s = ex::just(device_ptr, n) | ex::continues_on(sched) | cu::then_with_stream(malloc) | ex::bulk(n, f) | cu::then_with_stream(memcpy) | cu::then_with_stream(free); tt::sync_wait(std::move(s)); @@ -93,7 +93,7 @@ int pika_main() return p; }; - auto s = ex::transfer_just(sched, device_ptr, n) | cu::then_with_stream(malloc) | + auto s = ex::just(device_ptr, n) | ex::continues_on(sched) | cu::then_with_stream(malloc) | ex::bulk(pika::util::detail::make_counting_shape(n), f) | cu::then_with_stream(memcpy) | cu::then_with_stream(free); tt::sync_wait(std::move(s)); diff --git a/libs/pika/async_cuda/tests/unit/cuda_device_reset.cpp b/libs/pika/async_cuda/tests/unit/cuda_device_reset.cpp index 10025143c..fca21243f 100644 --- a/libs/pika/async_cuda/tests/unit/cuda_device_reset.cpp +++ b/libs/pika/async_cuda/tests/unit/cuda_device_reset.cpp @@ -46,7 +46,7 @@ int main(int argc, char* argv[]) for (std::size_t i = 0; i < n; ++i) { tt::sync_wait(ex::schedule(ex::thread_pool_scheduler{}) | - ex::transfer(cu::cuda_scheduler{pool}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_cublas([](cublasHandle_t) {}, CUBLAS_POINTER_MODE_HOST)); } @@ -61,7 +61,7 @@ int main(int argc, char* argv[]) for (std::size_t i = 0; i < n; ++i) { tt::sync_wait(ex::schedule(ex::thread_pool_scheduler{}) | - ex::transfer(cu::cuda_scheduler{pool}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_cusolver([](cusolverDnHandle_t) {})); } } diff --git a/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu b/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu index 7d64af657..11f103ec3 100644 --- a/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu +++ b/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu @@ -48,7 +48,7 @@ int pika_main() } { - auto s = ex::just() | ex::transfer(sched); + auto s = ex::just() | ex::continues_on(sched); CHECK_CUDA_COMPLETION_SCHEDULER(s); } @@ -81,7 +81,7 @@ int pika_main() // started). The thread pool is never accessed. auto s = ex::schedule(sched) | cu::then_with_cublas([](cublasHandle_t) {}, CUBLAS_POINTER_MODE_HOST) | - ex::transfer(ex::thread_pool_scheduler{nullptr}); + ex::continues_on(ex::thread_pool_scheduler{nullptr}); CHECK_NOT_CUDA_COMPLETION_SCHEDULER(s); #endif } diff --git a/libs/pika/async_cuda/tests/unit/cuda_sender.cu b/libs/pika/async_cuda/tests/unit/cuda_sender.cu index b569c98e6..47c4d769a 100644 --- a/libs/pika/async_cuda/tests/unit/cuda_sender.cu +++ b/libs/pika/async_cuda/tests/unit/cuda_sender.cu @@ -38,7 +38,7 @@ auto launch_saxpy_kernel(pika::cuda::experimental::cuda_scheduler& cuda_sched, S return ex::when_all(std::forward(predecessor), ex::just(reinterpret_cast(&saxpy), dim3(blocks), dim3(threads), args, std::size_t(0))) | - ex::transfer(cuda_sched) | cu::then_with_stream(whip::launch_kernel); + ex::continues_on(cuda_sched) | cu::then_with_stream(whip::launch_kernel); } template @@ -76,12 +76,10 @@ void test_saxpy(pika::cuda::experimental::cuda_scheduler& cuda_sched) // copy both arrays from cpu to gpu, putting both copies onto the stream // no need to get a future back yet - auto copy_A = - ex::transfer_just(cuda_sched, d_A, h_A, N * sizeof(float), whip::memcpy_host_to_device) | - cu::then_with_stream(whip::memcpy_async); - auto copy_B = - ex::transfer_just(cuda_sched, d_B, h_B, N * sizeof(float), whip::memcpy_host_to_device) | - cu::then_with_stream(whip::memcpy_async); + auto copy_A = ex::just(d_A, h_A, N * sizeof(float), whip::memcpy_host_to_device) | + ex::continues_on(cuda_sched) | cu::then_with_stream(whip::memcpy_async); + auto copy_B = ex::just(d_B, h_B, N * sizeof(float), whip::memcpy_host_to_device) | + ex::continues_on(cuda_sched) | cu::then_with_stream(whip::memcpy_async); unsigned int threads = 256; unsigned int blocks = (N + 255) / threads; @@ -97,7 +95,7 @@ void test_saxpy(pika::cuda::experimental::cuda_scheduler& cuda_sched) whip::memcpy_async, h_B, d_B, N * sizeof(float), whip::memcpy_device_to_host)) | // we can add a continuation to the memcpy sender, so that when the // memory copy completes, we can do new things ... - ex::transfer(ex::thread_pool_scheduler{}) | ex::then([&]() { + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then([&]() { std::cout << "saxpy completed on GPU, checking results in continuation" << std::endl; float max_error = 0.0f; for (int jdx = 0; jdx < N; jdx++) @@ -144,21 +142,21 @@ int pika_main(pika::program_options::variables_map& vm) // test kernel launch using then_with_stream float testf = 1.2345; std::cout << "schedule : cuda kernel : " << testf << std::endl; - tt::sync_wait( - ex::transfer_just(cuda_sched, testf) | cu::then_with_stream(&cuda_trivial_kernel)); + tt::sync_wait(ex::just(testf) | ex::continues_on(cuda_sched) | + cu::then_with_stream(&cuda_trivial_kernel)); // -------------------- // test kernel launch using apply and async double testd = 1.2345; std::cout << "schedule : cuda kernel : " << testd << std::endl; - tt::sync_wait( - ex::transfer_just(cuda_sched, testd) | cu::then_with_stream(&cuda_trivial_kernel)); + tt::sync_wait(ex::just(testd) | ex::continues_on(cuda_sched) | + cu::then_with_stream(&cuda_trivial_kernel)); // -------------------- // test adding a continuation to a cuda call double testd2 = 3.1415; std::cout << "then_with_stream/continuation : " << testd2 << std::endl; - tt::sync_wait(ex::transfer_just(cuda_sched, testd2) | + tt::sync_wait(ex::just(testd2) | ex::continues_on(cuda_sched) | cu::then_with_stream(&cuda_trivial_kernel) | cu::then_on_host([] { std::cout << "continuation triggered\n"; })); @@ -167,7 +165,7 @@ int pika_main(pika::program_options::variables_map& vm) // and adding a continuation with a copy of a copy std::cout << "Copying executor : " << testd2 + 1 << std::endl; auto cuda_sched_copy = cuda_sched; - tt::sync_wait(ex::transfer_just(cuda_sched_copy, testd2 + 1) | + tt::sync_wait(ex::just(testd2 + 1) | ex::continues_on(cuda_sched_copy) | cu::then_with_stream(&cuda_trivial_kernel) | cu::then_on_host([] { std::cout << "copy continuation triggered\n"; })); diff --git a/libs/pika/async_cuda/tests/unit/then_with_stream.cu b/libs/pika/async_cuda/tests/unit/then_with_stream.cu index f703a45b2..19e14a35d 100644 --- a/libs/pika/async_cuda/tests/unit/then_with_stream.cu +++ b/libs/pika/async_cuda/tests/unit/then_with_stream.cu @@ -309,10 +309,10 @@ int pika_main() { dummy::reset_counts(); auto s = - ex::just() | ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}); + ex::just() | ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}); // NOTE: then_with_stream calls triggers the receiver on a plain // std::thread. We explicitly change the context back to a pika::thread. - tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})); + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(1)); PIKA_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0)); @@ -323,10 +323,10 @@ int pika_main() { dummy::reset_counts(); - auto s = ex::just() | ex::transfer(cu::cuda_scheduler(pool)) | + auto s = ex::just() | ex::continues_on(cu::cuda_scheduler(pool)) | cu::then_with_stream(dummy{}) | cu::then_with_stream(dummy{}) | cu::then_with_stream(dummy{}); - tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})); + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(3)); PIKA_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0)); @@ -338,11 +338,11 @@ int pika_main() // Mixing stream transform with host scheduler { dummy::reset_counts(); - auto s = ex::just() | ex::transfer(cu::cuda_scheduler(pool)) | - cu::then_with_stream(dummy{}) | ex::transfer(ex::thread_pool_scheduler{}) | - ex::then(dummy{}) | ex::transfer(cu::cuda_scheduler(pool)) | + auto s = ex::just() | ex::continues_on(cu::cuda_scheduler(pool)) | + cu::then_with_stream(dummy{}) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::then(dummy{}) | ex::continues_on(cu::cuda_scheduler(pool)) | cu::then_with_stream(dummy{}); - tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})); + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(1)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(2)); PIKA_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0)); @@ -354,8 +354,8 @@ int pika_main() { dummy::reset_counts(); auto s = ex::schedule(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | - ex::transfer(cu::cuda_scheduler(pool)) | cu::then_with_stream(dummy{}) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(dummy{}); + ex::continues_on(cu::cuda_scheduler(pool)) | cu::then_with_stream(dummy{}) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}); tt::sync_wait(std::move(s)); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(2)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(1)); @@ -368,9 +368,10 @@ int pika_main() // Only stream transform with non-void values { dummy::reset_counts(); - auto s = - ex::just(1) | ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}); - PIKA_TEST_EQ(tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})), 2.0); + auto s = ex::just(1) | ex::continues_on(cu::cuda_scheduler{pool}) | + cu::then_with_stream(dummy{}); + PIKA_TEST_EQ( + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})), 2.0); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0)); @@ -381,10 +382,11 @@ int pika_main() { dummy::reset_counts(); - auto s = ex::just(1) | ex::transfer(cu::cuda_scheduler{pool}) | + auto s = ex::just(1) | ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | cu::then_with_stream(dummy{}) | cu::then_with_stream(dummy{}); - PIKA_TEST_EQ(tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})), 4.0); + PIKA_TEST_EQ( + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})), 4.0); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0)); @@ -396,26 +398,29 @@ int pika_main() // Non-copyable or non-default-constructible types { auto s = ex::just(custom_type_non_default_constructible{42}) | - ex::transfer(cu::cuda_scheduler{pool}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(&non_default_constructible_params); - PIKA_TEST_EQ(tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})).x, 42); + PIKA_TEST_EQ( + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})).x, 42); } { auto s = ex::just(custom_type_non_default_constructible_non_copyable{42}) | - ex::transfer(cu::cuda_scheduler{pool}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(&non_default_constructible_non_copyable_params); - PIKA_TEST_EQ(tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})).x, 42); + PIKA_TEST_EQ( + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})).x, 42); } // Mixing stream transform with host scheduler with non-void values { dummy::reset_counts(); - auto s = ex::just(1) | ex::transfer(cu::cuda_scheduler{pool}) | - cu::then_with_stream(dummy{}) | ex::transfer(ex::thread_pool_scheduler{}) | - ex::then(dummy{}) | ex::transfer(cu::cuda_scheduler{pool}) | + auto s = ex::just(1) | ex::continues_on(cu::cuda_scheduler{pool}) | + cu::then_with_stream(dummy{}) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::then(dummy{}) | ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}); - PIKA_TEST_EQ(tt::sync_wait(ex::transfer(std::move(s), ex::thread_pool_scheduler{})), 4.0); + PIKA_TEST_EQ( + tt::sync_wait(ex::continues_on(std::move(s), ex::thread_pool_scheduler{})), 4.0); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0)); @@ -426,9 +431,9 @@ int pika_main() { dummy::reset_counts(); - auto s = ex::just(1) | ex::transfer(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | - ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(dummy{}); + auto s = ex::just(1) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 4.0); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); PIKA_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0)); @@ -441,9 +446,9 @@ int pika_main() { dummy::reset_counts(); - auto s = ex::transfer_just(ex::thread_pool_scheduler{}, 1) | ex::then(dummy{}) | - ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | - cu::then_with_stream(dummy{}) | ex::transfer(ex::thread_pool_scheduler{}) | + auto s = ex::just(1) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | + cu::then_with_stream(dummy{}) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 5.0); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); @@ -464,7 +469,7 @@ int pika_main() [&](whip::stream_t stream) { PIKA_TEST_EQ(stream, first_stream); }) | cu::then_with_stream( [&](whip::stream_t stream) { PIKA_TEST_EQ(stream, first_stream); }) | - ex::transfer(cu::cuda_scheduler{pool}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream([&](whip::stream_t stream) { PIKA_TEST_NEQ(stream, first_stream); second_stream = stream; @@ -483,15 +488,16 @@ int pika_main() whip::malloc(&p, sizeof(type)); auto s = ex::just(p, &p_h, sizeof(type), whip::memcpy_host_to_device) | - ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(whip::memcpy_async) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then([p] { return p; }) | - ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(increment{}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(whip::memcpy_async) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then([p] { return p; }) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(increment{}) | cu::then_with_stream(increment{}) | cu::then_with_stream(increment{}); tt::sync_wait(ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)), ex::just(whip::memcpy_device_to_host)) | - ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(whip::memcpy_async) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then([&p_h] { PIKA_TEST_EQ(p_h, 3); }) | - ex::transfer(ex::thread_pool_scheduler{})); + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(whip::memcpy_async) | + ex::continues_on(ex::thread_pool_scheduler{}) | + ex::then([&p_h] { PIKA_TEST_EQ(p_h, 3); }) | + ex::continues_on(ex::thread_pool_scheduler{})); whip::free(p); } @@ -499,10 +505,10 @@ int pika_main() // cuBLAS and cuSOLVER { dummy::reset_counts(); - auto s = ex::just(1) | ex::transfer(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | - ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | + auto s = ex::just(1) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | cu::then_with_cublas(dummy{}, CUBLAS_POINTER_MODE_HOST) | - cu::then_with_cusolver(dummy{}) | ex::transfer(ex::thread_pool_scheduler{}) | + cu::then_with_cusolver(dummy{}) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 6); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); @@ -531,10 +537,10 @@ int pika_main() { dummy::reset_counts(); - auto s = ex::just(1) | ex::transfer(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | - ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | + auto s = ex::just(1) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}) | + ex::continues_on(cu::cuda_scheduler{pool}) | cu::then_with_stream(dummy{}) | cu::then_on_host(dummy{}) | cu::then_with_cublas(dummy{}, CUBLAS_POINTER_MODE_HOST) | - cu::then_with_cusolver(dummy{}) | ex::transfer(ex::thread_pool_scheduler{}) | + cu::then_with_cusolver(dummy{}) | ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(dummy{}); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 7.0); PIKA_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0)); diff --git a/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp index ef644d5ba..23a808c0a 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include diff --git a/libs/pika/async_mpi/include/pika/async_mpi/mpi_helpers.hpp b/libs/pika/async_mpi/include/pika/async_mpi/mpi_helpers.hpp index 0efc1889a..e6b1a6951 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/mpi_helpers.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/mpi_helpers.hpp @@ -13,12 +13,11 @@ #include #include #include +#include #include #include #include #include -#include -#include #include #include #include @@ -135,8 +134,8 @@ namespace pika::mpi::experimental::detail { execution::thread_priority p = use_priority_boost(op_state.mode_flags) ? execution::thread_priority::boost : execution::thread_priority::normal; - auto snd0 = ex::transfer_just(default_pool_scheduler(p)) | - ex::then([&op_state]() mutable { + auto snd0 = + ex::schedule(default_pool_scheduler(p)) | ex::then([&op_state]() mutable { PIKA_DETAIL_DP(mpi_tran<5>, debug(str<>("set_value"))); ex::set_value(PIKA_MOVE(op_state.receiver)); }); diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index 91d5a379e..653f7c4a2 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -15,11 +15,11 @@ #include #include #include +#include #include #include +#include #include -#include -#include #include #include #include @@ -48,10 +48,10 @@ namespace pika::mpi::experimental { PIKA_DETAIL_DP(mpi_tran<5>, debug(str<>("transform_mpi_t"), "tag_fallback_invoke")); using execution::thread_priority; + using pika::execution::experimental::continues_on; using pika::execution::experimental::just; using pika::execution::experimental::let_value; using pika::execution::experimental::transfer; - using pika::execution::experimental::transfer_just; using pika::execution::experimental::unique_any_sender; // get mpi completion mode settings @@ -82,9 +82,10 @@ namespace pika::mpi::experimental { { if (request == MPI_REQUEST_NULL) { - return transfer_just(default_pool_scheduler(p)); + return ex::schedule(default_pool_scheduler(p)); } - return transfer_just(default_pool_scheduler(p), request) | trigger_mpi(mode); + return just(request) | ex::continues_on(default_pool_scheduler(p)) | + trigger_mpi(mode); } if (request == MPI_REQUEST_NULL) { return just(); } return just(request) | trigger_mpi(mode); @@ -97,7 +98,7 @@ namespace pika::mpi::experimental { } else { - auto snd0 = PIKA_FORWARD(Sender, sender) | transfer(mpi_pool_scheduler(p)); + auto snd0 = PIKA_FORWARD(Sender, sender) | continues_on(mpi_pool_scheduler(p)); return dispatch_mpi_sender{PIKA_MOVE(snd0), PIKA_FORWARD(F, f)} | let_value(completion_snd); } diff --git a/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp index 393364b87..1be177e77 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include diff --git a/libs/pika/concurrency/tests/unit/contiguous_index_queue.cpp b/libs/pika/concurrency/tests/unit/contiguous_index_queue.cpp index ab757e4ae..1606309fe 100644 --- a/libs/pika/concurrency/tests/unit/contiguous_index_queue.cpp +++ b/libs/pika/concurrency/tests/unit/contiguous_index_queue.cpp @@ -164,9 +164,8 @@ void test_concurrent(pop_mode m) for (std::size_t i = 0; i < num_threads; ++i) { - senders.push_back(ex::transfer_just(ex::thread_pool_scheduler{}, m, i, std::ref(b), - std::ref(q), std::ref(popped_indices[i])) | - ex::then(test_concurrent_worker)); + senders.push_back(ex::just(m, i, std::ref(b), std::ref(q), std::ref(popped_indices[i])) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(test_concurrent_worker)); } tt::sync_wait(ex::when_all_vector(std::move(senders))); diff --git a/libs/pika/coroutines/tests/regressions/coroutine_function_destructor_yield_4800.cpp b/libs/pika/coroutines/tests/regressions/coroutine_function_destructor_yield_4800.cpp index 5cf96e8e7..8374b57af 100644 --- a/libs/pika/coroutines/tests/regressions/coroutine_function_destructor_yield_4800.cpp +++ b/libs/pika/coroutines/tests/regressions/coroutine_function_destructor_yield_4800.cpp @@ -54,8 +54,8 @@ int pika_main() // destructor being called late in the coroutine call operator. for (int i = 0; i < num_iterations; ++i) { - tt::sync_wait( - ex::transfer_just(ex::thread_pool_scheduler{}, yielder{}) | ex::then([](auto&&) {})); + tt::sync_wait(ex::just(yielder{}) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::then([](auto&&) {})); } for (int i = 0; i < num_iterations; ++i) diff --git a/libs/pika/execution/CMakeLists.txt b/libs/pika/execution/CMakeLists.txt index f5dfe31dc..e6f476edc 100644 --- a/libs/pika/execution/CMakeLists.txt +++ b/libs/pika/execution/CMakeLists.txt @@ -8,6 +8,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") set(execution_headers pika/execution/algorithms/bulk.hpp + pika/execution/algorithms/continues_on.hpp pika/execution/algorithms/detail/helpers.hpp pika/execution/algorithms/detail/partial_algorithm.hpp pika/execution/algorithms/drop_operation_state.hpp @@ -24,7 +25,6 @@ set(execution_headers pika/execution/algorithms/start_detached.hpp pika/execution/algorithms/sync_wait.hpp pika/execution/algorithms/then.hpp - pika/execution/algorithms/transfer.hpp pika/execution/algorithms/transfer_just.hpp pika/execution/algorithms/transfer_when_all.hpp pika/execution/algorithms/unpack.hpp diff --git a/libs/pika/execution/include/pika/execution/algorithms/transfer.hpp b/libs/pika/execution/include/pika/execution/algorithms/continues_on.hpp similarity index 74% rename from libs/pika/execution/include/pika/execution/algorithms/transfer.hpp rename to libs/pika/execution/include/pika/execution/algorithms/continues_on.hpp index 939d19f07..d540035b6 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/transfer.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/continues_on.hpp @@ -22,7 +22,8 @@ # include namespace pika::execution::experimental { - inline constexpr struct transfer_t final : pika::functional::detail::tag_priority + inline constexpr struct continues_on_t final + : pika::functional::detail::tag_priority { private: // clang-format off @@ -33,15 +34,15 @@ namespace pika::execution::experimental { pika::execution::experimental::detail:: is_completion_scheduler_tag_invocable_v< pika::execution::experimental::set_value_t, Sender, - transfer_t, Scheduler>)> + continues_on_t, Scheduler>)> // clang-format on friend constexpr PIKA_FORCEINLINE auto - tag_override_invoke(transfer_t, Sender&& sender, Scheduler&& scheduler) + tag_override_invoke(continues_on_t, Sender&& sender, Scheduler&& scheduler) { auto completion_scheduler = pika::execution::experimental::get_completion_scheduler< pika::execution::experimental::set_value_t>( pika::execution::experimental::get_env(sender)); - return pika::functional::detail::tag_invoke(transfer_t{}, + return pika::functional::detail::tag_invoke(continues_on_t{}, PIKA_MOVE(completion_scheduler), PIKA_FORWARD(Sender, sender), PIKA_FORWARD(Scheduler, scheduler)); } @@ -54,7 +55,7 @@ namespace pika::execution::experimental { )> // clang-format on friend constexpr PIKA_FORCEINLINE auto - tag_fallback_invoke(transfer_t, Sender&& predecessor_sender, Scheduler&& scheduler) + tag_fallback_invoke(continues_on_t, Sender&& predecessor_sender, Scheduler&& scheduler) { return schedule_from( PIKA_FORWARD(Scheduler, scheduler), PIKA_FORWARD(Sender, predecessor_sender)); @@ -62,11 +63,15 @@ namespace pika::execution::experimental { template friend constexpr PIKA_FORCEINLINE auto - tag_fallback_invoke(transfer_t, Scheduler&& scheduler) + tag_fallback_invoke(continues_on_t, Scheduler&& scheduler) { - return detail::partial_algorithm{ + return detail::partial_algorithm{ PIKA_FORWARD(Scheduler, scheduler)}; } - } transfer{}; + } continues_on{}; + + using transfer_t PIKA_DEPRECATED("transfer_t has been renamed continues_on_t") = continues_on_t; + PIKA_DEPRECATED("transfer has been renamed continues_on") + inline constexpr continues_on_t transfer{}; } // namespace pika::execution::experimental #endif diff --git a/libs/pika/execution/include/pika/execution/algorithms/ensure_started.hpp b/libs/pika/execution/include/pika/execution/algorithms/ensure_started.hpp index 6e1e5f491..ed4124ff0 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/ensure_started.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/ensure_started.hpp @@ -10,36 +10,37 @@ #if defined(PIKA_HAVE_STDEXEC) # include -#else -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include - -# include -# include -# include -# include -# include -# include -# include -# include -# include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace pika::ensure_started_detail { template @@ -84,12 +85,31 @@ namespace pika::ensure_started_detail { template struct ensure_started_sender_impl::ensure_started_sender_type { + PIKA_STDEXEC_SENDER_CONCEPT + struct ensure_started_sender_tag { }; using allocator_type = Allocator; +#if defined(PIKA_HAVE_STDEXEC) + template + using value_types_helper = pika::execution::experimental::completion_signatures< + pika::execution::experimental::set_value_t(std::decay_t...)>; + + template + using error_types_helper = pika::execution::experimental::completion_signatures< + pika::execution::experimental::set_error_t(std::decay_t)>; + + using completion_signatures = + pika::execution::experimental::transform_completion_signatures_of, + value_types_helper, error_types_helper>; +#else template struct value_types_helper { @@ -110,6 +130,7 @@ namespace pika::ensure_started_detail { std::exception_ptr>>; static constexpr bool sends_done = false; +#endif struct shared_state { @@ -137,6 +158,21 @@ namespace pika::ensure_started_detail { { using type = pika::util::detail::transform_t; }; +#if defined(PIKA_HAVE_STDEXEC) + using value_type = pika::util::detail::prepend_t< + pika::util::detail::transform_t< + typename pika::execution::experimental::value_types_of_t, + value_types_helper>, + pika::detail::monostate>; + using error_type = pika::util::detail::unique_t, + std::decay>, + std::exception_ptr>>; +#else using value_type = pika::util::detail::prepend_t< pika::util::detail::transform_t< typename pika::execution::experimental::sender_traits< @@ -145,6 +181,7 @@ namespace pika::ensure_started_detail { pika::detail::monostate>; using error_type = pika::util::detail::unique_t, std::exception_ptr>>; +#endif pika::detail::variant v; @@ -154,6 +191,8 @@ namespace pika::ensure_started_detail { struct ensure_started_receiver { + PIKA_STDEXEC_RECEIVER_CONCEPT + pika::intrusive_ptr state; template @@ -167,6 +206,7 @@ namespace pika::ensure_started_detail { friend void tag_invoke(pika::execution::experimental::set_stopped_t, ensure_started_receiver r) noexcept { + r.state->v.template emplace(); r.state->set_predecessor_done(); }; @@ -178,13 +218,22 @@ namespace pika::ensure_started_detail { { using type = pika::util::detail::transform_t; }; - +#if defined(PIKA_HAVE_STDEXEC) + using value_type = pika::util::detail::prepend_t< + pika::util::detail::transform_t< + typename pika::execution::experimental::value_types_of_t, + value_types_helper>, + pika::detail::monostate>; +#else using value_type = pika::util::detail::prepend_t< pika::util::detail::transform_t< typename pika::execution::experimental::sender_traits< Sender>::template value_types, value_types_helper>, pika::detail::monostate>; +#endif template friend auto tag_invoke(pika::execution::experimental::set_value_t, @@ -351,6 +400,7 @@ namespace pika::ensure_started_detail { if (!start_called.exchange(true)) { PIKA_ASSERT(os.has_value()); + // NOLINTNEXTLINE(bugprone-unchecked-optional-access) pika::execution::experimental::start(*os); } } @@ -516,4 +566,3 @@ namespace pika::execution::experimental { } } ensure_started{}; } // namespace pika::execution::experimental -#endif diff --git a/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp b/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp index 064ce750d..5a82765c0 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp @@ -10,22 +10,24 @@ #if defined(PIKA_HAVE_STDEXEC) # include -#else -# include -# include -# include -# include -# include -# include -# include -# include -# include - -# include -# include -# include -# include -# include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include namespace pika::start_detached_detail { template @@ -33,12 +35,14 @@ namespace pika::start_detached_detail { { struct start_detached_receiver { + PIKA_STDEXEC_RECEIVER_CONCEPT + operation_state_holder& op_state; template -# if !defined(__NVCC__) +#if !defined(__NVCC__) [[noreturn]] -# endif +#endif friend void tag_invoke(pika::execution::experimental::set_error_t, start_detached_receiver&& r, Error error) noexcept { @@ -134,4 +138,3 @@ namespace pika::execution::experimental { } } start_detached{}; } // namespace pika::execution::experimental -#endif diff --git a/libs/pika/execution/include/pika/execution/algorithms/transfer_just.hpp b/libs/pika/execution/include/pika/execution/algorithms/transfer_just.hpp index 095b71146..a06d1bd89 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/transfer_just.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/transfer_just.hpp @@ -11,13 +11,15 @@ #if defined(PIKA_HAVE_STDEXEC) # include #else +# include # include -# include # include # include namespace pika::execution::experimental { + PIKA_DEPRECATED( + "transfer_just will be removed in the future, use transfer and just separately instead") inline constexpr struct transfer_just_t final : pika::functional::detail::tag_fallback { @@ -26,7 +28,7 @@ namespace pika::execution::experimental { friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(transfer_just_t, Scheduler&& scheduler, Ts&&... ts) { - return transfer(just(PIKA_FORWARD(Ts, ts)...), PIKA_FORWARD(Scheduler, scheduler)); + return continues_on(just(PIKA_FORWARD(Ts, ts)...), PIKA_FORWARD(Scheduler, scheduler)); } } transfer_just{}; } // namespace pika::execution::experimental diff --git a/libs/pika/execution/include/pika/execution/algorithms/transfer_when_all.hpp b/libs/pika/execution/include/pika/execution/algorithms/transfer_when_all.hpp index 7794be745..e2566546d 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/transfer_when_all.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/transfer_when_all.hpp @@ -11,13 +11,15 @@ #if defined(PIKA_HAVE_STDEXEC) # include #else -# include +# include # include # include # include namespace pika::execution::experimental { + PIKA_DEPRECATED("transfer_when_all will be removed in the future, use transfer and when_all " + "separately instead") inline constexpr struct transfer_when_all_t final : pika::functional::detail::tag_fallback { @@ -26,7 +28,8 @@ namespace pika::execution::experimental { friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(transfer_when_all_t, Scheduler&& scheduler, Ts&&... ts) { - return transfer(when_all(PIKA_FORWARD(Ts, ts)...), PIKA_FORWARD(Scheduler, scheduler)); + return continues_on( + when_all(PIKA_FORWARD(Ts, ts)...), PIKA_FORWARD(Scheduler, scheduler)); } } transfer_when_all{}; } // namespace pika::execution::experimental diff --git a/libs/pika/execution/tests/unit/CMakeLists.txt b/libs/pika/execution/tests/unit/CMakeLists.txt index 05989d2e2..ba62189e6 100644 --- a/libs/pika/execution/tests/unit/CMakeLists.txt +++ b/libs/pika/execution/tests/unit/CMakeLists.txt @@ -28,6 +28,12 @@ set(tests scheduler_queries ) +# Disable deprecation warnings since transfer/transfer_just have been deprecated. Also explicitly +# disable precompiled headers since the precompiled headers may have +# PIKA_HAVE_DEPRECATION_WARNINGS=1 set. +set(algorithm_transfer_just_FLAGS COMPILE_FLAGS "-DPIKA_HAVE_DEPRECATION_WARNINGS=0" NOPCH) +set(algorithm_transfer_FLAGS COMPILE_FLAGS "-DPIKA_HAVE_DEPRECATION_WARNINGS=0" NOPCH) + foreach(test ${tests}) set(sources ${test}.cpp) diff --git a/libs/pika/execution_base/include/pika/execution_base/stdexec_forward.hpp b/libs/pika/execution_base/include/pika/execution_base/stdexec_forward.hpp index af759ae87..fbab800e5 100644 --- a/libs/pika/execution_base/include/pika/execution_base/stdexec_forward.hpp +++ b/libs/pika/execution_base/include/pika/execution_base/stdexec_forward.hpp @@ -12,5 +12,10 @@ namespace pika::execution::experimental { using namespace stdexec; -} + +# if !defined(PIKA_HAVE_STDEXEC_CONTINUES_ON) + using continues_on_t = stdexec::continue_on_t; + inline constexpr continues_on_t continues_on{}; +# endif +} // namespace pika::execution::experimental #endif diff --git a/libs/pika/execution_base/tests/unit/any_sender.cpp b/libs/pika/execution_base/tests/unit/any_sender.cpp index 9eb1a59a8..de28efbfa 100644 --- a/libs/pika/execution_base/tests/unit/any_sender.cpp +++ b/libs/pika/execution_base/tests/unit/any_sender.cpp @@ -527,13 +527,13 @@ void test_unique_any_sender_set_error() void test_any_sender_set_stopped() { ex::any_sender<> as{ex::just()}; - tt::sync_wait(ex::transfer(std::move(as), ex::std_thread_scheduler{})); + tt::sync_wait(ex::continues_on(std::move(as), ex::std_thread_scheduler{})); } void test_unique_any_sender_set_stopped() { ex::any_sender<> as{ex::just()}; - tt::sync_wait(ex::transfer(std::move(as), ex::std_thread_scheduler{})); + tt::sync_wait(ex::continues_on(std::move(as), ex::std_thread_scheduler{})); } // This tests that the empty vtable types used in the implementation of any_* diff --git a/libs/pika/executors/tests/unit/std_thread_scheduler.cpp b/libs/pika/executors/tests/unit/std_thread_scheduler.cpp index 162efe6e6..2a2056073 100644 --- a/libs/pika/executors/tests/unit/std_thread_scheduler.cpp +++ b/libs/pika/executors/tests/unit/std_thread_scheduler.cpp @@ -254,7 +254,7 @@ void test_transfer_basic() }); auto work2 = ex::then(std::move(work1), [=, ¤t_id]() { PIKA_TEST_EQ(current_id, std::this_thread::get_id()); }); - auto transfer1 = ex::transfer(work2, sched); + auto transfer1 = ex::continues_on(work2, sched); auto work3 = ex::then(transfer1, [=, ¤t_id]() { std::thread::id new_id = std::this_thread::get_id(); PIKA_TEST_NEQ(current_id, new_id); @@ -263,7 +263,7 @@ void test_transfer_basic() }); auto work4 = ex::then( work3, [=, ¤t_id]() { PIKA_TEST_EQ(current_id, std::this_thread::get_id()); }); - auto transfer2 = ex::transfer(work4, sched); + auto transfer2 = ex::continues_on(work4, sched); auto work5 = ex::then(transfer2, [=, ¤t_id]() { std::thread::id new_id = std::this_thread::get_id(); PIKA_TEST_NEQ(current_id, new_id); @@ -290,7 +290,7 @@ void test_transfer_arguments() PIKA_TEST_EQ(current_id, std::this_thread::get_id()); return x / 2.0; }); - auto transfer1 = ex::transfer(work2, sched); + auto transfer1 = ex::continues_on(work2, sched); auto work3 = ex::then(transfer1, [=, ¤t_id](double x) { std::thread::id new_id = std::this_thread::get_id(); PIKA_TEST_NEQ(current_id, new_id); @@ -302,7 +302,7 @@ void test_transfer_arguments() PIKA_TEST_EQ(current_id, std::this_thread::get_id()); return "result: " + std::to_string(x); }); - auto transfer2 = ex::transfer(work4, sched); + auto transfer2 = ex::continues_on(work4, sched); auto work5 = ex::then(transfer2, [=, ¤t_id](std::string s) { std::thread::id new_id = std::this_thread::get_id(); PIKA_TEST_NEQ(current_id, new_id); @@ -332,7 +332,7 @@ void test_just_void() std::thread::id parent_id = std::this_thread::get_id(); auto begin = ex::just(); - auto transfer1 = ex::transfer(begin, ex::std_thread_scheduler{}); + auto transfer1 = ex::continues_on(begin, ex::std_thread_scheduler{}); auto work1 = ex::then( transfer1, [parent_id]() { PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); }); tt::sync_wait(std::move(work1)); @@ -356,7 +356,7 @@ void test_just_one_arg() std::thread::id parent_id = std::this_thread::get_id(); auto begin = ex::just(3); - auto transfer1 = ex::transfer(begin, ex::std_thread_scheduler{}); + auto transfer1 = ex::continues_on(begin, ex::std_thread_scheduler{}); auto work1 = ex::then(transfer1, [parent_id](int x) { PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -383,7 +383,7 @@ void test_just_two_args() std::thread::id parent_id = std::this_thread::get_id(); auto begin = ex::just(3, std::string("hello")); - auto transfer1 = ex::transfer(begin, ex::std_thread_scheduler{}); + auto transfer1 = ex::continues_on(begin, ex::std_thread_scheduler{}); auto work1 = ex::then(transfer1, [parent_id](int x, std::string y) { PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -393,21 +393,21 @@ void test_just_two_args() } } -void test_transfer_just_void() +void test_just_continues_on_void() { std::thread::id parent_id = std::this_thread::get_id(); - auto begin = ex::transfer_just(ex::std_thread_scheduler{}); + auto begin = ex::just() | ex::continues_on(ex::std_thread_scheduler{}); auto work1 = ex::then(begin, [parent_id]() { PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); }); tt::sync_wait(std::move(work1)); } -void test_transfer_just_one_arg() +void test_just_continues_on_one_arg() { std::thread::id parent_id = std::this_thread::get_id(); - auto begin = ex::transfer_just(ex::std_thread_scheduler{}, 3); + auto begin = ex::just(3) | ex::continues_on(ex::std_thread_scheduler{}); auto work1 = ex::then(begin, [parent_id](int x) { PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -415,11 +415,11 @@ void test_transfer_just_one_arg() tt::sync_wait(std::move(work1)); } -void test_transfer_just_two_args() +void test_just_continues_on_two_args() { std::thread::id parent_id = std::this_thread::get_id(); - auto begin = ex::transfer_just(ex::std_thread_scheduler{}, 3, std::string("hello")); + auto begin = ex::just(3, std::string("hello")) | ex::continues_on(ex::std_thread_scheduler{}); auto work1 = ex::then(begin, [parent_id](int x, std::string y) { PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -649,17 +649,18 @@ void test_ensure_started() } { - auto s = ex::transfer_just(sched, 42) | ex::ensure_started(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::ensure_started(); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::ensure_started() | ex::transfer(sched); + auto s = + ex::just(42) | ex::continues_on(sched) | ex::ensure_started() | ex::continues_on(sched); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::ensure_started() | ex::split(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::ensure_started() | ex::split(); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); @@ -760,11 +761,11 @@ void test_ensure_started_when_all() std::unique_lock l{mtx}; cond.wait(l, [&]() { return started; }); } - auto succ1 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ1 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 1; }); - auto succ2 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ2 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 2; }); @@ -785,17 +786,17 @@ void test_split() } { - auto s = ex::transfer_just(sched, 42) | ex::split(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::split(); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::split() | ex::transfer(sched); + auto s = ex::just(42) | ex::continues_on(sched) | ex::split() | ex::continues_on(sched); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::split(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::split(); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); @@ -855,11 +856,11 @@ void test_split_when_all() ++first_task_calls; return 3; }) | ex::split(); - auto succ1 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ1 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 1; }); - auto succ2 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ2 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 2; }); @@ -883,54 +884,57 @@ void test_let_value() } { - auto result = tt::sync_wait( - ex::schedule(sched) | ex::let_value([=]() { return ex::transfer_just(sched, 42); })); + auto result = tt::sync_wait(ex::schedule(sched) | + ex::let_value([=]() { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } { auto result = tt::sync_wait( - ex::just() | ex::let_value([=]() { return ex::transfer_just(sched, 42); })); + ex::just() | ex::let_value([=]() { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value ignored { - auto result = tt::sync_wait( - ex::transfer_just(sched, 43) | ex::let_value([](int&) { return ex::just(42); })); + auto result = tt::sync_wait(ex::just(43) | ex::continues_on(sched) | + ex::let_value([](int&) { return ex::just(42); })); PIKA_TEST_EQ(result, 42); } { - auto result = tt::sync_wait(ex::transfer_just(sched, 43) | - ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); + auto result = tt::sync_wait(ex::just(43) | ex::continues_on(sched) | + ex::let_value([=](int&) { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } { - auto result = tt::sync_wait( - ex::just(43) | ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); + auto result = tt::sync_wait(ex::just(43) | + ex::let_value([=](int&) { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value used { - auto result = tt::sync_wait(ex::transfer_just(sched, 43) | ex::let_value([](int& x) { - return ex::just(42) | ex::then([&](int y) { return x + y; }); - })); + auto result = + tt::sync_wait(ex::just(43) | ex::continues_on(sched) | ex::let_value([](int& x) { + return ex::just(42) | ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { - auto result = tt::sync_wait(ex::transfer_just(sched, 43) | ex::let_value([=](int& x) { - return ex::transfer_just(sched, 42) | ex::then([&](int y) { return x + y; }); - })); + auto result = + tt::sync_wait(ex::just(43) | ex::continues_on(sched) | ex::let_value([=](int& x) { + return ex::just(42) | ex::continues_on(sched) | + ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { auto result = tt::sync_wait(ex::just(43) | ex::let_value([=](int& x) { - return ex::transfer_just(sched, 42) | ex::then([&](int y) { return x + y; }); + return ex::just(42) | ex::continues_on(sched) | ex::then([&](int y) { return x + y; }); })); PIKA_TEST_EQ(result, 85); } @@ -941,7 +945,7 @@ void test_let_value() try { - tt::sync_wait(ex::transfer_just(sched, 43) | ex::then([](int) -> int { + tt::sync_wait(ex::just(43) | ex::continues_on(sched) | ex::then([](int) -> int { throw std::runtime_error("error"); }) | ex::let_value([](int&) { PIKA_TEST(false); @@ -996,7 +1000,7 @@ void test_let_error() ex::let_error([=, &called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched); + return ex::schedule(sched); })); PIKA_TEST(called); } @@ -1007,7 +1011,7 @@ void test_let_error() ex::let_error([=, &called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched); + return ex::schedule(sched); })); PIKA_TEST(called); } @@ -1028,7 +1032,7 @@ void test_let_error() throw std::runtime_error("error"); }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched, 42); + return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } @@ -1038,15 +1042,15 @@ void test_let_error() throw std::runtime_error("error"); }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched, 42); + return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } // predecessor doesn't throw, let sender is ignored { - auto result = - tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([](std::exception_ptr) { + auto result = tt::sync_wait( + ex::just(42) | ex::continues_on(sched) | ex::let_error([](std::exception_ptr) { PIKA_TEST(false); return ex::just(43); })); @@ -1054,10 +1058,10 @@ void test_let_error() } { - auto result = - tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([=](std::exception_ptr) { + auto result = tt::sync_wait( + ex::just(42) | ex::continues_on(sched) | ex::let_error([=](std::exception_ptr) { PIKA_TEST(false); - return ex::transfer_just(sched, 43); + return ex::just(43) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } @@ -1065,7 +1069,7 @@ void test_let_error() { auto result = tt::sync_wait(ex::just(42) | ex::let_error([=](std::exception_ptr) { PIKA_TEST(false); - return ex::transfer_just(sched, 43); + return ex::just(43) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } @@ -1136,11 +1140,12 @@ void test_bulk() std::vector v(n, -1); std::thread::id parent_id = std::this_thread::get_id(); - auto v_out = tt::sync_wait(ex::transfer_just(ex::std_thread_scheduler{}, std::move(v)) | - ex::bulk(n, [&parent_id](int i, std::vector& v) { - v[i] = i; - PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); - })); + auto v_out = + tt::sync_wait(ex::just(std::move(v)) | ex::continues_on(ex::std_thread_scheduler{}) | + ex::bulk(n, [&parent_id](int i, std::vector& v) { + v[i] = i; + PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); + })); for (int i = 0; i < n; ++i) { PIKA_TEST_EQ(v_out[i], i); } } @@ -1173,7 +1178,7 @@ void test_bulk() try { - tt::sync_wait(ex::transfer_just(ex::std_thread_scheduler{}) | ex::bulk(n, [&v](int i) { + tt::sync_wait(ex::schedule(ex::std_thread_scheduler{}) | ex::bulk(n, [&v](int i) { if (i == i_fail) { throw std::runtime_error("error"); } v[i] = i; })); @@ -1216,7 +1221,7 @@ void test_completion_scheduler() } { - auto sender = ex::transfer_just(ex::std_thread_scheduler{}, 42); + auto sender = ex::just(42) | ex::continues_on(ex::std_thread_scheduler{}); auto completion_scheduler = ex::get_completion_scheduler(ex::get_env(sender)); static_assert( @@ -1234,8 +1239,8 @@ void test_completion_scheduler() } { - auto sender = ex::then( - ex::bulk(ex::transfer_just(ex::std_thread_scheduler{}, 42), 10, [](int, int) {}), + auto sender = ex::then(ex::bulk(ex::just(42) | ex::continues_on(ex::std_thread_scheduler{}), + 10, [](int, int) {}), [](int) {}); auto completion_scheduler = ex::get_completion_scheduler(ex::get_env(sender)); @@ -1245,8 +1250,8 @@ void test_completion_scheduler() } { - auto sender = ex::bulk( - ex::then(ex::transfer_just(ex::std_thread_scheduler{}, 42), [](int x) { return x; }), + auto sender = ex::bulk(ex::then(ex::just(42) | ex::continues_on(ex::std_thread_scheduler{}), + [](int x) { return x; }), 10, [](int, int) {}); auto completion_scheduler = ex::get_completion_scheduler(ex::get_env(sender)); @@ -1276,9 +1281,9 @@ int main() test_just_void(); test_just_one_arg(); test_just_two_args(); - test_transfer_just_void(); - test_transfer_just_one_arg(); - test_transfer_just_two_args(); + test_just_continues_on_void(); + test_just_continues_on_one_arg(); + test_just_continues_on_two_args(); test_when_all(); test_when_all_vector(); test_ensure_started(); diff --git a/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp b/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp index 49ab8f0bd..3436b3a5d 100644 --- a/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp +++ b/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp @@ -361,14 +361,14 @@ void test_transfer_basic() }); auto work2 = ex::then( work1, [=, ¤t_id]() { PIKA_TEST_EQ(current_id, pika::this_thread::get_id()); }); - auto transfer1 = ex::transfer(work2, sched); + auto transfer1 = ex::continues_on(work2, sched); auto work3 = ex::then(transfer1, [=, ¤t_id]() { current_id = pika::this_thread::get_id(); PIKA_TEST_NEQ(current_id, parent_id); }); auto work4 = ex::then( work3, [=, ¤t_id]() { PIKA_TEST_EQ(current_id, pika::this_thread::get_id()); }); - auto transfer2 = ex::transfer(work4, sched); + auto transfer2 = ex::continues_on(work4, sched); auto work5 = ex::then(transfer2, [=, ¤t_id]() { current_id = pika::this_thread::get_id(); PIKA_TEST_NEQ(current_id, parent_id); @@ -393,7 +393,7 @@ void test_transfer_arguments() PIKA_TEST_EQ(current_id, pika::this_thread::get_id()); return x / 2.0; }); - auto transfer1 = ex::transfer(work2, sched); + auto transfer1 = ex::continues_on(work2, sched); auto work3 = ex::then(transfer1, [=, ¤t_id](double x) { current_id = pika::this_thread::get_id(); PIKA_TEST_NEQ(current_id, parent_id); @@ -403,7 +403,7 @@ void test_transfer_arguments() PIKA_TEST_EQ(current_id, pika::this_thread::get_id()); return "result: " + std::to_string(x); }); - auto transfer2 = ex::transfer(work4, sched); + auto transfer2 = ex::continues_on(work4, sched); auto work5 = ex::then(transfer2, [=, ¤t_id](std::string s) { current_id = pika::this_thread::get_id(); PIKA_TEST_NEQ(current_id, parent_id); @@ -431,7 +431,7 @@ void test_just_void() pika::thread::id parent_id = pika::this_thread::get_id(); auto begin = ex::just(); - auto transfer1 = ex::transfer(begin, ex::thread_pool_scheduler{}); + auto transfer1 = ex::continues_on(begin, ex::thread_pool_scheduler{}); auto work1 = ex::then( transfer1, [parent_id]() { PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); }); tt::sync_wait(std::move(work1)); @@ -455,7 +455,7 @@ void test_just_one_arg() pika::thread::id parent_id = pika::this_thread::get_id(); auto begin = ex::just(3); - auto transfer1 = ex::transfer(begin, ex::thread_pool_scheduler{}); + auto transfer1 = ex::continues_on(begin, ex::thread_pool_scheduler{}); auto work1 = ex::then(transfer1, [parent_id](int x) { PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -482,7 +482,7 @@ void test_just_two_args() pika::thread::id parent_id = pika::this_thread::get_id(); auto begin = ex::just(3, std::string("hello")); - auto transfer1 = ex::transfer(begin, ex::thread_pool_scheduler{}); + auto transfer1 = ex::continues_on(begin, ex::thread_pool_scheduler{}); auto work1 = ex::then(transfer1, [parent_id](int x, std::string y) { PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -492,21 +492,21 @@ void test_just_two_args() } } -void test_transfer_just_void() +void test_just_continues_on_void() { pika::thread::id parent_id = pika::this_thread::get_id(); - auto begin = ex::transfer_just(ex::thread_pool_scheduler{}); + auto begin = ex::just() | ex::continues_on(ex::thread_pool_scheduler{}); auto work1 = ex::then(begin, [parent_id]() { PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); }); tt::sync_wait(std::move(work1)); } -void test_transfer_just_one_arg() +void test_just_continues_on_one_arg() { pika::thread::id parent_id = pika::this_thread::get_id(); - auto begin = ex::transfer_just(ex::thread_pool_scheduler{}, 3); + auto begin = ex::just(3) | ex::continues_on(ex::thread_pool_scheduler{}); auto work1 = ex::then(begin, [parent_id](int x) { PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -514,11 +514,11 @@ void test_transfer_just_one_arg() tt::sync_wait(std::move(work1)); } -void test_transfer_just_two_args() +void test_just_continues_on_two_args() { pika::thread::id parent_id = pika::this_thread::get_id(); - auto begin = ex::transfer_just(ex::thread_pool_scheduler{}, 3, std::string("hello")); + auto begin = ex::just(3, std::string("hello")) | ex::continues_on(ex::thread_pool_scheduler{}); auto work1 = ex::then(begin, [parent_id](int x, std::string y) { PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); PIKA_TEST_EQ(x, 3); @@ -748,17 +748,18 @@ void test_ensure_started() } { - auto s = ex::transfer_just(sched, 42) | ex::ensure_started(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::ensure_started(); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::ensure_started() | ex::transfer(sched); + auto s = + ex::just(42) | ex::continues_on(sched) | ex::ensure_started() | ex::continues_on(sched); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::ensure_started() | ex::split(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::ensure_started() | ex::split(); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); @@ -859,11 +860,11 @@ void test_ensure_started_when_all() std::unique_lock l{mtx}; cond.wait(l, [&]() { return started; }); } - auto succ1 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ1 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 1; }); - auto succ2 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ2 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 2; }); @@ -884,17 +885,17 @@ void test_split() } { - auto s = ex::transfer_just(sched, 42) | ex::split(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::split(); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::split() | ex::transfer(sched); + auto s = ex::just(42) | ex::continues_on(sched) | ex::split() | ex::continues_on(sched); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto s = ex::transfer_just(sched, 42) | ex::split(); + auto s = ex::just(42) | ex::continues_on(sched) | ex::split(); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); PIKA_TEST_EQ(tt::sync_wait(s), 42); @@ -954,11 +955,11 @@ void test_split_when_all() ++first_task_calls; return 3; }) | ex::split(); - auto succ1 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ1 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 1; }); - auto succ2 = s | ex::transfer(sched) | ex::then([&](int const& x) { + auto succ2 = s | ex::continues_on(sched) | ex::then([&](int const& x) { ++successor_task_calls; return x + 2; }); @@ -982,54 +983,57 @@ void test_let_value() } { - auto result = tt::sync_wait( - ex::schedule(sched) | ex::let_value([=]() { return ex::transfer_just(sched, 42); })); + auto result = tt::sync_wait(ex::schedule(sched) | + ex::let_value([=]() { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } { auto result = tt::sync_wait( - ex::just() | ex::let_value([=]() { return ex::transfer_just(sched, 42); })); + ex::just() | ex::let_value([=]() { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value ignored { - auto result = tt::sync_wait( - ex::transfer_just(sched, 43) | ex::let_value([](int&) { return ex::just(42); })); + auto result = tt::sync_wait(ex::just(43) | ex::continues_on(sched) | + ex::let_value([](int&) { return ex::just(42); })); PIKA_TEST_EQ(result, 42); } { - auto result = tt::sync_wait(ex::transfer_just(sched, 43) | - ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); + auto result = tt::sync_wait(ex::just(43) | ex::continues_on(sched) | + ex::let_value([=](int&) { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } { - auto result = tt::sync_wait( - ex::just(43) | ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); + auto result = tt::sync_wait(ex::just(43) | + ex::let_value([=](int&) { return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value used { - auto result = tt::sync_wait(ex::transfer_just(sched, 43) | ex::let_value([](int& x) { - return ex::just(42) | ex::then([&](int y) { return x + y; }); - })); + auto result = + tt::sync_wait(ex::just(43) | ex::continues_on(sched) | ex::let_value([](int& x) { + return ex::just(42) | ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { - auto result = tt::sync_wait(ex::transfer_just(sched, 43) | ex::let_value([=](int& x) { - return ex::transfer_just(sched, 42) | ex::then([&](int y) { return x + y; }); - })); + auto result = + tt::sync_wait(ex::just(43) | ex::continues_on(sched) | ex::let_value([=](int& x) { + return ex::just(42) | ex::continues_on(sched) | + ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { auto result = tt::sync_wait(ex::just(43) | ex::let_value([=](int& x) { - return ex::transfer_just(sched, 42) | ex::then([&](int y) { return x + y; }); + return ex::just(42) | ex::continues_on(sched) | ex::then([&](int y) { return x + y; }); })); PIKA_TEST_EQ(result, 85); } @@ -1040,7 +1044,7 @@ void test_let_value() try { - tt::sync_wait(ex::transfer_just(sched, 43) | ex::then([](int) -> int { + tt::sync_wait(ex::just(43) | ex::continues_on(sched) | ex::then([](int) -> int { throw std::runtime_error("error"); }) | ex::let_value([](int&) { PIKA_TEST(false); @@ -1095,7 +1099,7 @@ void test_let_error() ex::let_error([=, &called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched); + return ex::schedule(sched); })); PIKA_TEST(called); } @@ -1106,7 +1110,7 @@ void test_let_error() ex::let_error([=, &called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched); + return ex::schedule(sched); })); PIKA_TEST(called); } @@ -1127,7 +1131,7 @@ void test_let_error() throw std::runtime_error("error"); }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched, 42); + return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } @@ -1137,15 +1141,15 @@ void test_let_error() throw std::runtime_error("error"); }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched, 42); + return ex::just(42) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } // predecessor doesn't throw, let sender is ignored { - auto result = - tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([](std::exception_ptr) { + auto result = tt::sync_wait( + ex::just(42) | ex::continues_on(sched) | ex::let_error([](std::exception_ptr) { PIKA_TEST(false); return ex::just(43); })); @@ -1153,10 +1157,10 @@ void test_let_error() } { - auto result = - tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([=](std::exception_ptr) { + auto result = tt::sync_wait( + ex::just(42) | ex::continues_on(sched) | ex::let_error([=](std::exception_ptr) { PIKA_TEST(false); - return ex::transfer_just(sched, 43); + return ex::just(43) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } @@ -1164,7 +1168,7 @@ void test_let_error() { auto result = tt::sync_wait(ex::just(42) | ex::let_error([=](std::exception_ptr) { PIKA_TEST(false); - return ex::transfer_just(sched, 43); + return ex::just(43) | ex::continues_on(sched); })); PIKA_TEST_EQ(result, 42); } @@ -1235,11 +1239,12 @@ void test_bulk() std::vector v(n, -1); pika::thread::id parent_id = pika::this_thread::get_id(); - auto v_out = tt::sync_wait(ex::transfer_just(ex::thread_pool_scheduler{}, std::move(v)) | - ex::bulk(n, [&parent_id](int i, std::vector& v) { - v[i] = i; - PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); - })); + auto v_out = + tt::sync_wait(ex::just(std::move(v)) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::bulk(n, [&parent_id](int i, std::vector& v) { + v[i] = i; + PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); + })); for (int i = 0; i < n; ++i) { PIKA_TEST_EQ(v_out[i], i); } } @@ -1287,7 +1292,7 @@ void test_bulk() try { - tt::sync_wait(ex::transfer_just(ex::thread_pool_scheduler{}) | ex::bulk(n, [&v](int i) { + tt::sync_wait(ex::schedule(ex::thread_pool_scheduler{}) | ex::bulk(n, [&v](int i) { if (i == i_fail) { throw std::runtime_error("error"); } v[i] = i; })); @@ -1330,7 +1335,7 @@ void test_completion_scheduler() } { - auto sender = ex::transfer_just(ex::thread_pool_scheduler{}, 42); + auto sender = ex::just(42) | ex::continues_on(ex::thread_pool_scheduler{}); auto completion_scheduler = ex::get_completion_scheduler(ex::get_env(sender)); static_assert( @@ -1348,9 +1353,10 @@ void test_completion_scheduler() } { - auto sender = ex::then( - ex::bulk(ex::transfer_just(ex::thread_pool_scheduler{}, 42), 10, [](int, int) {}), - [](int) {}); + auto sender = + ex::then(ex::bulk(ex::just(42) | ex::continues_on(ex::thread_pool_scheduler{}), 10, + [](int, int) {}), + [](int) {}); auto completion_scheduler = ex::get_completion_scheduler(ex::get_env(sender)); static_assert( @@ -1359,9 +1365,10 @@ void test_completion_scheduler() } { - auto sender = ex::bulk( - ex::then(ex::transfer_just(ex::thread_pool_scheduler{}, 42), [](int x) { return x; }), - 10, [](int, int) {}); + auto sender = + ex::bulk(ex::then(ex::just(42) | ex::continues_on(ex::thread_pool_scheduler{}), + [](int x) { return x; }), + 10, [](int, int) {}); auto completion_scheduler = ex::get_completion_scheduler(ex::get_env(sender)); static_assert( @@ -1380,22 +1387,24 @@ void test_drop_value() } { - tt::sync_wait(ex::drop_value(ex::transfer_just(sched, 3))); - static_assert( - std::is_void_v); + tt::sync_wait(ex::drop_value(ex::just(3) | ex::continues_on(sched))); + static_assert(std::is_void_v); } { - tt::sync_wait(ex::drop_value(ex::transfer_just(sched, std::string("hello")))); + tt::sync_wait(ex::drop_value(ex::just(std::string("hello")) | ex::continues_on(sched))); static_assert(std::is_void_v); + ex::drop_value(ex::just(std::string("hello")) | ex::continues_on(sched))))>); } { - tt::sync_wait(ex::drop_value( - ex::transfer_just(sched, custom_type_non_default_constructible_non_copyable{0}))); - static_assert(std::is_void_v); + tt::sync_wait( + ex::drop_value(ex::just(custom_type_non_default_constructible_non_copyable{0}) | + ex::continues_on(sched))); + static_assert(std::is_void_v); } { @@ -1423,24 +1432,24 @@ void test_split_tuple() ex::thread_pool_scheduler sched{}; { - auto [s] = ex::split_tuple(ex::transfer_just(sched, std::tuple(42))); + auto [s] = ex::split_tuple(ex::just(std::tuple(42)) | ex::continues_on(sched)); PIKA_TEST_EQ(tt::sync_wait(std::move(s)), 42); } { - auto [s1, s2, s3] = - ex::split_tuple(ex::transfer_just(sched, std::tuple(42, std::string{"hello"}, 3.14))); + auto [s1, s2, s3] = ex::split_tuple( + ex::just(std::tuple(42, std::string{"hello"}, 3.14)) | ex::continues_on(sched)); PIKA_TEST_EQ(tt::sync_wait(std::move(s1)), 42); PIKA_TEST_EQ(tt::sync_wait(std::move(s2)), std::string{"hello"}); PIKA_TEST_EQ(tt::sync_wait(std::move(s3)), 3.14); } { - auto [s1, s2, s3] = - ex::split_tuple(ex::transfer_just(sched, std::tuple(42, std::string{"hello"}, 3.14))); - auto s1_transfer = std::move(s1) | ex::transfer(sched); - auto s2_transfer = std::move(s2) | ex::transfer(sched); - auto s3_transfer = std::move(s3) | ex::transfer(sched); + auto [s1, s2, s3] = ex::split_tuple( + ex::just(std::tuple(42, std::string{"hello"}, 3.14)) | ex::continues_on(sched)); + auto s1_transfer = std::move(s1) | ex::continues_on(sched); + auto s2_transfer = std::move(s2) | ex::continues_on(sched); + auto s3_transfer = std::move(s3) | ex::continues_on(sched); PIKA_TEST_EQ(tt::sync_wait(std::move(s1_transfer)), 42); PIKA_TEST_EQ(tt::sync_wait(std::move(s2_transfer)), std::string{"hello"}); PIKA_TEST_EQ(tt::sync_wait(std::move(s3_transfer)), 3.14); @@ -1494,9 +1503,9 @@ int pika_main() test_just_void(); test_just_one_arg(); test_just_two_args(); - test_transfer_just_void(); - test_transfer_just_one_arg(); - test_transfer_just_two_args(); + test_just_continues_on_void(); + test_just_continues_on_one_arg(); + test_just_continues_on_two_args(); test_when_all(); test_when_all_vector(); test_ensure_started(); diff --git a/libs/pika/resource_partitioner/examples/oversubscribing_resource_partitioner.cpp b/libs/pika/resource_partitioner/examples/oversubscribing_resource_partitioner.cpp index bfb127189..5adcd5071 100644 --- a/libs/pika/resource_partitioner/examples/oversubscribing_resource_partitioner.cpp +++ b/libs/pika/resource_partitioner/examples/oversubscribing_resource_partitioner.cpp @@ -110,8 +110,9 @@ int pika_main(/*pika::program_options::variables_map& vm*/) // use scheduler to schedule work on custom pool auto sender = ex::schedule(pool_scheduler) | - ex::then(pika::util::detail::bind_front(do_stuff, 5, true)) | ex::transfer(pool_scheduler) | - ex::then([]() { do_stuff(5, true); }) | ex::transfer(pool_scheduler) | + ex::then(pika::util::detail::bind_front(do_stuff, 5, true)) | + ex::continues_on(pool_scheduler) | ex::then([]() { do_stuff(5, true); }) | + ex::continues_on(pool_scheduler) | ex::then([pool_scheduler, high_priority_scheduler, async_count]() mutable { ex::unique_any_sender<> sender1, sender2; for (std::size_t i = 0; i < async_count; i++) diff --git a/libs/pika/resource_partitioner/examples/simple_resource_partitioner.cpp b/libs/pika/resource_partitioner/examples/simple_resource_partitioner.cpp index 85151f1b8..412e610d4 100644 --- a/libs/pika/resource_partitioner/examples/simple_resource_partitioner.cpp +++ b/libs/pika/resource_partitioner/examples/simple_resource_partitioner.cpp @@ -95,8 +95,9 @@ int pika_main(pika::program_options::variables_map&) // use scheduler to schedule work on custom pool auto sender = ex::schedule(pool_scheduler) | - ex::then(pika::util::detail::bind_front(do_stuff, 5, true)) | ex::transfer(pool_scheduler) | - ex::then([]() { do_stuff(5, true); }) | ex::transfer(pool_scheduler) | + ex::then(pika::util::detail::bind_front(do_stuff, 5, true)) | + ex::continues_on(pool_scheduler) | ex::then([]() { do_stuff(5, true); }) | + ex::continues_on(pool_scheduler) | ex::then([pool_scheduler, high_priority_scheduler, async_count]() mutable { ex::unique_any_sender<> sender1, sender2; for (std::size_t i = 0; i < async_count; i++) diff --git a/libs/pika/resource_partitioner/tests/unit/cross_pool_injection.cpp b/libs/pika/resource_partitioner/tests/unit/cross_pool_injection.cpp index e519f670e..e4e4514ac 100644 --- a/libs/pika/resource_partitioner/tests/unit/cross_pool_injection.cpp +++ b/libs/pika/resource_partitioner/tests/unit/cross_pool_injection.cpp @@ -108,8 +108,8 @@ int pika_main() std::size_t random_pool_2 = st_rand(0, num_pools - 1); auto& sched_1 = HP_schedulers[random_pool_1]; auto& sched_2 = HP_schedulers[random_pool_2]; - ex::start_detached(ex::transfer_just(sched_1, 0) | ex::then(dummy_task) | - ex::transfer(sched_2) | ex::then([=, &counter]() { + ex::start_detached(ex::just(0) | ex::continues_on(sched_1) | ex::then(dummy_task) | + ex::continues_on(sched_2) | ex::then([=, &counter]() { dummy_task(0); --counter; })); @@ -127,8 +127,8 @@ int pika_main() std::size_t random_pool_2 = st_rand(0, num_pools - 1); auto& sched_3 = NP_schedulers[random_pool_1]; auto& sched_4 = NP_schedulers[random_pool_2]; - ex::start_detached(ex::transfer_just(sched_3, 0) | ex::then(dummy_task) | - ex::transfer(sched_4) | ex::then([=, &counter]() { + ex::start_detached(ex::just(0) | ex::continues_on(sched_3) | ex::then(dummy_task) | + ex::continues_on(sched_4) | ex::then([=, &counter]() { dummy_task(0); --counter; })); @@ -146,8 +146,8 @@ int pika_main() std::size_t random_pool_2 = st_rand(0, num_pools - 1); auto& sched_5 = HP_schedulers[random_pool_1]; auto& sched_6 = NP_schedulers[random_pool_2]; - ex::start_detached(ex::transfer_just(sched_5, 0) | ex::then(dummy_task) | - ex::transfer(sched_6) | ex::then([=, &counter]() { + ex::start_detached(ex::just(0) | ex::continues_on(sched_5) | ex::then(dummy_task) | + ex::continues_on(sched_6) | ex::then([=, &counter]() { dummy_task(0); --counter; })); @@ -165,8 +165,8 @@ int pika_main() std::size_t random_pool_2 = st_rand(0, num_pools - 1); auto& sched_7 = NP_schedulers[random_pool_1]; auto& sched_8 = HP_schedulers[random_pool_2]; - ex::start_detached(ex::transfer_just(sched_7, 0) | ex::then(dummy_task) | - ex::transfer(sched_8) | ex::then([=, &counter]() { + ex::start_detached(ex::just(0) | ex::continues_on(sched_7) | ex::then(dummy_task) | + ex::continues_on(sched_8) | ex::then([=, &counter]() { dummy_task(0); --counter; })); @@ -186,7 +186,8 @@ int pika_main() auto& sched_8 = HP_schedulers[random_pool_2]; // random delay up to 5 milliseconds std::size_t delay = st_rand(0, 5); - auto s = ex::transfer_just(sched_7, delay) | ex::then(dummy_task) | ex::ensure_started(); + auto s = ex::just(delay) | ex::continues_on(sched_7) | ex::then(dummy_task) | + ex::ensure_started(); ex::start_detached(ex::schedule(sched_8) | ex::then([s = std::move(s), &counter]() mutable { // if s is not ready then this task will suspend itself in sync_wait tt::sync_wait(std::move(s)); diff --git a/libs/pika/resource_partitioner/tests/unit/named_pool_executor.cpp b/libs/pika/resource_partitioner/tests/unit/named_pool_executor.cpp index 4ac80b03d..4bff404a1 100644 --- a/libs/pika/resource_partitioner/tests/unit/named_pool_executor.cpp +++ b/libs/pika/resource_partitioner/tests/unit/named_pool_executor.cpp @@ -72,8 +72,10 @@ int pika_main() std::vector> lotsa_senders; // use schedulers to schedule work on pools - lotsa_senders.push_back(ex::transfer_just(sched_0_hp, 3, "HP default") | ex::then(dummy_task)); - lotsa_senders.push_back(ex::transfer_just(sched_0, 3, "Normal default") | ex::then(dummy_task)); + lotsa_senders.push_back( + ex::just(3, "HP default") | ex::continues_on(sched_0_hp) | ex::then(dummy_task)); + lotsa_senders.push_back( + ex::just(3, "Normal default") | ex::continues_on(sched_0) | ex::then(dummy_task)); std::vector scheds; std::vector scheds_hp; @@ -93,18 +95,17 @@ int pika_main() { std::string pool_name = "pool-" + std::to_string(i); lotsa_senders.push_back( - ex::transfer_just(scheds[i], 3, pool_name + "normal") | ex::then(dummy_task)); + ex::just(3, pool_name + "normal") | ex::continues_on(scheds[i]) | ex::then(dummy_task)); lotsa_senders.push_back( - ex::transfer_just(scheds_hp[i], 3, pool_name + " HP") | ex::then(dummy_task)); + ex::just(3, pool_name + " HP") | ex::continues_on(scheds_hp[i]) | ex::then(dummy_task)); } // check that the default scheduler still works auto large_stack_scheduler = ex::with_stacksize(ex::thread_pool_scheduler{}, pika::execution::thread_stacksize::large); - lotsa_senders.push_back( - ex::transfer_just(large_stack_scheduler, 3, "true default + large stack") | - ex::then(dummy_task)); + lotsa_senders.push_back(ex::just(3, "true default + large stack") | + ex::continues_on(large_stack_scheduler) | ex::then(dummy_task)); // just wait until everything is done tt::sync_wait(ex::when_all_vector(std::move(lotsa_senders))); diff --git a/libs/pika/resource_partitioner/tests/unit/scheduler_binding_check.cpp b/libs/pika/resource_partitioner/tests/unit/scheduler_binding_check.cpp index 4d45ee111..48805abe8 100644 --- a/libs/pika/resource_partitioner/tests/unit/scheduler_binding_check.cpp +++ b/libs/pika/resource_partitioner/tests/unit/scheduler_binding_check.cpp @@ -73,7 +73,7 @@ void threadLoop() pika::execution::thread_priority::bound), pika::execution::thread_stacksize::default_), pika::execution::thread_schedule_hint(std::int16_t(i % threads))); - tt::sync_wait(ex::transfer_just(sched, i, i % threads) | ex::then(f)); + tt::sync_wait(ex::just(i, i % threads) | ex::continues_on(sched) | ex::then(f)); } do { diff --git a/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp b/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp index e4707cb2b..3331b2248 100644 --- a/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp +++ b/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp @@ -20,11 +20,11 @@ #include using pika::execution::experimental::async_rw_mutex; +using pika::execution::experimental::continues_on; using pika::execution::experimental::execute; using pika::execution::experimental::start_detached; using pika::execution::experimental::then; using pika::execution::experimental::thread_pool_scheduler; -using pika::execution::experimental::transfer; using pika::execution::experimental::when_all; using pika::this_thread::experimental::sync_wait; @@ -174,8 +174,8 @@ void test_multiple_accesses(async_rw_mutex rwm, std::size_t i // Read-only and read-write access return senders of different types using r_sender_type = - std::decay_t; - using rw_sender_type = std::decay_t; + using rw_sender_type = std::decay_t; std::vector r_senders; std::vector rw_senders; @@ -195,13 +195,13 @@ void test_multiple_accesses(async_rw_mutex rwm, std::size_t i { if (readonly) { - r_senders.push_back(rwm.read() | transfer(exec) | + r_senders.push_back(rwm.read() | continues_on(exec) | then(checker{readonly, expected_predecessor_count, count, min_expected_count, max_expected_count})); } else { - rw_senders.push_back(rwm.readwrite() | transfer(exec) | + rw_senders.push_back(rwm.readwrite() | continues_on(exec) | then(checker{readonly, expected_predecessor_count, count, min_expected_count, max_expected_count})); // Only read-write access is allowed to change the value diff --git a/libs/pika/synchronization/tests/unit/barrier.cpp b/libs/pika/synchronization/tests/unit/barrier.cpp index 6426f49ba..da6a7c58e 100644 --- a/libs/pika/synchronization/tests/unit/barrier.cpp +++ b/libs/pika/synchronization/tests/unit/barrier.cpp @@ -52,7 +52,8 @@ void test_barrier_empty_oncomplete() results.reserve(threads); for (std::size_t i = 0; i != threads; ++i) { - results.emplace_back(ex::transfer_just(ex::thread_pool_scheduler{}, std::ref(b)) | + results.emplace_back(ex::just(std::ref(b)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(local_barrier_test_no_completion) | ex::ensure_started()); } @@ -101,8 +102,9 @@ void test_barrier_oncomplete() results.reserve(threads); for (std::size_t i = 0; i != threads; ++i) { - results.emplace_back(ex::transfer_just(ex::thread_pool_scheduler{}, std::ref(b)) | - ex::then(local_barrier_test) | ex::ensure_started()); + results.emplace_back(ex::just(std::ref(b)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(local_barrier_test) | + ex::ensure_started()); } b.arrive_and_wait(); // wait for all threads to enter the barrier @@ -146,7 +148,8 @@ void test_barrier_empty_oncomplete_split() results.reserve(threads); for (std::size_t i = 0; i != threads; ++i) { - results.emplace_back(ex::transfer_just(ex::thread_pool_scheduler{}, std::ref(b)) | + results.emplace_back(ex::just(std::ref(b)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(local_barrier_test_no_completion_split) | ex::ensure_started()); } @@ -190,8 +193,9 @@ void test_barrier_oncomplete_split() results.reserve(threads); for (std::size_t i = 0; i != threads; ++i) { - results.emplace_back(ex::transfer_just(ex::thread_pool_scheduler{}, std::ref(b)) | - ex::then(local_barrier_test_split) | ex::ensure_started()); + results.emplace_back(ex::just(std::ref(b)) | + ex::continues_on(ex::thread_pool_scheduler{}) | ex::then(local_barrier_test_split) | + ex::ensure_started()); } b.arrive_and_wait(); // wait for all threads to enter the barrier diff --git a/libs/pika/synchronization/tests/unit/event.cpp b/libs/pika/synchronization/tests/unit/event.cpp index a8313b2d5..15638b168 100644 --- a/libs/pika/synchronization/tests/unit/event.cpp +++ b/libs/pika/synchronization/tests/unit/event.cpp @@ -66,7 +66,7 @@ int pika_main(variables_map& vm) // Create the threads which will wait on the event for (std::size_t i = 0; i < pxthreads; ++i) { - senders.emplace_back(ex::transfer_just(sched, std::ref(e), std::ref(c)) | + senders.emplace_back(ex::just(std::ref(e), std::ref(c)) | ex::continues_on(sched) | ex::then(local_event_test) | ex::ensure_started()); } diff --git a/libs/pika/synchronization/tests/unit/latch.cpp b/libs/pika/synchronization/tests/unit/latch.cpp index 4517043ac..3e74e3675 100644 --- a/libs/pika/synchronization/tests/unit/latch.cpp +++ b/libs/pika/synchronization/tests/unit/latch.cpp @@ -54,7 +54,7 @@ int pika_main() std::vector> results; for (std::ptrdiff_t i = 0; i != NUM_THREADS; ++i) { - results.emplace_back(ex::transfer_just(sched, std::ref(l)) | + results.emplace_back(ex::just(std::ref(l)) | ex::continues_on(sched) | ex::then(test_arrive_and_wait) | ex::ensure_started()); } @@ -76,7 +76,7 @@ int pika_main() pika::latch l(NUM_THREADS + 1); PIKA_TEST(!l.try_wait()); - auto s = ex::transfer_just(sched, std::ref(l)) | ex::then(test_count_down) | + auto s = ex::just(std::ref(l)) | ex::continues_on(sched) | ex::then(test_count_down) | ex::ensure_started(); PIKA_TEST(!l.try_wait()); @@ -98,7 +98,7 @@ int pika_main() std::vector> results; for (std::ptrdiff_t i = 0; i != NUM_THREADS; ++i) { - results.emplace_back(ex::transfer_just(sched, std::ref(l)) | + results.emplace_back(ex::just(std::ref(l)) | ex::continues_on(sched) | ex::then(test_arrive_and_wait) | ex::ensure_started()); } diff --git a/libs/pika/synchronization/tests/unit/sliding_semaphore.cpp b/libs/pika/synchronization/tests/unit/sliding_semaphore.cpp index 9c6f91b3a..531c55519 100644 --- a/libs/pika/synchronization/tests/unit/sliding_semaphore.cpp +++ b/libs/pika/synchronization/tests/unit/sliding_semaphore.cpp @@ -44,8 +44,8 @@ int pika_main() for (std::size_t i = 0; i != num_tasks; ++i) { - senders.emplace_back( - ex::transfer_just(sched, std::ref(sem)) | ex::then(worker) | ex::ensure_started()); + senders.emplace_back(ex::just(std::ref(sem)) | ex::continues_on(sched) | ex::then(worker) | + ex::ensure_started()); } sem.wait(initial_count + num_tasks); diff --git a/libs/pika/threading_base/tests/unit/annotation_check_senders.cpp b/libs/pika/threading_base/tests/unit/annotation_check_senders.cpp index 887f9e971..9f0908ae4 100644 --- a/libs/pika/threading_base/tests/unit/annotation_check_senders.cpp +++ b/libs/pika/threading_base/tests/unit/annotation_check_senders.cpp @@ -64,15 +64,15 @@ auto test_senders_schedule_no_parent_annotation() ex::then(pika::annotated_function([] {}, "2-schedule-no-parent-D")), // 2 x - ex::schedule(ex::thread_pool_scheduler{}) | ex::transfer(ex::thread_pool_scheduler{}), + ex::schedule(ex::thread_pool_scheduler{}) | ex::continues_on(ex::thread_pool_scheduler{}), // 2-schedule-no-parent-E and ex::schedule(ex::with_annotation(ex::thread_pool_scheduler{}, "2-schedule-no-parent-E")) | - ex::transfer(ex::thread_pool_scheduler{}), + ex::continues_on(ex::thread_pool_scheduler{}), // 2-schedule-no-parent-F and ex::schedule(ex::thread_pool_scheduler{}) | - ex::transfer( + ex::continues_on( ex::with_annotation(ex::thread_pool_scheduler{}, "2-schedule-no-parent-F")), // 2 x @@ -117,16 +117,16 @@ auto test_senders_schedule_parent_annotation() // 2 x 3-schedule-parent ex::schedule(ex::thread_pool_scheduler{}) | - ex::transfer(ex::thread_pool_scheduler{}), + ex::continues_on(ex::thread_pool_scheduler{}), // 3-schedule-parent-E and 3-schedule-parent ex::schedule( ex::with_annotation(ex::thread_pool_scheduler{}, "3-schedule-parent-E")) | - ex::transfer(ex::thread_pool_scheduler{}), + ex::continues_on(ex::thread_pool_scheduler{}), // 3-schedule-parent-F and 3-schedule-parent ex::schedule(ex::thread_pool_scheduler{}) | - ex::transfer( + ex::continues_on( ex::with_annotation(ex::thread_pool_scheduler{}, "3-schedule-parent-F")), // 2 x 3-schedule-parent diff --git a/tests/performance/local/async_overheads.cpp b/tests/performance/local/async_overheads.cpp index d6c49a5fd..463acab8f 100644 --- a/tests/performance/local/async_overheads.cpp +++ b/tests/performance/local/async_overheads.cpp @@ -102,8 +102,8 @@ int pika_main(pika::program_options::variables_map& vm) { auto start = std::chrono::high_resolution_clock::now(); - tt::sync_wait( - ex::transfer_just(ex::thread_pool_scheduler{}, num_tasks) | ex::then(spawn_level)); + tt::sync_wait(ex::just(num_tasks) | ex::continues_on(ex::thread_pool_scheduler{}) | + ex::then(spawn_level)); auto end = std::chrono::high_resolution_clock::now(); diff --git a/tests/performance/local/condition_variable_overhead.cpp b/tests/performance/local/condition_variable_overhead.cpp index e549cbf65..878f27d80 100644 --- a/tests/performance/local/condition_variable_overhead.cpp +++ b/tests/performance/local/condition_variable_overhead.cpp @@ -142,15 +142,17 @@ void test_cv(Scheduler&& sched, std::uint64_t loops) for (int i = 0; i < N; i++) { // thread A - auto s1 = ex::transfer_just(sched, &task_data_A[i], &task_data_B[i]) // - | ex::then([&, loops](task_data* tda, task_data* tdb) { // + auto s1 = ex::just(&task_data_A[i], &task_data_B[i]) // + | ex::continues_on(sched) // + | ex::then([&, loops](task_data* tda, task_data* tdb) { // function_A(loops, tda, tdb); }); senders.push_back(std::move(s1)); // thread B - auto s2 = ex::transfer_just(sched, &task_data_A[i], &task_data_B[i]) // - | ex::then([&, loops](task_data* tda, task_data* tdb) { // + auto s2 = ex::just(&task_data_A[i], &task_data_B[i]) // + | ex::continues_on(sched) // + | ex::then([&, loops](task_data* tda, task_data* tdb) { // function_B(loops, tda, tdb); }); senders.push_back(std::move(s2)); diff --git a/tests/performance/local/task_size.cpp b/tests/performance/local/task_size.cpp index 49e181bb7..7204bb5e3 100644 --- a/tests/performance/local/task_size.cpp +++ b/tests/performance/local/task_size.cpp @@ -189,7 +189,7 @@ double do_work_bulk( ex::unique_any_sender<> sender{ex::just()}; for (std::uint64_t i = 0; i < tasks_per_thread; ++i) { - sender = std::move(sender) | ex::transfer(sched) | ex::bulk(num_threads, work); + sender = std::move(sender) | ex::continues_on(sched) | ex::bulk(num_threads, work); } double const spawn_time_s = timer.elapsed(); diff --git a/tests/regressions/threads/thread_rescheduling.cpp b/tests/regressions/threads/thread_rescheduling.cpp index 238b7e462..ca305e4b2 100644 --- a/tests/regressions/threads/thread_rescheduling.cpp +++ b/tests/regressions/threads/thread_rescheduling.cpp @@ -90,14 +90,14 @@ void tree_boot(std::uint64_t count, std::uint64_t grain_size, thread_id_type thr ex::thread_pool_scheduler sched{}; for (std::uint64_t i = 0; i < children; ++i) { - senders.emplace_back(ex::transfer_just(sched, child_count, grain_size, thread) | + senders.emplace_back(ex::just(child_count, grain_size, thread) | ex::continues_on(sched) | ex::then(tree_boot) | ex::ensure_started()); } for (std::uint64_t i = 0; i < actors; ++i) { - senders.emplace_back(ex::transfer_just(sched, thread) | ex::then(change_thread_state) | - ex::ensure_started()); + senders.emplace_back(ex::just(thread) | ex::continues_on(sched) | + ex::then(change_thread_state) | ex::ensure_started()); } detail::wait(std::move(senders)); @@ -136,14 +136,14 @@ int pika_main(variables_map& vm) ex::thread_pool_scheduler sched{}; // Flood the queues with set_thread_state operations before the rescheduling attempt. - auto before = ex::transfer_just(sched, tasks, grain_size, thread_id.noref()) | + auto before = ex::just(tasks, grain_size, thread_id.noref()) | ex::continues_on(sched) | ex::then(tree_boot) | ex::ensure_started(); set_thread_state(thread_id.noref(), pika::threads::detail::thread_schedule_state::pending, pika::threads::detail::thread_restart_state::signaled); // Flood the queues with set_thread_state operations after the rescheduling attempt. - auto after = ex::transfer_just(sched, tasks, grain_size, thread_id.noref()) | + auto after = ex::just(tasks, grain_size, thread_id.noref()) | ex::continues_on(sched) | ex::then(tree_boot) | ex::ensure_started(); tt::sync_wait(std::move(before));