diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ContextPropagationWithScopesTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ContextPropagationWithScopesTest.java index 87c375d0f4..e26c144b43 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ContextPropagationWithScopesTest.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ContextPropagationWithScopesTest.java @@ -28,6 +28,9 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.scopedvalue.ScopeHolder; +import reactor.core.publisher.scopedvalue.ScopedValue; +import reactor.core.publisher.scopedvalue.ScopedValueThreadLocalAccessor; import reactor.core.scheduler.Schedulers; import reactor.util.annotation.Nullable; import reactor.util.context.Context; @@ -39,8 +42,6 @@ */ class ContextPropagationWithScopesTest { - private static final Logger log = LoggerFactory.getLogger(ContextPropagationWithScopesTest.class); - @BeforeAll static void initializeThreadLocalAccessors() { ContextRegistry globalRegistry = ContextRegistry.getInstance(); @@ -52,11 +53,9 @@ void enableHook() { Hooks.enableAutomaticContextPropagation(); } - //the cleanup of "thread locals" could be especially important if one starts relying on - //the global registry in tests: it would ensure no TL pollution. @AfterEach void cleanupThreadLocals() { - ScopedValue.VALUE_IN_SCOPE.remove(); + ScopeHolder.remove(); Hooks.disableAutomaticContextPropagation(); } @@ -66,122 +65,16 @@ static void removeThreadLocalAccessors() { globalRegistry.removeThreadLocalAccessor(ScopedValueThreadLocalAccessor.KEY); } - @Test - void basicScopeWorks() { - assertThat(ScopedValue.getCurrent()).isNull(); - - ScopedValue scopedValue = ScopedValue.create("hello"); - try (ScopedValue.Scope scope = scopedValue.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue); - } - - assertThat(ScopedValue.getCurrent()).isNull(); - } - - @Test - void scopeWorksInAnotherThread() throws Exception { - AtomicReference valueInNewThread = new AtomicReference<>(); - ContextSnapshotFactory snapshotFactory = ContextSnapshotFactory.builder().build(); - ScopedValue scopedValue = ScopedValue.create("hello"); - - assertThat(ScopedValue.getCurrent()).isNull(); - - try (ScopedValue.Scope scope = scopedValue.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue); - Runnable wrapped = snapshotFactory.captureAll().wrap(() -> valueInNewThread.set(ScopedValue.getCurrent())); - Thread t = new Thread(wrapped); - t.start(); - t.join(); - } - - assertThat(valueInNewThread.get()).isEqualTo(scopedValue); - assertThat(ScopedValue.getCurrent()).isNull(); - } - - @Test - void emptyScopeWorks() { - assertThat(ScopedValue.getCurrent()).isNull(); - - ScopedValue scopedValue = ScopedValue.create("hello"); - try (ScopedValue.Scope scope = scopedValue.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue); - try (ScopedValue.Scope emptyScope = ScopedValue.nullValue().open()) { - assertThat(ScopedValue.getCurrent()).isInstanceOf(NullScopedValue.class); - assertThat(ScopedValue.getCurrent().get()).isNull(); - } - assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue); - } - - assertThat(ScopedValue.getCurrent()).isNull(); - } - - @Test - void nestedScopeWorksInAnotherThread() throws Exception { - AtomicReference value1InNewThreadBefore = new AtomicReference<>(); - AtomicReference value1InNewThreadAfter = new AtomicReference<>(); - AtomicReference value2InNewThread = new AtomicReference<>(); - - ContextSnapshotFactory snapshotFactory = ContextSnapshotFactory.builder().build(); - - ScopedValue v1 = ScopedValue.create("val1"); - ScopedValue v2 = ScopedValue.create("val2"); - - assertThat(ScopedValue.getCurrent()).isNull(); - - Thread t; - - try (ScopedValue.Scope v1Scope = v1.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - try (ScopedValue.Scope v2scope1T1 = v2.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - try (ScopedValue.Scope v2scope2T1 = v2.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - Runnable runnable = () -> { - value1InNewThreadBefore.set(ScopedValue.getCurrent()); - try (ScopedValue.Scope v2scopeT2 = v2.open()) { - value2InNewThread.set(ScopedValue.getCurrent()); - } - value1InNewThreadAfter.set(ScopedValue.getCurrent()); - }; - - Runnable wrapped = snapshotFactory.captureAll().wrap(runnable); - t = new Thread(wrapped); - t.start(); - - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope2T1); - } - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope1T1); - } - - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - - try (ScopedValue.Scope childScope3 = v2.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(childScope3); - } - - t.join(); - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - } - - assertThat(value1InNewThreadBefore.get()).isEqualTo(v2); - assertThat(value1InNewThreadAfter.get()).isEqualTo(v2); - assertThat(value2InNewThread.get()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent()).isNull(); - } - @Test void basicMonoWorks() { ScopedValue scopedValue = ScopedValue.create("hello"); Mono.just("item") - .doOnNext(item -> assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue)) + .doOnNext(item -> assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue)) .contextWrite(Context.of(ScopedValueThreadLocalAccessor.KEY, scopedValue)) .block(); - assertThat(ScopedValue.getCurrent()).isNull(); + assertThat(ScopeHolder.currentValue()).isNull(); } @Test @@ -189,28 +82,28 @@ void basicFluxWorks() { ScopedValue scopedValue = ScopedValue.create("hello"); Flux.just("item") - .doOnNext(item -> assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue)) + .doOnNext(item -> assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue)) .contextWrite(Context.of(ScopedValueThreadLocalAccessor.KEY, scopedValue)) .blockLast(); - assertThat(ScopedValue.getCurrent()).isNull(); + assertThat(ScopeHolder.currentValue()).isNull(); } @Test void emptyContextWorksInMono() { ScopedValue scopedValue = ScopedValue.create("hello"); try (ScopedValue.Scope scope = scopedValue.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue); + assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue); Mono.just("item") - .doOnNext(item -> assertThat(ScopedValue.getCurrent()).isInstanceOf(NullScopedValue.class)) + .doOnNext(item -> assertThat(ScopeHolder.currentValue().get()).isNull()) .contextWrite(ctx -> Context.empty()) .block(); - assertThat(ScopedValue.getCurrent()).isEqualTo(scopedValue); + assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue); } - assertThat(ScopedValue.getCurrent()).isNull(); + assertThat(ScopeHolder.currentValue()).isNull(); } @Test @@ -221,21 +114,21 @@ void subscribeMonoElsewhere() { ScopedValue internalValue = ScopedValue.create("inside"); try (ScopedValue.Scope scope = externalValue.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(externalValue); + assertThat(ScopeHolder.currentValue()).isEqualTo(externalValue); Mono.just(1) .subscribeOn(Schedulers.single()) .doOnNext(i -> { - valueInNewThread.set(ScopedValue.getCurrent()); + valueInNewThread.set(ScopeHolder.currentValue()); }) .contextWrite(Context.of(ScopedValueThreadLocalAccessor.KEY, internalValue)) .block(); assertThat(valueInNewThread.get()).isEqualTo(internalValue); - assertThat(ScopedValue.getCurrent()).isEqualTo(externalValue); + assertThat(ScopeHolder.currentValue()).isEqualTo(externalValue); } - assertThat(ScopedValue.getCurrent()).isEqualTo(null); + assertThat(ScopeHolder.currentValue()).isEqualTo(null); } @Test @@ -246,55 +139,21 @@ void subscribeFluxElsewhere() { ScopedValue internalValue = ScopedValue.create("inside"); try (ScopedValue.Scope scope = externalValue.open()) { - assertThat(ScopedValue.getCurrent()).isEqualTo(externalValue); + assertThat(ScopeHolder.currentValue()).isEqualTo(externalValue); Flux.just(1) .subscribeOn(Schedulers.single()) .doOnNext(i -> { - valueInNewThread.set(ScopedValue.getCurrent()); + valueInNewThread.set(ScopeHolder.currentValue()); }) .contextWrite(Context.of(ScopedValueThreadLocalAccessor.KEY, internalValue)) .blockLast(); assertThat(valueInNewThread.get()).isEqualTo(internalValue); - assertThat(ScopedValue.getCurrent()).isEqualTo(externalValue); + assertThat(ScopeHolder.currentValue()).isEqualTo(externalValue); } - assertThat(ScopedValue.getCurrent()).isEqualTo(null); - } - - @Test - void multiLevelScopesWithDifferentValues() { - ScopedValue v1 = ScopedValue.create("val1"); - ScopedValue v2 = ScopedValue.create("val2"); - - try (ScopedValue.Scope v1scope1 = v1.open()) { - try (ScopedValue.Scope v1scope2 = v1.open()) { - try (ScopedValue.Scope v2scope1 = v2.open()) { - try (ScopedValue.Scope v2scope2 = v2.open()) { - try (ScopedValue.Scope v1scope3 = v1.open()) { - try (ScopedValue.Scope nullScope = - ScopedValue.nullValue().open()) { - assertThat(ScopedValue.getCurrent()).isInstanceOf(NullScopedValue.class); - assertThat(ScopedValue.getCurrent().get()).isNull(); - } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope3); - } - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope2); - } - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope1); - } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope2); - } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope1); - } - - assertThat(ScopedValue.getCurrent()).isNull(); + assertThat(ScopeHolder.currentValue()).isEqualTo(null); } @Test @@ -311,36 +170,35 @@ void multiLevelScopesWithDifferentValuesAndFlux() { try (ScopedValue.Scope v1scope3 = v1.open()) { try (ScopedValue.Scope nullScope = ScopedValue.nullValue().open()) { - assertThat(ScopedValue.getCurrent()).isInstanceOf(NullScopedValue.class); - assertThat(ScopedValue.getCurrent().get()).isNull(); + assertThat(ScopeHolder.currentValue().get()).isNull(); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope3); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope3); Flux.just(1) .flatMap(i -> Flux.just(i) .publishOn(Schedulers.boundedElastic()) - .doOnNext(item -> valueInsideFlatMap.set(ScopedValue.getCurrent()))) + .doOnNext(item -> valueInsideFlatMap.set(ScopeHolder.currentValue()))) .blockLast(); - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope3); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope3); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope2); + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope2); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope1); + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope1); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope2); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope2); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope1); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope1); } - assertThat(ScopedValue.getCurrent()).isNull(); + assertThat(ScopeHolder.currentValue()).isNull(); assertThat(valueInsideFlatMap.get()).isEqualTo(v1); } @@ -359,197 +217,36 @@ void multiLevelScopesWithDifferentValuesAndMono() { try (ScopedValue.Scope v1scope3 = v1.open()) { try (ScopedValue.Scope nullScope = ScopedValue.nullValue().open()) { - assertThat(ScopedValue.getCurrent()).isInstanceOf(NullScopedValue.class); - assertThat(ScopedValue.getCurrent().get()).isNull(); + assertThat(ScopeHolder.currentValue().get()).isNull(); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope3); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope3); Mono.just(1) .flatMap(i -> Mono.just(i) .publishOn(Schedulers.boundedElastic()) - .doOnNext(item -> valueInsideFlatMap.set(ScopedValue.getCurrent()))) + .doOnNext(item -> valueInsideFlatMap.set(ScopeHolder.currentValue()))) .block(); - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope3); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope3); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope2); + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope2); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v2); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v2scope1); + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope1); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope2); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope2); } - assertThat(ScopedValue.getCurrent()).isEqualTo(v1); - assertThat(ScopedValue.getCurrent().currentScope()).isEqualTo(v1scope1); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope1); } - assertThat(ScopedValue.getCurrent()).isNull(); + assertThat(ScopeHolder.currentValue()).isNull(); assertThat(valueInsideFlatMap.get()).isEqualTo(v1); } - - private static class ScopedValueThreadLocalAccessor implements ThreadLocalAccessor { - - static final String KEY = "svtla"; - - @Override - public Object key() { - return KEY; - } - - @Override - public ScopedValue getValue() { - return ScopedValue.VALUE_IN_SCOPE.get(); - } - - @Override - public void setValue(ScopedValue value) { - value.open(); - } - - @Override - public void setValue() { - ScopedValue.nullValue().open(); - } - - @Override - public void restore(ScopedValue previousValue) { - ScopedValue current = ScopedValue.VALUE_IN_SCOPE.get(); - if (current != null) { - ScopedValue.Scope currentScope = current.currentScope(); - if (currentScope == null || currentScope.parentScope == null || - currentScope.parentScope.scopedValue != previousValue) { - throw new RuntimeException("Restoring to a different previous scope than expected!"); - } - currentScope.close(); - } else { - throw new RuntimeException("Restoring to previous scope, but current is missing."); - } - } - - @Override - public void restore() { - ScopedValue current = ScopedValue.VALUE_IN_SCOPE.get(); - if (current != null) { - ScopedValue.Scope currentScope = current.currentScope(); - if (currentScope == null) { - throw new RuntimeException("Can't close current scope, as it is missing"); - } - currentScope.close(); - } else { - throw new RuntimeException("Restoring to previous scope, but current is missing."); - } - } - } - private interface ScopedValue { - - ThreadLocal VALUE_IN_SCOPE = new ThreadLocal<>(); - - @Nullable - static ScopedValue getCurrent() { - return VALUE_IN_SCOPE.get(); - } - - static ScopedValue create(String value) { - return new SimpleScopedValue(value); - } - - static ScopedValue nullValue() { - return new NullScopedValue(); - } - - @Nullable - String get(); - - @Nullable - Scope currentScope(); - - void updateCurrentScope(Scope scope); - - Scope open(); - - class Scope implements AutoCloseable { - - private final ScopedValue scopedValue; - - @Nullable - private final Scope parentScope; - - public Scope(ScopedValue scopedValue) { - log.info("{}: open scope[{}]", scopedValue.get(), hashCode()); - this.scopedValue = scopedValue; - - ScopedValue currentValue = ScopedValue.VALUE_IN_SCOPE.get(); - this.parentScope = currentValue != null ? currentValue.currentScope() : null; - - ScopedValue.VALUE_IN_SCOPE.set(scopedValue); - } - - @Override - public void close() { - if (parentScope == null) { - log.info("{}: remove scope[{}]", scopedValue.get(), hashCode()); - ScopedValue.VALUE_IN_SCOPE.remove(); - } else { - log.info("{}: close scope[{}] -> restore {} scope[{}]", - scopedValue.get(), hashCode(), - parentScope.scopedValue.get(), parentScope.hashCode()); - parentScope.scopedValue.updateCurrentScope(parentScope); - ScopedValue.VALUE_IN_SCOPE.set(parentScope.scopedValue); - } - } - } - } - - private static class SimpleScopedValue implements ScopedValue { - - private final String value; - - ThreadLocal currentScope = new ThreadLocal<>(); - - private SimpleScopedValue(String value) { - this.value = value; - } - - @Override - public String get() { - return value; - } - - @Override - public Scope currentScope() { - return currentScope.get(); - } - - @Override - public Scope open() { - Scope scope = new Scope(this); - this.currentScope.set(scope); - return scope; - } - - @Override - public void updateCurrentScope(Scope scope) { - log.info("{} update scope[{}] -> scope[{}]", - value, currentScope.get().hashCode(), scope.hashCode()); - this.currentScope.set(scope); - } - } - - private static class NullScopedValue extends SimpleScopedValue { - - public NullScopedValue() { - super("null"); - } - - @Override - public String get() { - return null; - } - } } diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/NullScopedValue.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/NullScopedValue.java new file mode 100644 index 0000000000..891ab2adfc --- /dev/null +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/NullScopedValue.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.core.publisher.scopedvalue; + +// NOTE: This is a copy from the context-propagation library. Any changes should be +// considered in the upstream first. Please keep in sync. + +/** + * Implementation of {@link ScopedValue} which is used when the actual value in scope + * should be missing, but the scope hierarchy needs to be maintained. + */ +class NullScopedValue extends SimpleScopedValue { + + NullScopedValue() { + super("null"); + } + + @Override + public String get() { + return null; + } + +} diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopeHolder.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopeHolder.java new file mode 100644 index 0000000000..2f87d922cd --- /dev/null +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopeHolder.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher.scopedvalue; + +import org.assertj.core.util.VisibleForTesting; + +// NOTE: This is a copy from the context-propagation library. Any changes should be +// considered in the upstream first. Please keep in sync. + +/** + * Thread-local storage for the current value in scope for the current Thread. + */ +public class ScopeHolder { + + private static final ThreadLocal SCOPE = new ThreadLocal<>(); + + public static ScopedValue currentValue() { + ScopedValue.Scope scope = SCOPE.get(); + return scope == null ? null : scope.scopedValue; + } + + public static ScopedValue.Scope get() { + return SCOPE.get(); + } + + static void set(ScopedValue.Scope scope) { + SCOPE.set(scope); + } + + @VisibleForTesting + public static void remove() { + SCOPE.remove(); + } + +} diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValue.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValue.java new file mode 100644 index 0000000000..299adc17c4 --- /dev/null +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValue.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher.scopedvalue; + +import java.util.logging.Logger; + +import static java.util.logging.Level.INFO; + +// NOTE: This is a copy from the context-propagation library. Any changes should be +// considered in the upstream first. Please keep in sync. + +/** + * Serves as an abstraction of a value which can be in the current Thread-local scope. + * + * @author Dariusz Jędrzejczyk + */ +public interface ScopedValue { + + /** + * Creates a new instance, which can be set in scope via {@link #open()}. + * @param value {@code String} value associated with created {@link ScopedValue} + * @return new instance + */ + static ScopedValue create(String value) { + return new SimpleScopedValue(value); + } + + /** + * Creates a dummy instance used for nested scopes, in which the value should be + * virtually absent, but allows reverting to the previous value in scope. + * @return new instance representing an empty scope + */ + static ScopedValue nullValue() { + return new NullScopedValue(); + } + + /** + * {@code String} value associated with this instance. + * @return associated value + */ + String get(); + + /** + * Create a new scope and set the value for this Thread. + * @return newly created {@link Scope} + */ + Scope open(); + + /** + * Represents a scope in which a {@link ScopedValue} is set for a particular Thread + * and maintains a hierarchy between this instance and the parent. + */ + class Scope implements AutoCloseable { + + private static final Logger log = Logger.getLogger(Scope.class.getName()); + + final ScopedValue scopedValue; + + final Scope parentScope; + + Scope(ScopedValue scopedValue) { + log.log(INFO, () -> String.format("%s: open scope[%s]", scopedValue.get(), hashCode())); + this.scopedValue = scopedValue; + + Scope currentScope = ScopeHolder.get(); + this.parentScope = currentScope; + ScopeHolder.set(this); + } + + @Override + public void close() { + if (parentScope == null) { + log.log(INFO, () -> String.format("%s: remove scope[%s]", scopedValue.get(), hashCode())); + ScopeHolder.remove(); + } + else { + log.log(INFO, () -> String.format("%s: close scope[%s] -> restore %s scope[%s]", scopedValue.get(), + hashCode(), parentScope.scopedValue.get(), parentScope.hashCode())); + ScopeHolder.set(parentScope); + } + } + + } + +} diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValueTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValueTest.java new file mode 100644 index 0000000000..b073397f2f --- /dev/null +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValueTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher.scopedvalue; + +import java.util.concurrent.atomic.AtomicReference; + +import io.micrometer.context.ContextRegistry; +import io.micrometer.context.ContextSnapshotFactory; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +// NOTE: These tests are a copy from the context-propagation library. Any changes should +// be considered in the upstream first. Please keep in sync. + +public class ScopedValueTest { + + @BeforeAll + static void initializeThreadLocalAccessors() { + ContextRegistry globalRegistry = ContextRegistry.getInstance(); + globalRegistry.registerThreadLocalAccessor(new ScopedValueThreadLocalAccessor()); + } + + @AfterEach + void cleanupThreadLocals() { + ScopeHolder.remove(); + } + + @AfterAll + static void removeThreadLocalAccessors() { + ContextRegistry globalRegistry = ContextRegistry.getInstance(); + globalRegistry.removeThreadLocalAccessor(ScopedValueThreadLocalAccessor.KEY); + } + + @Test + void basicScopeWorks() { + assertThat(ScopeHolder.currentValue()).isNull(); + + ScopedValue scopedValue = ScopedValue.create("hello"); + try (ScopedValue.Scope scope = scopedValue.open()) { + assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue); + } + + assertThat(ScopeHolder.currentValue()).isNull(); + } + + @Test + void emptyScopeWorks() { + assertThat(ScopeHolder.currentValue()).isNull(); + + ScopedValue scopedValue = ScopedValue.create("hello"); + try (ScopedValue.Scope scope = scopedValue.open()) { + assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue); + try (ScopedValue.Scope emptyScope = ScopedValue.nullValue().open()) { + assertThat(ScopeHolder.currentValue().get()).isNull(); + } + assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue); + } + + assertThat(ScopeHolder.currentValue()).isNull(); + } + + @Test + void scopeWorksInAnotherThread() throws Exception { + AtomicReference valueInNewThread = new AtomicReference<>(); + ContextSnapshotFactory snapshotFactory = ContextSnapshotFactory.builder().build(); + ScopedValue scopedValue = ScopedValue.create("hello"); + + assertThat(ScopeHolder.currentValue()).isNull(); + + try (ScopedValue.Scope scope = scopedValue.open()) { + assertThat(ScopeHolder.currentValue()).isEqualTo(scopedValue); + Runnable wrapped = snapshotFactory.captureAll().wrap(() -> valueInNewThread.set(ScopeHolder.currentValue())); + Thread t = new Thread(wrapped); + t.start(); + t.join(); + } + + assertThat(valueInNewThread.get()).isEqualTo(scopedValue); + assertThat(ScopeHolder.currentValue()).isNull(); + } + + @Test + void multiLevelScopesWithDifferentValues() { + ScopedValue v1 = ScopedValue.create("val1"); + ScopedValue v2 = ScopedValue.create("val2"); + + try (ScopedValue.Scope v1scope1 = v1.open()) { + try (ScopedValue.Scope v1scope2 = v1.open()) { + try (ScopedValue.Scope v2scope1 = v2.open()) { + try (ScopedValue.Scope v2scope2 = v2.open()) { + try (ScopedValue.Scope v1scope3 = v1.open()) { + try (ScopedValue.Scope nullScope = ScopedValue.nullValue().open()) { + assertThat(ScopeHolder.currentValue().get()).isNull(); + } + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope3); + } + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope2); + } + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope1); + } + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope2); + } + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + assertThat(ScopeHolder.get()).isEqualTo(v1scope1); + } + + assertThat(ScopeHolder.currentValue()).isNull(); + } + + @Test + void nestedScopeWorksInAnotherThread() throws Exception { + AtomicReference value1InNewThreadBefore = new AtomicReference<>(); + AtomicReference value1InNewThreadAfter = new AtomicReference<>(); + AtomicReference value2InNewThread = new AtomicReference<>(); + + ContextSnapshotFactory snapshotFactory = ContextSnapshotFactory.builder().build(); + + ScopedValue v1 = ScopedValue.create("val1"); + ScopedValue v2 = ScopedValue.create("val2"); + + assertThat(ScopeHolder.currentValue()).isNull(); + + Thread t; + + try (ScopedValue.Scope v1Scope = v1.open()) { + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + try (ScopedValue.Scope v2scope1T1 = v2.open()) { + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + try (ScopedValue.Scope v2scope2T1 = v2.open()) { + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + Runnable runnable = () -> { + value1InNewThreadBefore.set(ScopeHolder.currentValue()); + try (ScopedValue.Scope v2scopeT2 = v2.open()) { + value2InNewThread.set(ScopeHolder.currentValue()); + } + value1InNewThreadAfter.set(ScopeHolder.currentValue()); + }; + + Runnable wrapped = snapshotFactory.captureAll().wrap(runnable); + t = new Thread(wrapped); + t.start(); + + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope2T1); + } + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(v2scope1T1); + } + + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + + try (ScopedValue.Scope childScope3 = v2.open()) { + assertThat(ScopeHolder.currentValue()).isEqualTo(v2); + assertThat(ScopeHolder.get()).isEqualTo(childScope3); + } + + t.join(); + assertThat(ScopeHolder.currentValue()).isEqualTo(v1); + } + + assertThat(value1InNewThreadBefore.get()).isEqualTo(v2); + assertThat(value1InNewThreadAfter.get()).isEqualTo(v2); + assertThat(value2InNewThread.get()).isEqualTo(v2); + assertThat(ScopeHolder.currentValue()).isNull(); + } + +} diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValueThreadLocalAccessor.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValueThreadLocalAccessor.java new file mode 100644 index 0000000000..7afedaf00c --- /dev/null +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/ScopedValueThreadLocalAccessor.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher.scopedvalue; + +import io.micrometer.context.ThreadLocalAccessor; + +// NOTE: This is a copy from the context-propagation library. Any changes should be +// considered in the upstream first. Please keep in sync. + +/** + * Accessor for {@link ScopedValue}. + * + * @author Dariusz Jędrzejczyk + */ +public class ScopedValueThreadLocalAccessor implements ThreadLocalAccessor { + + /** + * The key used for registrations in {@link io.micrometer.context.ContextRegistry}. + */ + public static final String KEY = "svtla"; + + @Override + public Object key() { + return KEY; + } + + @Override + public ScopedValue getValue() { + return ScopeHolder.currentValue(); + } + + @Override + public void setValue(ScopedValue value) { + value.open(); + } + + @Override + public void setValue() { + ScopedValue.nullValue().open(); + } + + @Override + public void restore(ScopedValue previousValue) { + ScopedValue.Scope currentScope = ScopeHolder.get(); + if (currentScope != null) { + if (currentScope.parentScope == null || currentScope.parentScope.scopedValue != previousValue) { + throw new RuntimeException("Restoring to a different previous scope than expected!"); + } + currentScope.close(); + } + else { + throw new RuntimeException("Restoring to previous scope, but current is missing."); + } + } + + @Override + public void restore() { + ScopedValue.Scope currentScope = ScopeHolder.get(); + if (currentScope != null) { + currentScope.close(); + } + else { + throw new RuntimeException("Restoring to previous scope, but current is missing."); + } + } + +} diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/SimpleScopedValue.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/SimpleScopedValue.java new file mode 100644 index 0000000000..a2680d468f --- /dev/null +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/scopedvalue/SimpleScopedValue.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher.scopedvalue; + +// NOTE: This is a copy from the context-propagation library. Any changes should be +// considered in the upstream first. Please keep in sync. + +/** + * Implementation of {@link ScopedValue} for which {@link ScopedValue.Scope} maintains the + * hierarchy between parent scope and an opened scope for this value. + */ +class SimpleScopedValue implements ScopedValue { + + private final String value; + + SimpleScopedValue(String value) { + this.value = value; + } + + @Override + public String get() { + return value; + } + + @Override + public Scope open() { + return new Scope(this); + } + +}