diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxFilterFuseableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxFilterFuseableTest.java index 176411c1fc..cfbded92ec 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxFilterFuseableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxFilterFuseableTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,10 @@ package reactor.core.publisher; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.reactivestreams.Subscription; @@ -27,6 +29,7 @@ import reactor.core.Scannable; import reactor.core.publisher.FluxFilterFuseable.FilterFuseableConditionalSubscriber; import reactor.core.publisher.FluxFilterFuseable.FilterFuseableSubscriber; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.MockUtils; import reactor.test.StepVerifier; @@ -162,26 +165,33 @@ public void discardTryOnNextPredicateMiss() { } @Test - public void discardPollAsyncPredicateFail() { + public void discardPollAsyncPredicateFail() throws InterruptedException { + Scheduler scheduler = Schedulers.newSingle("discardPollAsync"); CountDownLatch latch = new CountDownLatch(10); - StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead - .publishOn(Schedulers.newSingle("discardPollAsync"), 1) - .contextWrite(previous -> { - Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD); - - return Operators.enableOnDiscard(previous, (discarded) -> { - latch.countDown(); - previousDiscardHandler.accept(discarded); - }); - }) - .filter(i -> { throw new IllegalStateException("boom"); }) - ) - .expectFusion(Fuseable.ASYNC) - .expectErrorMessage("boom") - .verifyThenAssertThat() - .hasDiscarded(1) //publishOn also might discard the rest - .hasDiscardedElementsMatching(list -> !list.contains(0)) - .hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + StepVerifier.Assertions assertions = + Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .publishOn(scheduler, 1) + .filter(i -> { + throw new IllegalStateException("boom"); + }) + .contextWrite(previous -> { + Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD); + + return Operators.enableOnDiscard(previous, (discarded) -> { + previousDiscardHandler.accept(discarded); + latch.countDown(); + }); + }) + .as(StepVerifier::create) + .expectFusion(Fuseable.ASYNC) + .expectErrorMessage("boom") + .verifyThenAssertThat(); + + Assertions.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertions + .hasDiscarded(1) //publishOn also might discard the rest + .hasDiscardedElementsMatching(list -> !list.contains(0)) + .hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); } @Test @@ -277,27 +287,34 @@ public void discardConditionalTryOnNextPredicateMiss() { } @Test - public void discardConditionalPollAsyncPredicateFail() { + public void discardConditionalPollAsyncPredicateFail() throws InterruptedException { + Scheduler scheduler = Schedulers.newSingle("discardPollAsync"); CountDownLatch latch = new CountDownLatch(10); - StepVerifier.create(Flux.range(1, 10) //range uses tryOnNext, so let's use just instead - .publishOn(Schedulers.newSingle("discardPollAsync")) - .filter(i -> { throw new IllegalStateException("boom"); }) - .filter(i -> true) - .contextWrite(previous -> { - Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD); - - return Operators.enableOnDiscard(previous, (discarded) -> { - latch.countDown(); - previousDiscardHandler.accept(discarded); - }); - }) - ) - .expectFusion(Fuseable.ASYNC) - .expectErrorMessage("boom") - .verifyThenAssertThat() - .hasDiscarded(1) //publishOn also discards the rest - .hasDiscardedElementsMatching(list -> !list.contains(0)) - .hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + StepVerifier.Assertions assertions = + Flux.range(1, 10) + .publishOn(scheduler, 1) + .filter(i -> { + throw new IllegalStateException("boom"); + }) + .filter(i -> true) + .contextWrite(previous -> { + Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD); + + return Operators.enableOnDiscard(previous, (discarded) -> { + previousDiscardHandler.accept(discarded); + latch.countDown(); + }); + }) + .as(StepVerifier::create) + .expectFusion(Fuseable.ASYNC) + .expectErrorMessage("boom") + .verifyThenAssertThat(); + + Assertions.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertions.hasDiscarded(1) //publishOn also discards the rest + .hasDiscardedElementsMatching(list -> !list.contains(0)) + .hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); } @Test