[doc] Polish #3872
chemicL committed Aug 20, 2024
1 parent bd04464 commit ef152df
Showing 2 changed files with 72 additions and 62 deletions.
@@ -50,7 +50,10 @@
continues fetching data from upstream and feeding more groups. Sometimes, these
constraints multiply and lead to hangs, such as when you have a high cardinality and the
concurrency of the `flatMap` consuming the groups is too low.

To better understand the risks of hangs when combining `groupBy` with inappropriate
operators, let's consider an example.

The following snippet groups strings by their first character:

[source,java]
[%unbreakable]
----
// … (the beginning of the snippet is collapsed in the diff view)
            .verifyComplete();
}
----
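The diff view collapses the body of the snippet above. For orientation, here is a
minimal reconstruction consistent with the output and buffer states discussed below.
The class and method names are hypothetical, and the data order and operator chain are
inferred from the walkthrough, so treat this as a sketch rather than the verbatim test:

[source,java]
[%unbreakable]
----
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class GroupByDeadlockTest { // hypothetical name

	@Test
	public void groupByHangs() { // hypothetical name
		StepVerifier.create(
				Flux.just("alpha", "air", "aim", "beta", "cat",
				          "ball", "apple", "bat", "dog", "ace")
				    // a prefetch of 5 bounds groupBy's internal buffer to 5 items
				    .groupBy(v -> v.charAt(0), 5)
				    // concatMap subscribes to a single group at a time
				    .concatMap(group -> group
						    .startWith(String.valueOf(group.key()))
						    .doOnNext(System.out::println)))
		            // 10 values + 4 group keys emitted via startWith
		            .expectNextCount(14)
		            .verifyComplete();
	}
}
----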
In the above:

* The cardinality of groups is **4** (`"a"`, `"b"`, `"c"`, and `"d"` being the group
keys).
* The concurrency of `concatMap` is **1**.
* The buffer size of `groupBy` is **5** (since we defined `prefetch` as 5; by
default it's 256).

Running the test prints:

[source]
----
a alpha air aim apple
----

`"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"`
The test gets stuck after printing these elements. Let's consider what happened.

1. Initially, `groupBy` requests 5 elements. They end up in its buffer: `"alpha",
"air", "aim", "beta", "cat"`.
2. `concatMap` has a concurrency of **1**. Therefore, the group with key `"a"` is the only
group that gets subscribed. Out of the initial items, `"alpha", "air", "aim"` are consumed by
`concatMap` and `"beta", "cat"` remain in the buffer.
3. Next, `groupBy` requests 3 additional items (2 items are already buffered).
The buffer now contains `"beta", "cat", "ball", "apple", "bat"`. Out of these, `"apple"`
is consumed and the rest remain in the buffer.
4. Next, `groupBy` requests 1 additional item (4 slots in the buffer are already
occupied). The buffered items are now `"beta", "cat", "ball", "bat", "dog"`.
5. At this point, nothing in the buffer belongs to group `"a"`, so no more consumption
happens in `concatMap` and the first group's `Flux` never completes. `groupBy` is unable
to request more data from the publisher since its buffer is full. The publisher faces
backpressure and cannot publish the remaining items. This results in a deadlock.
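To observe the request pattern described in the steps above, a `log` operator can be
placed between the source and `groupBy` (a sketch; the exact request sizes in the logs
may differ from the idealized numbers above). This variant stalls the same way, and the
logs show where the demand stops:

[source,java]
[%unbreakable]
----
import reactor.core.publisher.Flux;

public class GroupByRequestLogging { // hypothetical name

	public static void main(String[] args) {
		Flux.just("alpha", "air", "aim", "beta", "cat",
		          "ball", "apple", "bat", "dog", "ace")
		    // logs the demand that groupBy signals upstream, e.g. request(5)
		    .log("source")
		    .groupBy(v -> v.charAt(0), 5)
		    // one group consumed at a time, as in the example above
		    .concatMap(group -> group.startWith(String.valueOf(group.key())))
		    .subscribe(System.out::println);
	}
}
----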

In the same example, if the data were in a slightly different order, for example:

[source]
----
"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"
----

The same test would pass successfully: the same `concatMap` would receive a
completion signal, complete one group, subscribe to the next group, and so on.
Hence, when the pattern of data published is arbitrary, `groupBy` is likely to face a
deadlock when the consumption pattern doesn't match the capacity of the `groupBy` buffer.
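One way to avoid the deadlock, in line with the `groupBy` Javadoc guidance polished in
the second changed file below, is to give the downstream a concurrency at least equal to
the number of groups. The following is a sketch using `flatMap` with a concurrency of 4;
it is one option among several (a larger `prefetch` would also work for this data set):

[source,java]
[%unbreakable]
----
import reactor.core.publisher.Flux;

public class GroupByWithEnoughConcurrency { // hypothetical name

	public static void main(String[] args) {
		Flux.just("alpha", "air", "aim", "beta", "cat",
		          "ball", "apple", "bat", "dog", "ace")
		    .groupBy(v -> v.charAt(0), 5)
		    // a concurrency of 4 matches the group cardinality, so every group
		    // gets its own subscriber and the buffer can always drain
		    .flatMap(group -> group.startWith(String.valueOf(group.key())), 4)
		    .subscribe(System.out::println);
	}
}
----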

[[windowing-with-flux-flux]]
== Windowing with `Flux<Flux<T>>`
reactor-core/src/main/java/reactor/core/publisher/Flux.java (31 additions, 27 deletions)
@@ -5924,13 +5924,14 @@ public int getPrefetch() {
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* greater than or equal to the number of groups created. In that case every group
* has its own subscriber and progress can be made, even when the data publish pattern
* is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
* the subscribers should be designed with caution, because if the consumption
* pattern doesn't match what can be accommodated in its producer buffer,
* the process may enter deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
@@ -5963,13 +5964,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* To avoid deadlock, the concurrency of the subscriber to groupBy should be
* greater than or equal to the number of groups created. In that case every group
* has its own subscriber and progress can be made, even when the data publish pattern
* is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
* the subscribers should be designed with caution, because if the consumption
* pattern doesn't match what can be accommodated in its producer buffer,
* the process may enter deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
@@ -6004,13 +6006,14 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* To avoid deadlock, the concurrency of the subscriber to groupBy should be
* greater than or equal to the number of groups created. In that case every group
* has its own subscriber and progress can be made, even when the data publish pattern
* is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
* the subscribers should be designed with caution, because if the consumption
* pattern doesn't match what can be accommodated in its producer buffer,
* the process may enter deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
@@ -6048,13 +6051,14 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extend
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* To avoid deadlock, the concurrency of the subscriber to groupBy should be
* greater than or equal to the number of groups created. In that case every group
* has its own subscriber and progress can be made, even when the data publish pattern
* is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
* the subscribers should be designed with caution, because if the consumption
* pattern doesn't match what can be accommodated in its producer buffer,
* the process may enter deadlock due to backpressure.
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
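As a usage note for the second scenario the Javadoc above describes (more groups than
downstream concurrency): a single sequential subscriber can still make progress when the
`groupBy` buffer accommodates the gaps between groups. A hedged sketch (not part of this
commit): with the default `prefetch` of 256, the ten-element example from the
documentation completes even with a `concatMap` of concurrency 1:

[source,java]
[%unbreakable]
----
import reactor.core.publisher.Flux;

public class GroupByDefaultPrefetch { // hypothetical name

	public static void main(String[] args) {
		Flux.just("alpha", "air", "aim", "beta", "cat",
		          "ball", "apple", "bat", "dog", "ace")
		    // the default prefetch (256) buffers this whole small source, so a
		    // single concatMap subscriber can drain group after group
		    .groupBy(v -> v.charAt(0))
		    .concatMap(group -> group.startWith(String.valueOf(group.key())))
		    .subscribe(System.out::println);
	}
}
----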
