Skip to content

Commit

Permalink
introduces automatic loom support
Browse files Browse the repository at this point in the history
Signed-off-by: OlegDokuka <[email protected]>
  • Loading branch information
Oleh Dokuka committed Jun 30, 2023
1 parent 29ca754 commit 86a9c8f
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 6 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/ci-mrj.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,29 @@ jobs:
name: gradle
with:
arguments: :reactor-core:test --no-daemon -Pjunit-tags=!slow
core-java21-fast:
name: core fast java21Tests
runs-on: ubuntu-latest
needs: preliminary
steps:
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8 # tag=v3
- run: ${GITHUB_WORKSPACE}/.github/setup.sh
shell: bash
- uses: actions/setup-java@de1bb2b0c5634f0fc4438d7aa9944e68f9bf86cc # tag=v3
with:
distribution: 'jdkfile'
java-version: 9.0.4
jdkFile: /opt/openjdk/java9/OpenJDK9U-jdk_x64_linux_hotspot_9.0.4_11.tar.gz
- uses: actions/setup-java@de1bb2b0c5634f0fc4438d7aa9944e68f9bf86cc # tag=v3
with:
distribution: 'temurin'
java-version: |
21-ea
8
- uses: gradle/gradle-build-action@3fbe033aaae657f011f88f29be9e65ed26bd29ef # tag=v2
name: gradle
with:
arguments: :reactor-core:java21Test --no-daemon -Pjunit-tags=!slow
core-slow:
name: core slower tests
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ test {
}
}

java21Test {
systemProperty "reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true"
}

tasks.withType(Test).matching { !(it.name in testing.suites.names) }.configureEach {
def tags = rootProject.findProperty("junit-tags")
if (tags != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.scheduler;

import java.util.function.Supplier;

import reactor.util.Logger;
import reactor.util.Loggers;

import static reactor.core.scheduler.Schedulers.BOUNDED_ELASTIC;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
import static reactor.core.scheduler.Schedulers.newBoundedElastic;

class BoundedElasticSchedulerSupplier implements Supplier<Scheduler> {

static final Logger logger = Loggers.getLogger(BoundedElasticSchedulerSupplier.class);

@Override
public Scheduler get() {
if (DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS) {
logger.warn(
"Virtual Threads support is not available on the given JVM. Fallbacks to default BoundedElastic setup");
}

return newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE,
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BOUNDED_ELASTIC,
BoundedElasticScheduler.DEFAULT_TTL_SECONDS,
true);
}
}
20 changes: 16 additions & 4 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -104,6 +104,19 @@ public abstract class Schedulers {
.map(Integer::parseInt)
.orElse(100000);

/**
* Default execution of enqueued tasks on {@link Thread#ofVirtual} for the global
* {@link #boundedElastic()} {@link Scheduler},
* initialized by system property {@code reactor.schedulers.defaultBoundedElasticOnVirtualThreads}
* and falls back to false .
*
* @see #boundedElastic()
*/
public static final boolean DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultBoundedElasticOnVirtualThreads"))
.map(Boolean::parseBoolean)
.orElse(false);

/**
* Create a {@link Scheduler} which uses a backing {@link Executor} to schedule
* Runnables for async operators.
Expand Down Expand Up @@ -1060,6 +1073,7 @@ public void dispose() {

// Internals
static final String BOUNDED_ELASTIC = "boundedElastic"; // Blocking stuff with scale to zero
static final String LOOM_BOUNDED_ELASTIC = "loomBoundedElastic"; // Loom stuff
static final String PARALLEL = "parallel"; //scale up common tasks
static final String SINGLE = "single"; //non blocking tasks
static final String IMMEDIATE = "immediate";
Expand All @@ -1072,9 +1086,7 @@ public void dispose() {
static AtomicReference<CachedScheduler> CACHED_PARALLEL = new AtomicReference<>();
static AtomicReference<CachedScheduler> CACHED_SINGLE = new AtomicReference<>();

static final Supplier<Scheduler> BOUNDED_ELASTIC_SUPPLIER =
() -> newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BOUNDED_ELASTIC, BoundedElasticScheduler.DEFAULT_TTL_SECONDS, true);
static final Supplier<Scheduler> BOUNDED_ELASTIC_SUPPLIER = new BoundedElasticSchedulerSupplier();

static final Supplier<Scheduler> PARALLEL_SUPPLIER =
() -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.scheduler;

import java.util.function.Supplier;

import static reactor.core.scheduler.Schedulers.BOUNDED_ELASTIC;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
import static reactor.core.scheduler.Schedulers.LOOM_BOUNDED_ELASTIC;
import static reactor.core.scheduler.Schedulers.newBoundedElastic;

class BoundedElasticSchedulerSupplier implements Supplier<Scheduler> {

@Override
public Scheduler get() {
if (DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS) {
return newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE,
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
Thread.ofVirtual()
.name(LOOM_BOUNDED_ELASTIC + "-", 1)
.uncaughtExceptionHandler(Schedulers::defaultUncaughtException)
.factory(),
BoundedElasticScheduler.DEFAULT_TTL_SECONDS);
}
return newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE,
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BOUNDED_ELASTIC,
BoundedElasticScheduler.DEFAULT_TTL_SECONDS,
true);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -530,7 +530,7 @@ public void fromFuseableUsesThreadBarrier() {
.startsWith("single-");

assertThat(processing.keySet())
.allSatisfy(k -> assertThat(k).startsWith("boundedElastic-"));
.allSatisfy(k -> assertThat(k).containsIgnoringCase("boundedElastic-"));
}

@Test
Expand Down

0 comments on commit 86a9c8f

Please sign in to comment.