Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues/3340 single buffer #3364

Closed

Conversation

MikkelHJuul
Copy link

This implements part of the Issue, and thus replaces the SizeBoundReplayBuffer in cases where only a single historical record is required. (replay latest, limit 1, all 1(, cache 1?))

Apart from fixing the memory-issue in this single Sinks.Many implementation (as described in the Issue ticket #3340), I will noted a sizable performance-increase:

  • single-threaded:
    • SingletonReplayBuffer:
      • 1000 sampleSize: 19322,433 ±(99.9%) 370,049 ns/op [Average]
      • 10000 sampleSize: 197191,348 ±(99.9%) 2778,534 ns/op [Average]
    • SizeBoundReplayBuffer:
      • 1000 sampleSize: 30733,449 ±(99.9%) 1619,685 ns/op [Average]
      • 10000 sampleSize: 315585,392 ±(99.9%) 22721,484 ns/op [Average]
  • 10 threads:
    • SingletonReplayBuffer:
      • 1000 sampleSize: 61306,088 ±(99.9%) 11100,377 ns/op [Average]
      • 10000 sampleSize: 628948,913 ±(99.9%) 135473,782 ns/op [Average]
    • SizeBoundReplayBuffer:
      • 1000 sampleSize: 137655,263 ±(99.9%) 14734,187 ns/op [Average]
      • 10000 sampleSize: 1313422,537 ±(99.9%) 60488,644 ns/op [Average]

33% better single-threaded performance, and about 100% better performance for the higher number of threads. see gist.

I'm not entirely strong on the Fusable-interface, so please read those lines carefully! (mostly a copy-paste of the SizeBoundReplayBuffer)

I may take longer to refactor the remaining SizeBoundReplayBuffer to use AtomicReferenceArray. in order to fix the issue for the remaining cases.

based off of SizeBoundReplayBuffer, but with simplifications in size/cpacity/replay etc.
add bindings to places where the SingletonReplayBuffer can be used in stead of the SizeBoundReplayBuffer
to compare against the replaced implementation
@MikkelHJuul MikkelHJuul requested a review from a team as a code owner February 24, 2023 12:46
@pivotal-cla
Copy link

@MikkelHJuul Please sign the Contributor License Agreement!

Click here to manually synchronize the status of this Pull Request.

See the FAQ for frequently asked questions.

1 similar comment
@pivotal-cla
Copy link

@MikkelHJuul Please sign the Contributor License Agreement!

Click here to manually synchronize the status of this Pull Request.

See the FAQ for frequently asked questions.

@pivotal-cla
Copy link

@MikkelHJuul Thank you for signing the Contributor License Agreement!

@MikkelHJuul
Copy link
Author

MikkelHJuul commented Feb 25, 2023

@OlegDokuka - you answered my issue ticket, so I'll pull you in here :)

Are there anything specific wrt. testing you require or want before being able to review?

Edit: I will run with coverage to check if the current tests cover the code, at latest Monday

@MikkelHJuul
Copy link
Author

I have run the tests with coverage. Mostly fine (about the same coverage as SizeBound). I added some tests (parameterizations)

There is a bug in Fusion that makes the replay(1) cache only "fuse" the first element. I suspect something with replayFused vs. poll not working correctly

When the same value is passed two times in a row the prior implementation does not convey the second signal
@MikkelHJuul
Copy link
Author

with ae5153e the performance of the SingletonBuffer is reduced; and it's about 18% more performant than the SizeBoundReplayBuffer.

The memory usage is about halved (not counting the leaked reference)

@OlegDokuka OlegDokuka added the type/enhancement A general enhancement label Mar 8, 2023
@OlegDokuka OlegDokuka added this to the 3.4.28 milestone Mar 8, 2023
@violetagg violetagg modified the milestones: 3.4.28, 3.4.29 Mar 14, 2023
FUSED still not working
@OlegDokuka
Copy link
Contributor

@MikkelHJuul do you mind adding a JCStress test as well. If you don't have time I can do a followup PR

@MikkelHJuul
Copy link
Author

MikkelHJuul commented Mar 22, 2023

@MikkelHJuul do you mind adding a JCStress test as well. If you don't have time I can do a followup PR

Absolutely, also, seeing as the leak was easily fixed in my other PR (I was too hellbent on reducing the memory footprint) I think it is a must have to verify better performance, and also to verify memory footprint before accepting this implementation.

Additionally, I will fix the integer overflow in #add. And note that there is a very rare bug (when overflow is fixed) where a subscriber can wait for integer_max emissions and request more then accidentally skip an emission because its index was re-hit; can that be accepted? nvm I remembered it as integer overflow threw exception, dunno why

@MikkelHJuul
Copy link
Author

@MikkelHJuul do you mind adding a JCStress test as well. If you don't have time I can do a followup PR

I have written a JCStress test can you validate that it actually tests anything of value

@MikkelHJuul
Copy link
Author

after thinking about this some more, I cannot fully see how subscription-buffer protects vs. high write (but I cannot see how this is done in the original solution; which in fact does a non-atomic volatile reassignment in its #add-implementation.

also should the #add-method be a non-blocking write?

@Override
public void add(T value) {
	T ref;
	int[] stamp = new int[1];
	do {
		ref = val.get(stamp);
	} while (!val.compareAndSet(ref, value, stamp[0], stamp[0]+1));
}

I wrote a jcstress with both cases and got:

GET > SET
  RESULT    SAMPLES     FREQ      EXPECT  DESCRIPTION
       1        922    0,08%   Forbidden  No default case provided, assume Forbidden
       2      4.853    0,40%   Forbidden  No default case provided, assume Forbidden
       3      7.460    0,61%   Forbidden  No default case provided, assume Forbidden
       4      6.785    0,56%   Forbidden  No default case provided, assume Forbidden
       5      9.851    0,81%   Forbidden  No default case provided, assume Forbidden
       6  1.186.641   97,54%  Acceptable  all signals go through

DO WHILE
  RESULT    SAMPLES     FREQ      EXPECT  DESCRIPTION
       1          2   <0,01%   Forbidden  No default case provided, assume Forbidden
       2      3.644    0,28%   Forbidden  No default case provided, assume Forbidden
       3      8.053    0,63%   Forbidden  No default case provided, assume Forbidden
       4      8.366    0,65%   Forbidden  No default case provided, assume Forbidden
       5     13.143    1,02%   Forbidden  No default case provided, assume Forbidden
       6  1.254.984   97,42%  Acceptable  all signals go through

which is close - Do-While-non-blocking write has a slightly higher affinity, but fewer fully correct results.

I ran the same test on the SizeBoundReplayBuffer, but it failed miserably (I guess this only really works when wrapped in the SinksManySerialized)

    Unrecoverable error while running
    java.lang.NullPointerException: Cannot invoke "reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer$Node.get()" because "this.head" is null
    	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.add(FluxReplay.java:1022)
    	at reactor.core.publisher.SinkManyReplayProcessor.tryEmitNext(SinkManyReplayProcessor.java:475)
    	at reactor.core.publisher.SinksManyReplayLatestStressTest$FluxReplaySizeBoundWriteStressTest.one(SinksManyReplayLatestStressTest.java:201)
    	at reactor.core.publisher.SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress.run_one(SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress.java:311)
    	at reactor.core.publisher.SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress.task_one(SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress.java:294)
    	at reactor.core.publisher.SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress.access$000(SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress.java:19)
    	at reactor.core.publisher.SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress$8.internalRun(SinksManyReplayLatestStressTest_FluxReplaySizeBoundWriteStressTest_jcstress.java:223)
    	at org.openjdk.jcstress.infra.runners.CounterThread.run(CounterThread.java:38)

my code for these tests is present in the branch https://github.com/MikkelHJuul/reactor-core/tree/issues/3340-single-processor-stresstest

@johnnybgoode1885
Copy link

@MikkelHJuul Did you have any success mitigating the concurrency issue when using Sinks.unsafe().many().replay().latest()? I thought changing direct assignation to atomic one would do the trick (at least it did for me in my tests, but I can't be 100% sure those tests were good enough), but it seems that you found out that there is more to it.

@MikkelHJuul
Copy link
Author

@MikkelHJuul Did you have any success mitigating the concurrency issue when using Sinks.unsafe().many().replay().latest()? I thought changing direct assignation to atomic one would do the trick (at least it did for me in my tests, but I can't be 100% sure those tests were good enough), but it seems that you found out that there is more to it.

Sorry I didn't reply. The API between the wrapping class and the cache implementation is simply not that good for concurrent calling as far as I remember. The single implementation was "safe" for concurrent calling but cache calling to downstream subscribers could trigger incorrect behavior (skipped emission to a subscriber)

@violetagg violetagg removed this from the 3.4.29 milestone Dec 12, 2023
@OlegDokuka OlegDokuka closed this Jan 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement A general enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants