Sinks.many().replay().latest() holds two values in cache #3340

Open · MikkelHJuul opened this issue Jan 31, 2023 · 27 comments

Labels
status/need-investigation This needs more in-depth investigation · type/bug A general bug

Comments

@MikkelHJuul

The cache backing Sinks.many().replay().latest() keeps both a head and a tail node, even though only one value is available for replay (as it should be). The extra node retains a reference to a value that can never be replayed again, so that memory cannot be reclaimed.

Expected Behavior

The cache holds only a single reference in memory.

Actual Behavior

SizeBoundReplayBuffer (https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java#L774) holds two items in its cache.

Steps to Reproduce

package reactor.core.publisher;

@Test
void reproCase() {
     var replayBuffer = new FluxReplay.SizeBoundReplayBuffer<Integer>(1);
     replayBuffer.add(11);
     replayBuffer.add(22);
     assert replayBuffer.head != replayBuffer.tail;
     assert replayBuffer.head.value != replayBuffer.tail.value;
}

Possible Solution

package reactor.core.publisher;

@Test
void fixCase() {
     var replayBuffer = new FluxReplay.SizeBoundReplayBuffer<Integer>(0);
     replayBuffer.add(11);
     replayBuffer.add(22);
     assert replayBuffer.head == replayBuffer.tail;
     assert replayBuffer.head.value == replayBuffer.tail.value;
     assert replayBuffer.head.value == 22;
}

Your Environment

Doesn't really matter; the issue is environment-independent.

  • Reactor version(s) used: 3.5.0
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jan 31, 2023
@MikkelHJuul
Author

MikkelHJuul commented Feb 1, 2023

I will just add that the possible solution stated here fixes the head != tail issue, but then the buffer will not emit its value; it looks like a "simple" off-by-one error. (Edit: limit 2 yields 3 items referenced.)
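
To illustrate the edit note, here is a hypothetical extension of the repro above (it assumes the same package-private access as the repro, and the Node-extends-AtomicReference layout mentioned below):

    @Test
    void limitTwoReferencesThreeValues() {
        var replayBuffer = new FluxReplay.SizeBoundReplayBuffer<Integer>(2);
        replayBuffer.add(11);
        replayBuffer.add(22);
        replayBuffer.add(33);
        // walking the linked nodes from head shows three values (11, 22, 33) still
        // reachable, although only 22 and 33 can ever be replayed with limit 2
        int reachableNodes = 0;
        for (var n = replayBuffer.head; n != null; n = n.get()) {
            reachableNodes++;
        }
        assert reachableNodes == 3;
    }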

Consider adding a singleton implementation of the buffer, and moving to Java's AtomicReferenceArray instead of the linked list (Node extends AtomicReference<Node>). This should improve memory usage and performance.

@MikkelHJuul
Author

MikkelHJuul commented Feb 1, 2023

My least-effort implementations of a SingletonReplayBuffer and an AtomicReferenceArray-based buffer:

There are bound to be some issues with these, but they both passed a naive test:

    public static <T> Sinks.Many<T> replayLatestSink() {
        var sink = new SinkManyReplayProcessor<>(new AtomicArraySizeBoundReplayBuffer<T>(1));
        return sink; // and wrapped: new SinkManySerialized<>(sink, sink);
    }
    public static <T> Sinks.Many<T> replayLatestSinkSingleton() {
        var sink = new SinkManyReplayProcessor<>(new SingletonReplayBuffer<T>());
        return  sink; // and wrapped: new SinkManySerialized<>(sink, sink);
    }

    @Test
    void testFunctionalityOfSinksManyReplayLatest() {
        Sinks.Many<Integer> sink  = replayLatestSink(); // or replayLatestSingleton();
        sink.tryEmitNext(12);
        var firstResult = sink.asFlux().blockFirst();
        assert firstResult != null;
        assert firstResult == 12;
        var secondResult = sink.asFlux().blockFirst();
        assert firstResult.equals(secondResult);

        sink.tryEmitNext(21);
        firstResult = sink.asFlux().blockFirst();
        assert firstResult != null;
        assert firstResult == 21;
        secondResult = sink.asFlux().blockFirst();
        assert firstResult.equals(secondResult);
    }

Sorry it's not in PR form; I'm doing this on my work PC and cannot clone from GitHub (and my private PC is not set up for Java or IntelliJ).

    static final class SingletonReplayBuffer<T> implements FluxReplay.ReplayBuffer<T> {
        final AtomicReference<T> val = new AtomicReference<>(null);

        final int indexUpdateLimit = Operators.unboundedOrLimit(1);
        volatile boolean done;
        Throwable error;
        @Override
        public void add(T value) {
            val.set(value);
        }

        @Override
        public void onError(Throwable ex) {
            error = ex;
            done = true;
        }

        @Override
        public Throwable getError() {
            return error;
        }

        @Override
        public void onComplete() {
            done = true;
        }

        @Override
        public void replay(FluxReplay.ReplaySubscription<T> rs) {
            if (!rs.enter()) {
                return;
            }

            if (rs.fusionMode() == FluxReplay.NONE) {
                replayNormal(rs);
            }
            else {
                replayFused(rs);
            }
        }

        private void replayNormal(FluxReplay.ReplaySubscription<T> rs) {
            final Subscriber<? super T> a = rs.actual();

            int missed = 1;
            long r = rs.requested(); // can this be 0?
            long e = 0L;

            for (; ; ) {

                @SuppressWarnings("unchecked") T curr = (T) rs.node();
                T t = val.get();
                if (curr == null) {
                    curr = t;
                    rs.node(curr);
                }
                boolean d = done;
                boolean empty = t == null;

                if (d && empty) {
                    rs.node(null);
                    Throwable ex = error;
                    if (ex != null) {
                        a.onError(ex);
                    }
                    else {
                        a.onComplete();
                    }
                    return;
                }

                if (empty) {
                    continue;
                }

                if (rs.isCancelled()) {
                    rs.node(null);
                    return;
                }


                a.onNext(t);
                e++;

                if (r != Long.MAX_VALUE) {
                    rs.produced(1);
                }

                rs.node(t);

                missed = rs.leave(missed);
                if (missed == 0 || e == r) {
                    break;
                }
            }
        }

        private void replayFused(FluxReplay.ReplaySubscription<T> rs) {
            // I have no idea...
        }

        @Override
        public boolean isDone() {
            return done;
        }

        @Override
        public T poll(FluxReplay.ReplaySubscription<T> rs) {
            @SuppressWarnings("unchecked") T node = (T) rs.node();
            T next = val.get();
            if (node == null) {
                rs.node(next);
            }

//            if ((1) % indexUpdateLimit == 0) {
//                rs.requestMore(1);
//            } // this should be rethought for this implementation

            return next;
        }

        @Override
        public void clear(FluxReplay.ReplaySubscription<T> rs) {
            rs.node(null);
        }

        @Override
        public boolean isEmpty(FluxReplay.ReplaySubscription<T> rs) {
            @SuppressWarnings("unchecked") T node = (T) rs.node();
            if (node == null) {
                rs.node(val.get());
                return false;
                // doesn't this mean that the subscription is not up-to-date?
                // then the subscription can progress, why add the node, then?
            }
            return node == val.get();
        }

        @Override
        public int size(FluxReplay.ReplaySubscription<T> rs) {
            @SuppressWarnings("unchecked") T node = (T) rs.node();
            T t = val.get();
            if (node == null) {
                rs.node(t);
            }
            return sizeOf(t);
        }

        @Override
        public int size() {
            return  sizeOf(val.get());
        }

        private int sizeOf(@Nullable T t) {
            return t != null ? 1 : 0;
        }

        @Override
        public int capacity() {
            return 1;
        }

        @Override
        public boolean isExpired() {
            return false;
        }
    }

    static final class AtomicArraySizeBoundReplayBuffer<T> implements FluxReplay.ReplayBuffer<T> {

        final AtomicInteger head = new AtomicInteger();
        final int indexUpdateLimit;
        volatile boolean done;
        Throwable error;

        final AtomicReferenceArray<T> buffer;

        AtomicArraySizeBoundReplayBuffer(int limit) {
            if (limit < 0) {
                throw new IllegalArgumentException("Limit cannot be negative");
            }
            this.indexUpdateLimit = Operators.unboundedOrLimit(limit);

            this.buffer = new AtomicReferenceArray<>(limit);
        }

        @Override
        public boolean isExpired() {
            return false;
        }

        @Override
        public int capacity() {
            return buffer.length();
        }

        @Override
        public void add(T value) {
            buffer.set(
                    head.accumulateAndGet(capacity(), (curr, cap) -> {
                        if (curr + 1 == cap) return 0;
                        return curr + 1;
                    }),
                    value
            );
        }

        @Override
        public void onError(Throwable ex) {
            error = ex;
            done = true;
        }

        @Override
        public void onComplete() {
            done = true;
        }

        void replayNormal(FluxReplay.ReplaySubscription<T> rs) {
            final Subscriber<? super T> a = rs.actual();

            int missed = 1;

            for (; ; ) {

                long r = rs.requested();
                long e = 0L;

                Integer curr = (Integer) rs.node();
                if (curr == null) {
                    curr = head.get();
                    rs.node(curr);
                }
                int start = curr;

                while (e != r) {
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }

                    boolean d = done;
                    T next = buffer.get(curr);
                    boolean empty = next == null;

                    if (d && empty) { // if start is not 0 here that is a clear bug!
                        rs.node(null);
                        Throwable ex = error;
                        if (ex != null) {
                            a.onError(ex);
                        }
                        else {
                            a.onComplete();
                        }
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(next);

                    e++;
                    if (curr == capacity() && start != 0) {
                        curr = 0;
                    } else {
                        curr++;
                    }

                    if (curr == start) break;

                    if ((curr + 1) % indexUpdateLimit == 0) {
                        rs.requestMore(curr + 1);
                    }
                }

                if (e != 0L) {
                    if (r != Long.MAX_VALUE) {
                        rs.produced(e);
                    }
                }

                rs.node(curr);

                missed = rs.leave(missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        void replayFused(FluxReplay.ReplaySubscription<T> rs) {
            int missed = 1;

            final Subscriber<? super T> a = rs.actual();

            for (; ; ) {

                if (rs.isCancelled()) {
                    rs.node(null);
                    return;
                }

                boolean d = done;

                a.onNext(null);

                if (d) {
                    Throwable ex = error;
                    if (ex != null) {
                        a.onError(ex);
                    }
                    else {
                        a.onComplete();
                    }
                    return;
                }

                missed = rs.leave(missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        @Override
        public void replay(FluxReplay.ReplaySubscription<T> rs) {
            if (!rs.enter()) {
                return;
            }

            if (rs.fusionMode() == FluxReplay.NONE) {
                replayNormal(rs);
            }
            else {
                replayFused(rs);
            }
        }

        @Override
        @Nullable
        public Throwable getError() {
            return error;
        }

        @Override
        public boolean isDone() {
            return done;
        }

        @Override
        @Nullable
        public T poll(FluxReplay.ReplaySubscription<T> rs) {
            Integer node = (Integer) rs.node();
            if (node == null) {
                node = head.get();
                rs.node(node);
            }

            T next = buffer.get(node);
            if (next == null) {
                return null;
            }
            rs.node(node+1);

            if ((node + 1) % indexUpdateLimit == 0) {
                rs.requestMore(node + 1);
            }

            return next;
        }

        @Override
        public void clear(FluxReplay.ReplaySubscription<T> rs) {
            //clear array?
            rs.node(null);
        }

        @Override
        public boolean isEmpty(FluxReplay.ReplaySubscription<T> rs) {
            Integer node = (Integer) rs.node();
            if (node == null) {
                node = head.get();
                rs.node(node);
            }
            return buffer.get(0) == null;
        }

        @Override
        public int size(FluxReplay.ReplaySubscription<T> rs) {
            Integer node = (Integer) rs.node();
            if (node == null) {
                node = head.get();
            }
            return sizeFrom(node);
        }

        @Override
        public int size() {
            return sizeFrom(head.get());
        }

        private int sizeFrom(int position) {
            if (
                    position+1 >= capacity()
                            || buffer.get(position+1) != null
            ) return capacity();
            return position;
        }
    }

There are bound to be some bugs here, but the implementations are pretty simple. I didn't really understand how to use rs.node() or requestMore for the singleton implementation.

@MikkelHJuul
Author

fixed a bug in replay:

       void replayNormal(FluxReplay.ReplaySubscription<T> rs) {
            final Subscriber<? super T> a = rs.actual();

            int missed = 1;

            for (; ; ) {

                long r = rs.requested();
                long e = 0L;

                Integer curr = (Integer) rs.node();
                if (curr == null) {
                    curr = head.get();
                    rs.node(curr);
                }
                int last = curr;
                curr = nextOrReset(curr);

                while (e != r) {
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }

                    boolean d = done;
                    T next = buffer.get(curr);
                    boolean empty = next == null;

                    if (d && empty) { // if start is not 0 here that is a clear bug!
                        rs.node(null);
                        Throwable ex = error;
                        if (ex != null) {
                            a.onError(ex);
                        }
                        else {
                            a.onComplete();
                        }
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(next);

                    e++;
                    if (curr == last) break;
                    curr = nextOrReset(curr);


                    if ((curr + 1) % indexUpdateLimit == 0) {
                        rs.requestMore(curr + 1);
                    }
                }

                if (e != 0L) {
                    if (r != Long.MAX_VALUE) {
                        rs.produced(e);
                    }
                }

                rs.node(curr);

                missed = rs.leave(missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        private Integer nextOrReset(Integer curr) {
            if (curr + 1 == capacity()) {
                return 0;
            }
            return curr + 1;
        }

But there is some timing issue/race condition in the code: replay is called twice, producing duplicate emissions, yet stepping through it in a debugger works. Sometimes I got 11 or 13 outputs from a buffer of size 10 (always in the correct order) by adding a breakpoint. It may be easier to reimplement than to go with this implementation, then :(

@OlegDokuka
Contributor

OlegDokuka commented Feb 2, 2023

Hi, @MikkelHJuul!

Can you reproduce the issue via a black-box test as well? Something like a test that demonstrates the operator failing on its own would be useful. Consider a JCStress test, etc.

Thanks,
Oleh

@OlegDokuka OlegDokuka added the status/need-user-input This needs user input to proceed label Feb 3, 2023
@MikkelHJuul
Author

In the implementation, or in the original issue? The original issue is not really testable via the API, but I can give you a jmap histogram showing the 2:1 correlation. Again, I cannot be of much help here because I cannot share actual code with you; I don't have a dev PC where that is permitted.

@OlegDokuka OlegDokuka added type/bug A general bug status/need-investigation This needs more in-depth investigation and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet status/need-user-input This needs user input to proceed labels Feb 6, 2023
@OlegDokuka
Contributor

OlegDokuka commented Feb 6, 2023

@MikkelHJuul I'm not sure the problem can be mitigated, since in all possible cases the previous value is referenced and held longer than needed. This is "by design" of the current replay storage, and I can only guess that it is the best-effort, most efficient implementation. I'm afraid we will not fix it.

Actually, RxJava has the same issue, so I wonder what @akarnokd thinks about it.

@OlegDokuka
Contributor

@MikkelHJuul can you tell us how critical this issue is for you?

@MikkelHJuul
Author

@OlegDokuka - Well, our use of this is kind of spurious as well: we hold all cache objects in a Sinks.many().replay().latest() instead of only the data that is subscribed downstream, and we also cache too eagerly. But it is the single largest shallow memory usage apart from String and byte[] in our current use.

For a possible fix of the above, add eager equality checks:

                Integer curr = (Integer) rs.node();
                if (curr == null) {
                    curr = head.get();
                    rs.node(curr);
                }

replaced by

                Integer curr = (Integer) rs.node();
                Integer hd = head.get();
                if (hd.equals(curr)) {
                    break;
                }
                if (curr == null) {
                    curr = hd;
                    rs.node(curr);
                }

Here I am mostly afraid of a subscriber that disallows #enter until the buffer has incidentally wrapped around to the same head index again, making it miss a whole buffer's worth of emissions. So the Integer that marks the ring-buffer head should increase monotonically, and the modulated position should be derived from it instead.
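
A minimal sketch of that idea (an assumption on my part, not code from any of the branches): the write index only ever increments, and the physical slot is derived by modulo, so a stalled subscriber comparing indices cannot mistake a wrapped-around head for "nothing new".

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    final class MonotonicRingWriter<T> {
        final AtomicInteger writeIndex = new AtomicInteger(); // never reset, only incremented
        final AtomicReferenceArray<T> buffer;

        MonotonicRingWriter(int capacity) {
            this.buffer = new AtomicReferenceArray<>(capacity);
        }

        void add(T value) {
            int index = writeIndex.getAndIncrement();                 // monotonic head marker
            buffer.set(Math.floorMod(index, buffer.length()), value); // slot derived by modulo
        }

        int headIndex() {
            return writeIndex.get(); // subscribers compare against this, not a wrapped slot
        }
    }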

And for the SingletonReplayBuffer, add a simple

                 if (curr == t) {
                    break;
                }

at the corresponding point in its implementation.

It seemed to fix all the issues, but you will have to test yourself.

I'm not entirely sure I buy the argument that keeping one dangling AtomicReference too many is "by design". I understand that the implementation works with respect to your tests, but that is not the same as it being the most correct implementation.

(We had some actual, growing memory leaks, which is probably the only reason I found this at all.)

I may try to get a developer environment running on my personal PC so I can make an actual PR if you are not interested in taking over.

@OlegDokuka
Contributor

OlegDokuka commented Feb 11, 2023

Hi, @MikkelHJuul!

Thank you for the open response with a deeper explanation of the problem.

First of all, any contribution is appreciated, and my comment was not meant as 'we don't wanna mess with that' 😅

What I mean is that the atomic replay store is a singular case, while the rest of the scenarios (e.g. cache(x)) still have the same issue.

That means that although your case is covered, the others aren't.

From what you say, it sounds like you care a lot about performance, so I am wondering whether you can replace the sink API with something different to unblock yourself for now. 🧐

Nevertheless, I want to do a bit of deeper research first to see whether we can solve the general scenario as well, and weigh your proposal as a good alternative. 🤓

That said, don't hesitate to send your PR while we spawn our own research in parallel 🙂

@MikkelHJuul
Author

Thanks. I'm more concerned with holding on to the memory for too long. I think I will look at it this week. Can you tell me whether it might be more feasible to simply copy the implementation of FluxOnBackpressureBufferLatest?

@MikkelHJuul
Author

Hello, I would like to give a short summary now that I have had some more time to look into this; I think I may need some help to get completely finished. Yesterday I implemented a replacement for the SizeBoundReplayBuffer in https://github.com/MikkelHJuul/reactor-core/tree/issues/3340-object-array using an Object[]. That solution survives the concurrency stress test (with about 97% fully correct replays, as noted for the other implementations as well), making it safer for use via Sinks.unsafe(). The solution does not pass the tests related to RefCount (issue/1260), and I need more help on that. Also, as you may note, in that solution I reset via rs.index(0) (since the implementation does not use rs#node at all). Is this correct behavior, and should it also be implemented this way in the SingletonReplayBuffer?

Using the index alone does give the buffers (Singleton and Array) a discrete bug: after integer overflow, when the index reaches 0 again, the buffer will skip a single emission for a subscriber arriving at exactly that point. Should this bug be accepted, or should it be prevented by rolling the buffer to a valid index? That would prevent it: the singleton can simply skip 0 and go to 1, and the array could move to (Integer.MIN_VALUE % capacity()) instead of going to Integer.MIN_VALUE.
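
As a sketch of the "skip 0" option for the singleton buffer (an illustration, not code from the linked branch):

    final class WrapSafeIndex {
        // next index that never lands on 0 again after integer overflow, so a
        // wrapped-around index is never confused with a fresh subscriber's "no position"
        static int next(int current) {
            int next = current + 1;
            return next == 0 ? 1 : next;
        }

        public static void main(String[] args) {
            assert next(Integer.MAX_VALUE) == Integer.MIN_VALUE; // plain overflow is fine
            assert next(-1) == 1;                                // 0 is skipped
        }
    }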

I had a bit too many questions to simply go on myself at this point.

Thanks

@akarnokd
Contributor

I don't fully understand the issue. We have an operator parameter in RxJava to avoid the retention of the head item.

@MikkelHJuul
Author

The code is not completely comparable, although close. I would expect the same leak to be present there (and to be reducible by this flag).

Is the RxJava buffer also protected behind a construct like SinkManySerialized? It looks close, so I would expect the same: not safe for concurrent use. (Note the JCStress test in the PR related to this and to a singleton implementation.)

@akarnokd
Contributor

akarnokd commented Mar 31, 2023

The buffer is fed by a single producer. Each subscriber walks the linked list from a node they captured at the time of subscription. Every new item arriving will indicate the need for more draining per subscriber.
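
A hedged sketch of that per-subscriber walk (simplified, with an invented helper; not the actual RxJava or Reactor drain loop):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;

    final class LinkedReplayWalk {

        static final class Node<T> extends AtomicReference<Node<T>> {
            final T value;
            Node(T value) { this.value = value; }
        }

        // emits up to `requested` values linked after the node this subscriber captured,
        // and returns the node where it stopped, to resume from on the next drain
        static <T> Node<T> drain(Node<T> captured, long requested, Consumer<T> onNext) {
            long emitted = 0;
            Node<T> next;
            while (emitted < requested && (next = captured.get()) != null) {
                onNext.accept(next.value);
                emitted++;
                captured = next;
            }
            return captured;
        }
    }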

Where is this JCStress code?

@MikkelHJuul
Author

https://github.com/MikkelHJuul/reactor-core/blob/issues/3340-single-processor-stresstest/reactor-core/src/jcstress/java/reactor/core/publisher/SinksManyReplayLatestStressTest.java

The Reactor SizeBoundReplayBuffer has a hard failure there (NullPointerException).
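
For readers who don't open the link: a rough sketch of the kind of scenario such a stress test exercises (illustrative only; the class name and listed outcomes are assumptions, not copied from the linked test):

    import org.openjdk.jcstress.annotations.Actor;
    import org.openjdk.jcstress.annotations.Arbiter;
    import org.openjdk.jcstress.annotations.Expect;
    import org.openjdk.jcstress.annotations.JCStressTest;
    import org.openjdk.jcstress.annotations.Outcome;
    import org.openjdk.jcstress.annotations.State;
    import org.openjdk.jcstress.infra.results.LL_Result;
    import reactor.core.publisher.Sinks;

    // two actors emit concurrently into an unsafe replay-latest sink; the arbiter
    // records what a late subscriber replays and the EmitResult of the racing emission;
    // any other observed result (a null replay, a failed emission, an exception) is
    // exactly what the stress test is meant to surface
    @JCStressTest
    @State
    @Outcome(id = "1, OK", expect = Expect.ACCEPTABLE, desc = "second emission overwritten by the first")
    @Outcome(id = "2, OK", expect = Expect.ACCEPTABLE, desc = "second emission is the latest value")
    public class ReplayLatestConcurrentEmitStressTest {

        final Sinks.Many<Integer> sink = Sinks.unsafe().many().replay().latest();

        Sinks.EmitResult result;

        @Actor
        public void first() {
            sink.tryEmitNext(1);
        }

        @Actor
        public void second() {
            result = sink.tryEmitNext(2);
        }

        @Arbiter
        public void check(LL_Result r) {
            Integer[] seen = new Integer[1];
            sink.asFlux().subscribe(v -> seen[0] = v); // replay happens synchronously here
            r.r1 = seen[0];
            r.r2 = result;
        }
    }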

@akarnokd
Contributor

Is SinkManyReplayProcessor supposed to support concurrent access to its tryEmitNext and other such methods?

@MikkelHJuul
Author

MikkelHJuul commented Mar 31, 2023

I highly doubt it at this time. The Object-array implementation I have been working on is error-safe, but it does have the issue of missing emissions (in 2% of cases in JCStress) when tryEmitNext is called concurrently. This is due to the design/implementation of the ReplayProcessor and how add and replay are called:

buffer.add(x);                      // a concurrent add can overwrite the slot before replay runs
for (var sub : subscriptions) {
    buffer.replay(sub);
}

There is a big difference between Sinks.unsafe().many().replay().latest() throwing on concurrent modification (NullPointerException) and it merely missing an emission.

@MikkelHJuul
Author

Hello, I just wanted to chip in on this issue: I have added a new PR that implements the SizeBoundReplayBuffer as an atomic object array.

I'm not entirely sure which implementation should go in; there are now 3 to choose from: the SingletonReplayBuffer, enhancing the SizeBoundReplayBuffer, and the ArraySizeBoundReplayBuffer. In all cases, I believe things could be pulled in across multiple of these PRs, e.g. the cancelOnNext test from the object-array implementation, the general tests from the enhanced SizeBoundReplayBuffer case, etc.

Also, I see that the SizeBoundReplayBuffer could still be improved further. It could be done like the object-array implementation, or it could be changed to be atomically updated with CAS semantics.

There is an issue with the license, I believe? I will not change anything in any license, so please help me fix that.

Please contact me if anything needs clarifying.

@chemicL
Member

chemicL commented Jan 19, 2024

Linking the PR for future reference: #3688

@OlegDokuka
Contributor

Hi, @MikkelHJuul!

Thank you for your hard work putting all the potential improvements into multiple PRs. We have reviewed all of them and estimated the impact on the existing behaviour. To understand the impact, please consider the following sample:

public static void main(String[] args) {
    BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            subscription.request(1);
        }
    };

    Sinks.Many<String> sink = Sinks.many()
                                   .replay()
                                   .latest();

    sink.asFlux().log("SUB1").subscribe(subscriber);

    System.out.println("sending first");

    sink.tryEmitNext("1");

    System.out.println("sending more");

    sink.tryEmitNext("2");
    sink.tryEmitNext("3");
    sink.tryEmitNext("4");
    sink.tryEmitNext("5");

    System.out.println("another subscriber");
    sink.asFlux().log("SUB2").subscribe();

    System.out.println("sub 1 more demand");
    subscriber.request(4);
}

Once the sample is executed, the output is the following:

12:39:36.831 [main] INFO SUB1 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.840 [main] INFO SUB1 - | request(1)
sending first
12:39:36.841 [main] INFO SUB1 - | onNext(1)
sending more
another subscriber
12:39:36.844 [main] INFO SUB2 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.844 [main] INFO SUB2 - | request(unbounded)
12:39:36.844 [main] INFO SUB2 - | onNext(5)
sub 1 more demand
12:39:36.844 [main] INFO SUB1 - | request(4)
12:39:36.844 [main] INFO SUB1 - | onNext(2)
12:39:36.844 [main] INFO SUB1 - | onNext(3)
12:39:36.844 [main] INFO SUB1 - | onNext(4)
12:39:36.844 [main] INFO SUB1 - | onNext(5)
As can be noticed, all 4 elements that were not observed by subscriber 1 due to lack of demand are delivered (this is because of the linked-list data structure used right now).

With your suggestion, we get the following log output:

12:39:36.831 [main] INFO SUB1 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.840 [main] INFO SUB1 - | request(1)
sending first
12:39:36.841 [main] INFO SUB1 - | onNext(1)
sending more
another subscriber
12:39:36.844 [main] INFO SUB2 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.844 [main] INFO SUB2 - | request(unbounded)
12:39:36.844 [main] INFO SUB2 - | onNext(5)
sub 1 more demand
12:39:36.844 [main] INFO SUB1 - | request(4)
12:39:36.844 [main] INFO SUB1 - | onNext(5)
As can be noticed, only the last element is delivered while the others are dropped.
The latter behaviour could be valid and expected for some, but for others it would be a breaking change.

For now, all of the PRs introduce a behaviour change which is not acceptable for this operator (it has been in the codebase with this behaviour since the very first release of the 3.x line). To mitigate that breaking change, we need to add an extra builder step in the MulticastReplaySpec which would let one decide explicitly which behaviour is expected. Consider the following configuration code samples:

   BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            subscription.request(1);
        }
    };
    Sinks.Many<String> sink = Sinks.many()
                                   .replay()
                                   .allOrNothing() // we use the Array based replay buffer / single element replay buffer
                                   .latest();
                                   
    sink.asFlux().log("SUB1").subscribe(subscriber);

    System.out.println("sending first");

    sink.tryEmitNext("1");

    System.out.println("sending more");

    sink.tryEmitNext("2"); / / returned EmitResult is FAIL_OVERFLOW
    sink.tryEmitNext("3");  / / returned EmitResult is FAIL_OVERFLOW
    sink.tryEmitNext("4");  / / returned EmitResult is FAIL_OVERFLOW
    sink.tryEmitNext("5");  / / returned EmitResult is FAIL_OVERFLOW                      
    
    subscriber.request(4); // nothing happens after that since no items are stored                             

or

   BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            subscription.request(1);
        }
    };
   Sinks.Many<String> sink = Sinks.many()
                                   .replay()
                                   .bestEffort() // we use the Array based replay buffer / single element replay buffer
                                   .latest();
                                   
    sink.asFlux().log("SUB1").subscribe(subscriber);

    System.out.println("sending first");

    sink.tryEmitNext("1");

    System.out.println("sending more");

    sink.tryEmitNext("2");  / / returned EmitResult is OK
    sink.tryEmitNext("3");  / / returned EmitResult is OK
    sink.tryEmitNext("4");  / / returned EmitResult is OK
    sink.tryEmitNext("5");  / / returned EmitResult is OK 

   
    subscriber.request(4); // element 5 is delivered                   

and

   BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            subscription.request(1);
        }
    };
   Sinks.Many<String> sink = Sinks.many()
                                   .replay() // we use the old linked list based replay buffer
                                  // nothing selected so we need to preserve old behaviour 
                                   .latest();
                                   
    sink.asFlux().log("SUB1").subscribe(subscriber);

    System.out.println("sending first");

    sink.tryEmitNext("1");

    System.out.println("sending more");

    sink.tryEmitNext("2");  / / returned EmitResult is OK
    sink.tryEmitNext("3");  / / returned EmitResult is OK
    sink.tryEmitNext("4");  / / returned EmitResult is OK
    sink.tryEmitNext("5");  / / returned EmitResult is OK 

   
    subscriber.request(4); // element 2, 3, 4, 5 are delivered                   

Would this solution work for you?
If so, we can close the original issue and open a new one with the specification expressed above.
Also, are you interested in opening a new PR and adding the required spec changes (in that case we don't need a separate issue)?

Thanks,
Oleh

@MikkelHJuul
Author

MikkelHJuul commented Jan 26, 2024

Hi @OlegDokuka

Thanks, I didn't actually catch that difference.

I'm not sure that any performance/memory difference is worth keeping the singleton implementation over the array implementation. We need to decide on that.

And I guess we should at least go with the fixes for the add method that make the SizeBoundReplayBuffer (and the timed variant) safe for concurrent use (and maybe document how the unsafe API actually behaves for the array buffer: the latest element is always emitted to all subscribers, but elements emitted concurrently may not be visible to all subscribers).

The explanation above also shows why the SizeBoundReplayBuffer fares comparatively well in the concurrent example: it only needs to replay once and will always replay all remaining elements. For the array-based implementation, each #add competes with #replay. We could fix this, by the way, if the added item were also passed into the call to replay (as a method parameter); then all added items would have to be replayed, albeit with probable loss of ordering.

Looking at the two proposed API changes, the latter (bestEffort) is in line with what I expect; in fact the former is not easily achievable, as the wrapping handler would have to verify demand before hitting the buffer.
(Edit: I guess that is what the SinkBestEffort wrapper does.)

Also, I suppose I need to add a similar implementation for the SizeAndTimeBound implementation?

On a side note: I have been running the SingletonReplayBuffer in our production environment since November, in unsafe mode (we only care about the latest value in case of multiple rapid updates). We have 3 million instances, saving about 200-300 MB in bookkeeping, and one of these buffers is known to hold a reference to a 700 MB string that is updated once in a while, so this really helped reduce memory for us.

@MikkelHJuul
Author

Should any changes be made available via Flux#cache or Flux#replay? We should decide on that as well, I suppose.

@MikkelHJuul
Author

I added a PR to fix the current implementations. We could also proceed with the enhancements in another ticket.

@MikkelHJuul
Author

@OlegDokuka - I have slowly started work on this. Please review the referenced PR. It should be fully compatible with current behavior, but adds safe concurrency.

@chemicL
Member

chemicL commented Jul 17, 2024

Hey, @MikkelHJuul. I realize it has been a while since your last two PRs were opened. I am just now starting to look into this issue. A few things are not super clear to me, and we need a plan going forward.

I notice that the implementation in RxJava currently differs from Reactor's, and according to @akarnokd it makes it possible to avoid retaining the unnecessary item. Link to the PR with an explanation of the trade-offs. Can you explain:

  • Why we shouldn't port the changes from RxJava to Reactor and instead work on improving the concurrency of FluxReplay?
  • Also, why a separate constant-size-buffer based implementation with "best effort" semantics is needed?

Thanks in advance for following up.

@MikkelHJuul
Author

Hi, and thanks for catching up. It has been a while. I must admit I was pondering lately whether the array-based implementation is actually better than copy-pasting the already implemented version but dropping the reference from tail on #add. But if there is a simpler way that only removes the dangling reference, that could be okay, I suppose, especially if paired with a note that lack of request could lead to a growing buffer.

@chemicL
Member

chemicL commented Jul 22, 2024

Definitely. Are you considering a contribution which ports the evolution of the replay-buffer implementations from RxJava into Reactor?

I think that with the current spec, we don't need to make the current linked-list implementation thread-safe. I believe the proposed implementation would also yield worse performance results in a JMH benchmark, due to the looping and CAS operations. If you are OK with the implementation from RxJava, I'd suggest that we close #3714.

Regarding the array-based best-effort implementation, I think we can also pause there and close #3798; as you suggest, we'd be able to communicate to users, with a proper note in the javadoc, what risks there are with the current linked-list implementation. Meanwhile, the primary concern of holding a reference to a stale value is solved by eager truncation of the head element, at the only cost of always allocating a dummy head once the size limit is reached.
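
For reference, a minimal sketch of that eager-truncation idea, under simplified assumptions (single writer, invented names); it is not the actual Reactor or RxJava code:

    import java.util.concurrent.atomic.AtomicReference;

    // single-writer sketch; the Node-as-AtomicReference layout mirrors the buffer
    // discussed earlier in this thread
    final class EagerSizeBoundBuffer<T> {

        static final class Node<T> extends AtomicReference<Node<T>> {
            final T value;
            Node(T value) { this.value = value; }
        }

        final int limit;
        int size;
        Node<T> head; // sentinel or dummy node; its value is never replayed
        Node<T> tail;

        EagerSizeBoundBuffer(int limit) {
            this.limit = limit;
            Node<T> sentinel = new Node<>(null);
            this.head = sentinel;
            this.tail = sentinel;
        }

        void add(T value) {
            Node<T> n = new Node<>(value);
            tail.lazySet(n);
            tail = n;
            if (size == limit) {
                // plain truncation would do `head = head.get()`, which keeps the dropped
                // value reachable through the new head node; eager truncation swaps in a
                // fresh value-less head instead, so the dropped value can be collected
                Node<T> dummy = new Node<>(null);
                dummy.lazySet(head.get().get());
                head = dummy;
            }
            else {
                size++;
            }
        }
    }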
