Skip to content

Commit

Permalink
[#3821] PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
spierce committed Aug 20, 2024
1 parent e50b51a commit aac0e9e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 30 deletions.
24 changes: 6 additions & 18 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2882,10 +2882,10 @@ public final Flux<List<T>> buffer(int maxSize) {
* will be emitted by the returned {@link Flux} each time the given max size is reached
* or once this Flux completes.
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSize.svg" alt="">
* <p>
* Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSize.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal,
* as well as latest unbuffered element if the bufferSupplier fails.
Expand Down Expand Up @@ -2950,6 +2950,10 @@ public final Flux<List<T>> buffer(int maxSize, int skip) {
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSizeEqualsSkipSize.svg" alt="">
*
* <p>
* Note for exact buffers: If buffers provided by the bufferSupplier return {@literal false} upon invocation
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
*
* <p><strong>Discard Support:</strong> This operator discards elements in between buffers (in the case of
* dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal.
* Note however that overlapping buffer variant DOES NOT discard, as this might result in an element
Expand Down Expand Up @@ -3126,10 +3130,6 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down Expand Up @@ -3170,10 +3170,6 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Schedule
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down Expand Up @@ -3241,10 +3237,6 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime,
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand All @@ -3269,10 +3261,6 @@ public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSiz
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -422,14 +421,4 @@ public void discardOnError() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void bufferSupplierUsesSet() {
Flux.just(1, 1, 1, 1, 1, 1, 1)
.<Set<Object>>bufferTimeout(3, Duration.ofSeconds(2), HashSet::new)
.as(it -> StepVerifier.create(it, 3))
.expectNext(Collections.singleton(1), Collections.singleton(1), Collections.singleton(1))
.expectComplete()
.verify(Duration.ofSeconds(2));
}
}

0 comments on commit aac0e9e

Please sign in to comment.