[doc] Add example of deadlock with groupBy and concatMap (See #3443)
NamrataGuptaRoy authored Aug 15, 2024
1 parent e4ac1f9 commit 1264a56
Showing 1 changed file with 67 additions and 0 deletions.
@@ -50,6 +50,73 @@ continues fetching data from upstream and feeding more groups. Sometimes, these
constraints multiply and lead to hangs, such as when you have a high cardinality and the
concurrency of the `flatMap` consuming the groups is too low.

The following example groups values by their first character:

[source,java]
[%unbreakable]
----
public static Flux<String> createGroupedFlux() {
    List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
    return Flux.fromIterable(data)
            .groupBy(d -> d.charAt(0), 5)
            .concatMap(g -> g.map(String::valueOf)
                    .startWith(String.valueOf(g.key()))
                    .map(o -> {
                        System.out.println(o);
                        return o;
                    })
            );
}

@Test
public void testGroupBy() {
    StepVerifier.create(createGroupedFlux())
            .expectNext("a", "alpha", "air", "aim", "apple", "ace")
            .expectNext("b", "beta", "ball", "bat")
            .expectNext("c", "cat", "d", "dog")
            .verifyComplete();
}
----
In the above example:

* The cardinality of the groups is 4 (`a`, `b`, `c`, and `d` are the group keys).
* The concurrency of `concatMap` is 1.
* The buffer size of `groupBy` is 5 (because we set the prefetch to 5; the default is 256).

The test prints the following elements and then hangs:

`a alpha air aim apple`

Explanation:

Initially, `groupBy` requests 5 elements, so its buffer holds:

`"alpha", "air", "aim", "beta", "cat"`

`concatMap` has a concurrency of 1, so the group with key `a` is the only group that is subscribed.
Of these elements, `"alpha", "air", "aim"` are consumed by `concatMap`, and `"beta", "cat"` remain in the buffer.

Next, `groupBy` requests 3 more items (2 slots in the buffer are already occupied), so its buffer holds:

`"beta", "cat", "ball", "apple", "bat"`

Of these, only `"apple"` is consumed. The rest remain in the buffer.

Next, `groupBy` requests 1 more item (4 slots in the buffer are already occupied), so its buffer holds:

`"beta", "cat", "ball", "bat", "dog"`

Now nothing in the buffer belongs to group `a`, so `concatMap` consumes nothing more, and group `a` stays open because its last element (`"ace"`) is still upstream.
`groupBy` cannot request more data from the publisher because its buffer is full. The publisher is backpressured and cannot emit the remaining item. This results in a deadlock.
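
The walkthrough above can be replayed with a small, Reactor-free model. This is only a sketch under simplifying assumptions: a single bounded buffer, one group consumed at a time in order of first appearance, and groups that complete only once the source is exhausted. The class `GroupByStallTrace` and its members are hypothetical names introduced for illustration. They are not part of Reactor, and the model does not reproduce Reactor's actual internals.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified, Reactor-free model of the walkthrough; all names here are hypothetical. */
final class GroupByStallTrace {

    /** Outcome of one simulated run. */
    static final class Result {
        final List<String> emitted = new ArrayList<>();   // what the sequential consumer saw
        final List<String> snapshots = new ArrayList<>(); // buffer contents after each refill
        boolean stalled;                                   // true if no further progress is possible
    }

    static Result simulate(List<String> data, int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        Result result = new Result();
        Iterator<String> source = data.iterator();
        List<String> buffer = new ArrayList<>();  // pulled from the source, not yet consumed
        List<Character> keys = new ArrayList<>(); // group keys in order of first appearance
        int active = 0;                           // index of the one group being consumed
        boolean keyEmitted = false;

        while (true) {
            // Refill: pull from the source while the buffer has free space.
            boolean pulled = false;
            while (buffer.size() < capacity && source.hasNext()) {
                String item = source.next();
                buffer.add(item);
                if (!keys.contains(item.charAt(0))) {
                    keys.add(item.charAt(0));
                }
                pulled = true;
            }
            if (pulled) {
                result.snapshots.add(buffer.toString());
            }
            if (active >= keys.size()) {
                return result; // every group has been consumed
            }
            char key = keys.get(active);
            if (!keyEmitted) {
                result.emitted.add(String.valueOf(key)); // mirrors startWith(key)
                keyEmitted = true;
            }
            // Drain: only items of the active group leave the buffer.
            boolean consumed = false;
            for (Iterator<String> it = buffer.iterator(); it.hasNext();) {
                String item = it.next();
                if (item.charAt(0) == key) {
                    result.emitted.add(item);
                    it.remove();
                    consumed = true;
                }
            }
            if (consumed) {
                continue; // space was freed, so try to refill
            }
            if (source.hasNext()) {
                result.stalled = true; // buffer is full and holds nothing for the active group
                return result;
            }
            // Source exhausted: the active group completes and the next one is consumed.
            active++;
            keyEmitted = false;
        }
    }

    public static void main(String[] args) {
        List<String> data = List.of("alpha", "air", "aim", "beta", "cat",
                "ball", "apple", "bat", "dog", "ace");
        Result r = simulate(data, 5);
        r.snapshots.forEach(System.out::println);
        System.out.println("emitted=" + r.emitted + " stalled=" + r.stalled);
    }
}
```

With the example data and a capacity of 5, this model records the same three buffer states as the walkthrough and stops after emitting `a, alpha, air, aim, apple`, with `stalled` set to `true`.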

In the same example, if the data were in a slightly different order, for example:

`"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"`

The same test would pass: `concatMap` would receive a completion signal, finish the current group, subscribe to the next group, and so on.
Hence, when the order of the published data is unpredictable, `groupBy` is likely to deadlock whenever the groups are consumed too slowly for the `groupBy` buffer to hold the elements that are still pending.
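
To see how sensitive the outcome is to ordering and buffer size, the two orderings can be compared in a simplified model that does not use Reactor. The sketch below assumes a single bounded buffer, one group consumed at a time in order of first appearance, and groups that complete only when the source is exhausted. `GroupByOrderingCheck`, `drains`, and `smallestDrainingCapacity` are hypothetical names for this illustration, not Reactor APIs, and real operator behavior may differ in detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified model, not Reactor code: checks whether a given ordering drains or stalls. */
final class GroupByOrderingCheck {

    /** Returns true if every element is eventually consumed, false if the model stalls. */
    static boolean drains(List<String> data, int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        Iterator<String> source = data.iterator();
        List<String> buffer = new ArrayList<>();  // pulled from the source, not yet consumed
        List<Character> keys = new ArrayList<>(); // group keys in order of first appearance
        int active = 0;                           // index of the single group being consumed
        while (true) {
            // Refill while there is free space in the buffer.
            while (buffer.size() < capacity && source.hasNext()) {
                String item = source.next();
                buffer.add(item);
                if (!keys.contains(item.charAt(0))) {
                    keys.add(item.charAt(0));
                }
            }
            if (active >= keys.size()) {
                return true; // all groups consumed
            }
            char key = keys.get(active);
            // Only elements of the active group can leave the buffer.
            boolean consumed = buffer.removeIf(s -> s.charAt(0) == key);
            if (consumed) {
                continue;
            }
            if (source.hasNext()) {
                return false; // full buffer, nothing for the active group: stalled
            }
            active++; // source exhausted, so the active group completes
        }
    }

    /** Smallest capacity for which the model drains; terminates at data.size() at the latest. */
    static int smallestDrainingCapacity(List<String> data) {
        int capacity = 1;
        while (!drains(data, capacity)) {
            capacity++;
        }
        return capacity;
    }

    public static void main(String[] args) {
        List<String> original = List.of("alpha", "air", "aim", "beta", "cat",
                "ball", "apple", "bat", "dog", "ace");
        List<String> reordered = List.of("alpha", "air", "aim", "beta", "cat",
                "ball", "apple", "dog", "ace", "bat");
        System.out.println(drains(original, 5));                 // false
        System.out.println(drains(reordered, 5));                // true
        System.out.println(smallestDrainingCapacity(original));  // 6
        System.out.println(smallestDrainingCapacity(reordered)); // 5
    }
}
```

In this model, the original ordering needs one more buffer slot than the reordered one, because five elements of other groups arrive before the last element of group `a`.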

[[windowing-with-flux-flux]]
== Windowing with `Flux<Flux<T>>`
