Bug in Merge for USE_FAIR_AND_CHEAPER_MERGE #2134

Open
fabianoliver opened this issue Jun 30, 2024 · 4 comments

@fabianoliver

fabianoliver commented Jun 30, 2024

I think the Merge implementation when using USE_FAIR_AND_CHEAPER_MERGE may be incorrect.

Consider this (admittedly crude) unit test. Instead of throwing an AggregateException with an inner "test" exception, it terminates the process with the following:

```
Exit code is -532462766 (Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.OnCompleted(Action`1 continuation, Object state, Int16 token, ValueTaskSourceOnCompletedFlags flags)
   at TestProject1.Tests.DontThrow()+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.OnCompleted()
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1.AwaitUnsafeOnCompleted[TAwaiter](TAwaiter& awaiter, IAsyncStateMachineBox box)
--- End of stack trace from previous location ---
   at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
   at System.Threading.QueueUserWorkItemCallback.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart())
```

```csharp
using System;
using System.Collections.Generic;
using System.Linq;               // AsyncEnumerableEx (System.Interactive.Async)
using System.Threading.Tasks;
using NUnit.Framework;

namespace TestProject1;

public class Tests
{
    [Test]
    public async Task Test()
    {
        var sources = new[] { DontThrow(), Throw() };
        var merged = AsyncEnumerableEx.Merge(sources);

        // Expected: an AggregateException with the inner "test" exception from Throw().
        // Observed: the process crashes with the InvalidOperationException shown above.
        await foreach (var _ in merged)
        {
            // ignored
        }
    }

    private async IAsyncEnumerable<int> DontThrow()
    {
        await Task.Delay(TimeSpan.FromSeconds(20));
        yield return 1;
    }

    private async IAsyncEnumerable<int> Throw()
    {
        await Task.Delay(TimeSpan.FromSeconds(10));
        throw new Exception("test");
        yield break; // unreachable, but makes this method an async iterator
    }
}
```

I think that is because this implementation may await a given ValueTask up to three times. Worse, the later awaits target a ValueTask that has already completed, which to the best of my knowledge is not allowed and causes undefined behaviour (in the async case, the backing state machine may be recycled for other ValueTasks after the first await).

As a simple example, consider the following chain of events. For simplicity, say the input argument to Merge is an array that contains only a single IAsyncEnumerable.

  1. Line 58: MoveNext() is called, and the resulting ValueTask is stored in moveNextTasks.
  2. Line 61: A WhenAny awaitable construct is created, which effectively awaits this task (by registering an OnCompleted callback).
  3. Suppose that, after this, the source IAsyncEnumerable throws an exception.
  4. Line 68 now awaits WhenAny, which returns 0 (our single input IAsyncEnumerable completed by throwing its exception).
  5. In line 73, we await the already-created MoveNext ValueTask again. This is the very same ValueTask that the WhenAny construct already awaited, so we are now awaiting an already completed ValueTask for the second time.
  6. Because this await throws, we exit the block immediately and therefore never clean up moveNextTasks (that is, we do not replace the already-awaited ValueTask with a new one).
  7. Finally, we end up in line 146, where we await the same ValueTask once again, which makes this the third await.
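The rule being broken here is that a ValueTask may be consumed only once; the stack trace shows that a compiler-generated async iterator backs its MoveNextAsync ValueTask with a reusable IValueTaskSource. When a result genuinely has to be observed from more than one place (a WhenAny-style wait, then an await), one safe pattern is to convert the ValueTask to a Task exactly once with AsTask() and only ever touch the Task afterwards. A minimal sketch follows; SingleConsumption and MoveNextObservedTwiceAsync are hypothetical names for illustration, not part of Ix.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of Ix.NET.
public static class SingleConsumption
{
    // Observes one MoveNextAsync() result twice: once via Task.WhenAny, once via await.
    // The ValueTask<bool> itself is consumed exactly once, by AsTask(); the resulting
    // Task can be awaited any number of times, even after it has completed or faulted.
    public static async Task<bool> MoveNextObservedTwiceAsync<T>(IAsyncEnumerator<T> enumerator)
    {
        Task<bool> moveNext = enumerator.MoveNextAsync().AsTask();

        await Task.WhenAny(moveNext); // first observation: wait for completion only (never throws)
        return await moveNext;        // second observation: safe, because this is a Task
    }
}
```

On .NET 5 and later, ValueTask<TResult>.Preserve() is an alternative that likewise returns a ValueTask that may be awaited more than once. The price in both cases is the Task allocation that the ValueTask-based design is trying to avoid.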

If there's any interest, I'd be happy to open a PR and either attempt to fix this or possibly remove this implementation altogether, depending on whether there's any interest in switching on USE_FAIR_AND_CHEAPER_MERGE for NuGet releases in the foreseeable future.

@idg10
Collaborator

idg10 commented Jul 1, 2024

Thanks for the offer of help.

Presumably you're building your own version of Ix? As far as I can tell, that USE_FAIR_AND_CHEAPER_MERGE code is never built into any released NuGet package. The PR in which it was added describes this as a prototype.

I have to admit I've not looked at this part of the code at all so far. I'm not familiar with the problems this was meant to address.

We're trying to bring in a practice of quantifying the effects of performance-related changes, as we did in the changes to fix #2005, so if we were going to do this, I'd want to begin by adding (or finding existing) benchmarks that characterise the performance of the relevant code under the anticipated workloads, and then run those benchmarks before and after the change.

Is that something you'd be prepared to do as part of opening up a PR?

If not, do you have specific scenarios in mind that this change would address?

@fabianoliver
Author

Hi,

> I'm not familiar with the problems this was meant to address.

I don't really want to speak for the original authors here of course, but I think it boils down to two things:

Fairness
This matters when more than one enumerable has its next item readily available. Say we have something like this:

```csharp
AsyncEnumerableEx.Merge([AsyncEnumerable.Repeat("left", int.MaxValue), AsyncEnumerable.Repeat("right", int.MaxValue)])
```

In the current implementation, the merged sequence would almost only ever yield items from its left source.
In the new implementation (bugs aside), it would alternate between left and right.
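A minimal sketch of the fairness idea, assuming the merge keeps one in-flight task per source in an array: instead of always scanning from index 0 (Task.WhenAny, in practice, tends to hand back the first already-completed task in argument order, which is what favours the left source), scan from a rotating start index. FairPick and PickCompleted are hypothetical names; this is not the USE_FAIR_AND_CHEAPER_MERGE code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper illustrating round-robin selection; not the Ix.NET implementation.
public static class FairPick
{
    // Returns the index of a completed task, scanning from `start` and wrapping around,
    // then advances `start` past the chosen index so the next scan begins at its neighbour.
    // Returns -1 if no task has completed yet (a real merge would then wait for one).
    public static int PickCompleted(Task[] tasks, ref int start)
    {
        for (int i = 0; i < tasks.Length; i++)
        {
            int index = (start + i) % tasks.Length;
            if (tasks[index].IsCompleted)
            {
                start = (index + 1) % tasks.Length;
                return index;
            }
        }
        return -1;
    }
}
```

With two sources that always have an item ready, such as the two Repeat sequences above, successive calls return 0, 1, 0, 1, and so on, whereas a scan that always starts at index 0 would return 0 every time.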

Efficiency
The current implementation converts the ValueTask returned by each MoveNext call into a Task, and creates another Task per iteration (WhenAny), whereas the new implementation stays squarely in the ValueTask world. I'd expect the new implementation to have a measurable edge, particularly in allocations, if the source sequences have many synchronously available items.

It's probably worth considering two things though:

  1. The change in fairness behaviour could be considered a breaking change. It's certainly possible that existing users rely on earlier sources being given preference over later ones, which makes me think that changing the existing function is risky. Maybe another new variation of the Merge operator would be better, to avoid breaking anyone? There's already more than one Merge operator right now, though, so that would probably add to users' confusion when choosing the right one.
  2. In terms of performance gains, it's quite hard to judge without proper benchmarks, but I'd imagine the overall improvement from moving from Tasks to ValueTasks won't be anywhere near cases like the O(n^2) you mentioned.

With that in mind, happy to hear your thoughts. If there's still interest in this, I can try coming up with a PR and benchmarks.
Otherwise, maybe it's still worth considering whether the old code should be removed for the time being, just to stop unsuspecting users (such as myself :-) ) from compiling it themselves and/or extracting it, given that it doesn't seem to be entirely correct?

@idg10
Collaborator

idg10 commented Jul 4, 2024

I started looking into this a little further, and I now think there may be an additional problem. The code includes this line:

```csharp
var whenAny = TaskExt.WhenAny(moveNextTasks);
```

which is a ValueTask-based equivalent of Task.WhenAny. My first thought was to wonder whether such a thing had been added to a recent version of .NET yet. And that was when I found this comment from Stephen Toub:

> I don't see a good way to implement WhenAny; ValueTasks can only be awaited once, and doesn't support unregistration of callbacks, which means using an implementation of WhenAny would leak a callback into the ValueTask and it wouldn't be usable again after that.

If I've understood his argument correctly, there are two critical issues here:

  1. the nature of WhenAny is that you're likely to want to try waiting more than once with most of the tasks you pass in because only one will complete
  2. you can't wait for a ValueTask more than once, so the first time you pass one into WhenAny, you've burned it

Now it does look like TaskExt.WhenAny was written in a way that takes this into account because it doesn't actually work the same way as the Task version: it returns a WhenAnyValueTask<T> object that you keep using until all the tasks are done (and which also allows you to supply new tasks for particular slots, which is what enables it to handle not just multiple value tasks, but multiple sources of sequences of value tasks).

The fact that you can't unregister a completion callback doesn't need to be a problem because this design for WhenAny effectively takes ownership of every value task it sees—there's no expectation that the task will be usable by any other means afterwards. (Probably this shouldn't be called WhenAny because it really looks very different from Task.WhenAny. Perhaps WhenAllIncremental might actually be a better name: ultimately it's going to consume all the tasks, but it's going to let you know when each individual one completes.)

So the argument Stephen Toub presents for why you can't have a ValueTask.WhenAny that works just like Task.WhenAny doesn't have to apply here because this works quite differently from Task.WhenAny.

However, this is the kind of code that's extremely easy to get wrong.

The early shutdown logic (in finally, used when one of the sources throws) worries me, for example. The finally in Merge has this:

```csharp
// REVIEW: This adds an additional continuation to all of the pending tasks (note that
// whenAny also has registered one). The whenAny object will be collectible
```

Although it is not literally using await twice on the same ValueTask, it is calling GetAwaiter().OnCompleted(...) and then, in the finally block, calling await. I think that does amount to awaiting it twice, which is something the docs include in the list of operations that "should never be performed".

That seems like it's probably a bug, although I've been unable to work out from the documentation whether the rule against "awaiting it twice" means you're not allowed to call either GetAwaiter or OnCompleted more than once, or merely that once you've called GetResult() on a completed ValueTask awaiter you can't do anything more with it. But the fact that https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/ says:

> The underlying object expects to work with only a single callback from a single consumer at a time,

does seem to say that adding a second continuation is going to be bad. I've not yet found the exact bit of the documentation that specifically forbids multiple callbacks in particular (rather than multiple awaits), but few people understand this with as much depth as @stephentoub so if he thinks it's something you shouldn't do, I'd say we shouldn't do it...

So that implies that the USE_FAIR_AND_CHEAPER_MERGE code has multiple problems, and would require a deep and thorough review (and the fixing of at least two problems) before we could move over to it.

In your latest reply, you said you're happy to help:

> If there's still interest in this

I think you may be the first person to raise this since that experimental code was added over 5 years ago, so I'd have to say that the level of interest is probably low.

Then again, there are multiple // REVIEW comments indicating multiple possible problems with the existing code, so apparently this feature area needs work even if we continue with the current approach. I'd say the path forward at this point needs to be:

  1. create some benchmarks to quantify the actual performance difference the USE_FAIR_AND_CHEAPER_MERGE approach could make
  2. decide whether to stay with the existing approach, or to use something like USE_FAIR_AND_CHEAPER_MERGE
  3. whichever approach we select, look at all the outstanding // REVIEW comments and address them; if we do attempt to do something clever with ValueTask, we should perform some kind of formal analysis so that we can be confident that the approach taken really does abide by all of the rules for value tasks

So on that basis, if you are happy to create a PR with benchmarks as the first step, that would be awesome. Equally, if you decide on the basis that you're apparently the first person to ask about this in over half a decade, that this might not be worth your time, that would be wholly understandable!

If you do decide to proceed, be aware that right now there are a load of problems building Ix.NET on .NET SDK 8.0. #2135 addresses these, but as I write this it is still open. We're hoping to merge it in the next day or two, but if you're looking at doing any dev work at all on Ix.NET, I'd recommend waiting until that's done.

@fabianoliver
Author

Thanks for having a look at this.

I absolutely agree with everything you said, including the ambiguities of "don't await twice" and what exactly that boils down to in terms of awaiters, and the apparent dodginess of the current implementation adding multiple continuations.

The current sketch I'd have in mind is to ensure the Merge operator only ever awaits the WhenAny object, including in its finally block - and never one of the MoveNext ValueTasks directly.

I believe the finally block just needs to ensure that no MoveNext is in flight anymore, so awaiting WhenAny until all tasks are done should be enough (though a bit of extra logic might be needed to maintain the current deterministic order of DisposeAsync calls).
And as for yielding results, once we've awaited whenAny successfully, we know which MoveNext task has completed, and should be able to access its result (or exception) safely and without blocking via .Result, rather than having to await it again (I think...).
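The plan above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the USE_FAIR_AND_CHEAPER_MERGE code: instead of TaskExt.WhenAny it swaps in a System.Threading.Channels channel as the completion queue, and MergeSketch, CompletionQueue and Merge are hypothetical names. Each MoveNextAsync ValueTask gets at most one OnCompleted registration and exactly one GetResult call, the loop only ever awaits the channel, and the finally block drains every in-flight MoveNextAsync before calling DisposeAsync in source order.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch; names are illustrative, not Ix.NET APIs.
public static class MergeSketch
{
    // Takes ownership of each ValueTask<bool>: one OnCompleted registration (only if still
    // pending) and exactly one GetResult call. Outcomes are posted to a channel, so callers
    // never touch a MoveNextAsync ValueTask again after handing it over.
    private sealed class CompletionQueue
    {
        private readonly Channel<(int Slot, bool HasValue, Exception? Error)> _done =
            Channel.CreateUnbounded<(int Slot, bool HasValue, Exception? Error)>();

        public void Add(int slot, ValueTask<bool> moveNext)
        {
            ValueTaskAwaiter<bool> awaiter = moveNext.GetAwaiter();
            if (awaiter.IsCompleted) Report(slot, awaiter);
            else awaiter.OnCompleted(() => Report(slot, awaiter));
        }

        private void Report(int slot, ValueTaskAwaiter<bool> awaiter)
        {
            try { _done.Writer.TryWrite((slot, awaiter.GetResult(), null)); }
            catch (Exception ex) { _done.Writer.TryWrite((slot, false, ex)); }
        }

        public ValueTask<(int Slot, bool HasValue, Exception? Error)> NextAsync() =>
            _done.Reader.ReadAsync();
    }

    public static async IAsyncEnumerable<T> Merge<T>(params IAsyncEnumerable<T>[] sources)
    {
        var enumerators = new IAsyncEnumerator<T>?[sources.Length];
        var queue = new CompletionQueue();
        int inFlight = 0; // number of MoveNextAsync calls not yet reported by the queue
        try
        {
            for (int i = 0; i < sources.Length; i++)
            {
                enumerators[i] = sources[i].GetAsyncEnumerator();
                queue.Add(i, enumerators[i]!.MoveNextAsync());
                inFlight++;
            }

            while (inFlight > 0)
            {
                var (slot, hasValue, error) = await queue.NextAsync();
                inFlight--;
                if (error is not null) ExceptionDispatchInfo.Capture(error).Throw();
                if (!hasValue) continue; // this source is exhausted

                yield return enumerators[slot]!.Current;
                queue.Add(slot, enumerators[slot]!.MoveNextAsync());
                inFlight++;
            }
        }
        finally
        {
            // Drain: no MoveNextAsync may still be running when DisposeAsync is called.
            while (inFlight > 0)
            {
                await queue.NextAsync();
                inFlight--;
            }
            foreach (var e in enumerators)
                if (e is not null) await e.DisposeAsync();
        }
    }
}
```

Two choices worth flagging: the first source exception is rethrown as-is (via ExceptionDispatchInfo) rather than wrapped in an AggregateException, and the drain in finally waits for slow in-flight sources, so with the test from the original report the failure would surface only after DontThrow's 20-second delay. Because completions are yielded in arrival order, synchronously ready sources tend to alternate rather than the first one being favoured.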

Definitely agree this would ultimately need a very thorough review!

> So on that basis, if you are happy to create a PR with benchmarks as the first step, that would be awesome. Equally, if you decide on the basis that you're apparently the first person to ask about this in over half a decade, that this might not be worth your time, that would be wholly understandable!
>
> If you do decide to proceed, be aware that right now there are a load of problems building Ix.NET on .NET SDK 8.0. #2135 addresses these, but as I write this it is still open. We're hoping to merge it in the next day or two, but if you're looking at doing any dev work at all on Ix.NET, I'd recommend waiting until that's done.

That probably works in my favor, because it'll likely take me a little while to get around to this :)
I'll hopefully be able to free up some time over the next weeks to try having a shot at this.

Projects
None yet
Development

No branches or pull requests

2 participants