From ef152df80d57713f9d33cb6080d8f2ee2b90d093 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Tue, 20 Aug 2024 13:46:29 +0200
Subject: [PATCH] [doc] Polish #3872

---
 .../advanced-three-sorts-batching.adoc | 76 ++++++++++---------
 .../java/reactor/core/publisher/Flux.java | 58 +++++++-------
 2 files changed, 72 insertions(+), 62 deletions(-)

diff --git a/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc b/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc
index d07c057060..2dbd0e865d 100644
--- a/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc
+++ b/docs/modules/ROOT/pages/advancedFeatures/advanced-three-sorts-batching.adoc
@@ -50,7 +50,10 @@ continues fetching data from upstream and feeding more groups.
 Sometimes, these constraints multiply and lead to hangs, such as when you have a high
 cardinality and the concurrency of the `flatMap` consuming the groups is too low.
 
-The following example groups values by their 1st character:
+To better understand the risks of hangs when combining `groupBy` with inappropriate
+operators, let's consider an example.
+
+The following snippet groups strings by their 1st character:
 
 [source,java]
 [%unbreakable]
@@ -77,45 +80,48 @@ The following example groups values by their 1st character:
 .verifyComplete();
 }
 ----
-In the above example:
-cardinality of groups is 4 (a, b, c ,d being the group keys)
-concurrency of concatMap is 1
-bufferSize of groupBy is 5 (since we defined prefetch as 5, by default its 256)
-
-`a alpha air aim apple`
-
-The test gets stuck after printing these elements.
-
-Explanation :
-
-Initially groupBy requests 5 elements
-groupByBuffer :
-
-`"alpha", "air", "aim", "beta", "cat"`
-
-concatMap has concurrency 1, therefore group with key 'a' is the only group subscribed.
-Out of these `"alpha", "air", "aim"` are consumed by concatMap and rest `"beta", "cat"` remain in the buffer.
-
-Next, groupBy requests for additional 3 items (2 spaces are already occupied in buffer)
-groupByBuffer :
-`"beta", "cat", "ball", "apple", "bat"`
+In the above:
 
-Out of these "apple" is consumed rest remain in the buffer
+* The cardinality of groups is **4** (`"a"`, `"b"`, `"c"`, and `"d"` being the group
+keys).
+* The concurrency of `concatMap` is **1**.
+* The buffer size of `groupBy` is **5** (since we defined `prefetch` as 5; by
+default it's 256).
 
-Next, groupBy requests for additional 1 item (4 spaces are already occupied in buffer)
-
-`"beta", "cat", "ball", "bat","dog"`
-
-Now, nothing from the buffer belongs to group a, hence no more consumption happens by concatMap and it remains open.
-groupBy is unable to request more data from the publisher since its buffer size is full. The publisher faces a backpressure and is not able to publish the remaining items. This results in the deadlock.
-
-In the same example, if the data was in slightly different order, for example :
+[source]
+----
+a alpha air aim apple
+----
 
-`"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"`
+The test gets stuck after printing these elements. Let's consider what happened.
+
+1. Initially, `groupBy` requests 5 elements. They end up in a buffer: `"alpha",
+"air", "aim", "beta", "cat"`.
+2. `concatMap` has a concurrency of **1**. Therefore, the group with key `"a"` is the only
+group that gets subscribed. Out of the initial items, `"alpha", "air", "aim"` are consumed by
+`concatMap` and `"beta", "cat"` remain in the buffer.
+3. Next, `groupBy` requests 3 additional items (2 items are already buffered).
+The buffer now contains `"beta", "cat", "ball", "apple", "bat"`. Out of these, `"apple"`
+is consumed and the rest remain in the buffer.
+4. Next, `groupBy` requests 1 additional item (4 buffer slots are already
+occupied). The buffered items are `"beta", "cat", "ball", "bat", "dog"`.
+5. Now, nothing in the buffer belongs to group `"a"`, hence no more consumption happens by
+`concatMap` and the first group's `Flux` never completes. `groupBy` is unable to request more
+data from the publisher since its buffer is full. The publisher faces backpressure
+and is not able to publish the remaining items. This results in a deadlock.
+
+In the same example, if the data was in a slightly different order, for example:
+
+[source]
+----
+"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"
+----
 
-The same test would PASS successfully (the same concatMap would be able to receive a complete signal, complete one group subscribe to next group and so on).
-Hence, when the pattern of data published is random, groupBy is likely to face a deadlock when the consumption rate is slower than the accommodation capacity of the groupBy buffer.
+The same test would pass successfully: the same `concatMap` would be able to receive a
+complete signal, complete one group, then subscribe to the next group, and so on.
+Hence, when the pattern of data published is arbitrary, `groupBy` is likely to face a
+deadlock when the consumption pattern doesn't match the capacity of the `groupBy` buffer.
 
 [[windowing-with-flux-flux]]
 == Windowing with `Flux<Flux<T>>`
diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java
index c3e57dd8ff..bb636c4a9c 100644
--- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java
+++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java
@@ -5924,13 +5924,14 @@ public int getPrefetch() {
  * if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
  * with a {@code maxConcurrency} parameter that is set too low).
- *
- * <p>
+ *
+ * <p>
  * To avoid deadlock, the concurrency of the subscriber to groupBy should be
- * greater than or equal to the cardinality of groups created. In this case every group would have
- * its own subscriber and there would be no deadlocks, even when the data publish pattern is random.
- * In the other scenario, where cardinality > concurrency (no.of groups > no. of subscribers),
- * the subscribers should be designed with caution, because if the rate of consumption
- * is less than what can be accommodated in its producer buffer, the process will enter deadlock due to back pressure.
+ * greater than or equal to the number of groups created. In that case every group
+ * has its own subscriber and progress can be made, even when the data publish pattern
+ * is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
+ * the subscribers should be designed with caution, because if the consumption
+ * pattern doesn't match what can be accommodated in its producer buffer,
+ * the process may enter deadlock due to backpressure.
  *
  * <p>
  * Note that groups are a live view of part of the underlying source publisher,
@@ -5963,13 +5964,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper) {
- *
- * <p>
- * To avoid deadlock, the concurrency of the subscriber to groupBy should be
- * greater than or equal to the cardinality of groups created. In this case every group would have
- * its own subscriber and there would be no deadlocks, even when the data publish pattern is random.
- * In the other scenario, where cardinality > concurrency (no.of groups > no. of subscribers),
- * the subscribers should be designed with caution, because if the rate of consumption
- * is less than what can be accommodated in its producer buffer, the process will enter deadlock due to back pressure.
+ *
+ * <p>
+ * To avoid deadlock, the concurrency of the subscriber to groupBy should be
+ * greater than or equal to the number of groups created. In that case every group
+ * has its own subscriber and progress can be made, even when the data publish pattern
+ * is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
+ * the subscribers should be designed with caution, because if the consumption
+ * pattern doesn't match what can be accommodated in its producer buffer,
+ * the process may enter deadlock due to backpressure.
  *
  * <p>
  * Note that groups are a live view of part of the underlying source publisher,
@@ -6004,13 +6006,14 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper, int prefetch) {
- *
- * <p>
- * To avoid deadlock, the concurrency of the subscriber to groupBy should be
- * greater than or equal to the cardinality of groups created. In this case every group would have
- * its own subscriber and there would be no deadlocks, even when the data publish pattern is random.
- * In the other scenario, where cardinality > concurrency (no.of groups > no. of subscribers),
- * the subscribers should be designed with caution, because if the rate of consumption
- * is less than what can be accommodated in its producer buffer, the process will enter deadlock due to back pressure.
+ *
+ * <p>
+ * To avoid deadlock, the concurrency of the subscriber to groupBy should be
+ * greater than or equal to the number of groups created. In that case every group
+ * has its own subscriber and progress can be made, even when the data publish pattern
+ * is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
+ * the subscribers should be designed with caution, because if the consumption
+ * pattern doesn't match what can be accommodated in its producer buffer,
+ * the process may enter deadlock due to backpressure.
  *
  * <p>
  * Note that groups are a live view of part of the underlying source publisher,
@@ -6048,13 +6051,14 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper) {
- *
- * <p>
- * To avoid deadlock, the concurrency of the subscriber to groupBy should be
- * greater than or equal to the cardinality of groups created. In this case every group would have
- * its own subscriber and there would be no deadlocks, even when the data publish pattern is random.
- * In the other scenario, where cardinality > concurrency (no.of groups > no. of subscribers),
- * the subscribers should be designed with caution, because if the rate of consumption
- * is less than what can be accommodated in its producer buffer, the process will enter deadlock due to back pressure.
+ *
+ * <p>
+ * To avoid deadlock, the concurrency of the subscriber to groupBy should be
+ * greater than or equal to the number of groups created. In that case every group
+ * has its own subscriber and progress can be made, even when the data publish pattern
+ * is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency,
+ * the subscribers should be designed with caution, because if the consumption
+ * pattern doesn't match what can be accommodated in its producer buffer,
+ * the process may enter deadlock due to backpressure.
  *
  * <p>
  * Note that groups are a live view of part of the underlying source publisher,
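Editor's note, not part of the patch: the advice in the polished javadoc can be illustrated with a minimal, hypothetical sketch. It reuses the same data set and `prefetch` of 5 as the documentation example, but consumes the groups with a `flatMap` concurrency of 4, equal to the number of groups, so every group gets its own subscriber and the `groupBy` buffer can drain. The class and method names are ours, and it assumes `reactor-core` is on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class GroupByConcurrencyDemo {

	public static List<String> consumeAllGroups() {
		return Flux.just("alpha", "air", "aim", "beta", "cat",
						"ball", "apple", "bat", "dog", "ace")
				// Cardinality is 4 (keys "a", "b", "c", "d"); a prefetch of 5
				// bounds the groupBy buffer, as in the documentation example.
				.groupBy(value -> value.substring(0, 1), 5)
				// Concurrency 4 >= number of groups: every group is subscribed
				// immediately, the buffer drains, and no deadlock occurs.
				.flatMap(group -> group, 4)
				.collectList()
				.block(Duration.ofSeconds(5));
	}

	public static void main(String[] args) {
		System.out.println(consumeAllGroups());
	}
}
```

With the `concatMap` variant from the walkthrough (concurrency 1), the same data set hangs; raising downstream concurrency to at least the group cardinality is the simplest way out.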