
Publish overload for IAsyncEnumerable #2159

Open
julealgon opened this issue Jul 29, 2024 · 4 comments

Comments

@julealgon

Feature request

I'd like for a new extension to exist with this signature:

public static IAsyncEnumerable<TSource> Publish<TSource>(this IAsyncEnumerable<TSource> source)

That would be the async counterpart of the existing IEnumerable.Publish extension:

Which subcomponent library (Ix, Async.Ix)?

Async.Ix.

Which next library version (i.e., patch, minor or major)?

Minor (feature).

What are the platform(s), environment(s) and related component version(s)?

How commonly is this feature needed (one project, several projects, company-wide, global)?

It is fairly specialized in our case, but at the same time fairly broadly applicable.

Please describe the feature.

We recently worked through a requirement where the ability to grab "the next N elements" from a child IAsyncEnumerable multiple times, driven by a parent IAsyncEnumerable, would have solved the problem cleanly and efficiently.

I learned that the Publish extension on IEnumerable allows for something like that by tracking the last enumerated position of the sequence, which lets you, say, call Take(10) on the sequence, do something with those 10 elements, then call Take(10) again to grab the next 10, without enumerating the original multiple times (and hopefully without too much memory overhead).
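For reference, the non-async behavior can be sketched like this (a minimal illustration; per the System.Interactive docs, Publish returns an IBuffer&lt;T&gt;, which is both IEnumerable&lt;T&gt; and IDisposable, and each new enumerator continues from the shared cursor's current position):

```csharp
using System;
using System.Linq;

// Requires the System.Interactive package (EnumerableEx.Publish).
var source = Enumerable.Range(1, 25);

using var shared = source.Publish(); // IBuffer<int>: one shared cursor

var firstTen = shared.Take(10).ToList(); // elements 1..10
var nextTen  = shared.Take(10).ToList(); // continues: elements 11..20
```

The request here is essentially this behavior with `await foreach` and `MoveNextAsync` in place of the synchronous enumeration.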

We would want that same capability but for IAsyncEnumerable.

We have a scenario where a small database has been serialized as CSV files: each CSV file is an entity, and relationships between entities (1:1 or 1:N) are controlled by columns in those CSVs that work like foreign keys. At the same time, this multi-CSV structure has an additional column in all files that keeps track of "what entity we are dealing with". For example, if the set of files describes 5 main entities, this column will go from 1 through 5 in values, in that order.

For example:

vehicle.csv (main entity)

| entity_num | vehicle_id | vin | ... |
| --- | --- | --- | --- |
| 1 | 4512 | WP0CA29972S650104 | ... |
| 2 | 9102 | 1GCDC14H5DS161081 | ... |

engine.csv (N:1 with vehicle)

| entity_num | vehicle_id | engine_type | fuel_type | ... |
| --- | --- | --- | --- | --- |
| 1 | 4512 | 4 Cylinder Engine | Gasoline Fuel | ... |
| 1 | 4512 | 4 Cylinder Engine | Flex Fuel Capability | ... |
| 2 | 9102 | V6 Cylinder Engine | Gasoline Fuel | ... |

We need to parse this structure as cleanly and as efficiently as possible. Assuming we have an IAsyncEnumerable for each of these CSV files (using something like CsvHelper), Publish would allow us to do something like:

await foreach (var vehicleRow in vehicleCsv)
{
    Vehicle vehicle = mapper.Map<Vehicle>(vehicleRow);

    await foreach (var engineRow in engineCsv.TakeWhile(e => e.EntityNum == vehicleRow.EntityNum))
    {
        vehicle.Engines.Add(mapper.Map<Engine>(engineRow));
    }

    // ... several other await foreach loops for different child CSVs here

    yield return vehicle;
}

The outside foreach is straightforward as that's the 1:1 parent entity, but the child entities can have multiple rows that correspond to that parent, so we use TakeWhile to read those that are related to this entity based on the running EntityNum value.

Without Publish, this of course doesn't work for anything past the first parent, since the TakeWhile call would always start from the beginning of the sequence, and adding a SkipWhile (or even a Skip) there would mean unneeded multiple enumeration too.

The only other equivalent I can think of for this problem would be to drop down to the enumerator level and call MoveNextAsync manually, but that makes the code substantially less readable.
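For illustration, that enumerator-level workaround might look roughly like this (a hypothetical sketch reusing the names from the example above, such as `engineCsv` and `EntityNum`; the enumerator is obtained once, outside the parent loop, so the read position persists across parent iterations):

```csharp
// Obtain the child enumerator once so its position survives
// across iterations of the parent loop.
await using var engines = engineCsv.GetAsyncEnumerator();
bool hasRow = await engines.MoveNextAsync();

await foreach (var vehicleRow in vehicleCsv)
{
    Vehicle vehicle = mapper.Map<Vehicle>(vehicleRow);

    while (hasRow && engines.Current.EntityNum == vehicleRow.EntityNum)
    {
        vehicle.Engines.Add(mapper.Map<Engine>(engines.Current));
        hasRow = await engines.MoveNextAsync();
    }

    yield return vehicle;
}
```

One side note on this sketch: because it holds the current row as a lookahead, it avoids the off-by-one that TakeWhile would introduce (TakeWhile has to consume the first row of the *next* group to discover that its predicate has become false).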

@idg10
Collaborator

idg10 commented Aug 21, 2024

I learned that the Publish extension on IEnumerable allows for something like that by tracking the last enumerated position of the sequence, which lets you, say, call Take(10) on the sequence, do something with those 10 elements, then call Take(10) again to grab the next 10, without enumerating the original multiple times (and hopefully without too much memory overhead).

This isn't a part of the code I'm deeply familiar with, but having just looked at it now, the IEnumerable<T> version does indeed appear to do what you want. In fact the Publish behaviour is a little more complex because it also supports multiple concurrent subscribers: if 2 or more callers call GetEnumerator before either calls MoveNext, Publish will buffer items retrieved from the underlying source so it can supply each item to all subscribers while fetching each item from the underlying source only once.

This might be behind your "hopefully without too much of a memory overhead" concern. This buffering only occurs for elements retrieved from the source after any particular subscriber called GetEnumerator, so it won't hold onto any elements indefinitely. The basic model is that each time it retrieves an item from the source, it asks itself "How many subscribers are there right now?", and then once that many subscribers have retrieved the item, it discards that item from the buffer so it won't continue to consume memory.

So in the scenario you describe, it shouldn't actually hold onto anything because there is only one enumerator active at any one time.
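As a rough sketch of the multi-subscriber buffering described above (illustrative only, using System.Interactive's EnumerableEx.Publish):

```csharp
using System.Linq;

var source = Enumerable.Range(1, 5);
using var shared = source.Publish();

// Both enumerators are acquired before either calls MoveNext, so each
// item is fetched from the source once and buffered until every
// currently-active subscriber has read it.
using var e1 = shared.GetEnumerator();
using var e2 = shared.GetEnumerator();

e1.MoveNext(); // fetches 1 from the source; buffered for e2
e2.MoveNext(); // reads the buffered 1; that entry can now be discarded
```

In the single-enumerator scenario from this issue, the buffer would stay empty, matching the "shouldn't actually hold onto anything" observation.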

I've been trying to work out if there's some reason this was not implemented for IAsyncEnumerable<T>. There seem to be a few missing operators that are in normal Ix:

  • Case
  • Create
  • DoWhile
  • For
  • Hide
  • If
  • Min/MaxByWithTies
  • Memoize
  • While

Memoize seems quite closely related to Publish. (The main difference is that with Memoize you can tell it up front how many readers you expect, and they don't all need to call GetEnumerator up front.) Other than that I don't see an obvious pattern here.
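To illustrate the difference, a sketch using System.Interactive's EnumerableEx.Memoize (the reader-count overload; exact behavior here is from the non-async library, which is what an async version would mirror):

```csharp
using System.Linq;

var source = Enumerable.Range(1, 3);

// Declare up front that two readers will consume the sequence. Unlike
// Publish, each reader sees the sequence from the beginning, and the
// readers don't all need to call GetEnumerator before enumeration starts.
using var memoized = source.Memoize(2);

var a = memoized.ToList(); // 1, 2, 3
var b = memoized.ToList(); // 1, 2, 3 (replayed from the memoized buffer)
```

With Publish, by contrast, the second enumeration would continue from wherever the first one stopped.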

The reason I'm looking at all the gaps is to see if there's some overarching theme here. I think if we were to add Publish we should add Memoize too. There's also a (somewhat weaker) argument that if we're filling in gaps between System.Interactive and System.Interactive.Async it would be better to do it all in one version instead of in dribs and drabs.

However, since I don't think anyone has asked for any of these before, the demand clearly isn't all that strong... So it might be best to look at adding just Publish and Memoize.

The one implementation concern I can see that does unite these two methods is that we'd need an async-friendly lock. The IEnumerable<T> implementations of these both use the C# lock keyword, and call _source.MoveNext() while holding the lock. That wouldn't work in an async implementation because you'd need to use await _source.MoveNextAsync(), and you can't await inside a lock block. So we'd need something like the AsyncLock that Rx has. (AsyncLock is actually a public type of System.Reactive (although I kind of wish it wasn't) but I don't want to force a dependency on System.Reactive just so we can use it.)
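One common way to get an async-friendly mutual exclusion primitive without taking a new dependency is SemaphoreSlim(1, 1). A minimal sketch of the pattern (this is not necessarily how Rx's AsyncLock works, nor how an eventual Publish implementation would be structured; the SharedAsyncCursor type is hypothetical):

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: serializes MoveNextAsync calls on a shared enumerator.
class SharedAsyncCursor<T>
{
    private readonly SemaphoreSlim _gate = new(1, 1); // async-compatible mutex
    private readonly IAsyncEnumerator<T> _source;

    public SharedAsyncCursor(IAsyncEnumerator<T> source) => _source = source;

    public async ValueTask<(bool HasValue, T? Value)> TryMoveNextAsync()
    {
        await _gate.WaitAsync(); // unlike the lock keyword, this can be awaited across
        try
        {
            return await _source.MoveNextAsync()
                ? (true, _source.Current)
                : (false, default);
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

As the comment below notes, whether this naive serialization is acceptable under real async contention is exactly the open design question.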

This does make me wonder if this was why these got left out: holding locks in async code is potentially problematic. And this in turn leads to the question of whether just producing an async version of exactly the same code would really be good enough. In your scenario, there won't be any contention so a naive approach to locking would be fine. But the existing non-async Publish is designed to support multiple concurrently active enumerators, and an async equivalent should do the same, but expectations around concurrent usage tend to be different with async code. We'd need to think carefully about how to handle locking for such scenarios.

@julealgon
Author

This might be behind your "hopefully without too much of a memory overhead" concern. This buffering only occurs for elements retrieved from the source after any particular subscriber called GetEnumerator, so it won't hold onto any elements indefinitely. The basic model is that each time it retrieves an item from the source, it asks itself "How many subscribers are there right now?", and then once that many subscribers have retrieved the item, it discards that item from the buffer so it won't continue to consume memory.

So in the scenario you describe, it shouldn't actually hold onto anything because there is only one enumerator active at any one time.

That would be absolutely perfect! I assume it keeps track of the last executed position then, to be able to continue from there?

The reason I'm looking at all the gaps is to see if there's some overarching theme here. I think if we were to add Publish we should add Memoize too. There's also a (somewhat weaker) argument that if we're filling in gaps between System.Interactive and System.Interactive.Async it would be better to do it all in one version instead of in dribs and drabs.

Would definitely be nice to have full coverage of all operators for IAsyncEnumerable, but you have a fairly constrained scenario with not too many people to work on this, right? That might be a limiting factor that could prevent any work from being done if we want to have full parity.

However, since I don't think anyone has asked for any of these before, the demand clearly isn't all that strong... So it might be best to look at adding just Publish and Memoize.

Adoption of IAsyncEnumerable has been quite slow from what I gather. We have a 20+ year codebase ourselves, and we only just started moving some of our Task<IEnumerable<... code to IAsyncEnumerable as we began adopting modern C#/.NET features in the .NET472 codebase. I assume the fact that EF Core doesn't have direct integration with IAsyncEnumerable contributes to the reduced adoption as well.

The one implementation concern I can see that does unite these two methods is that we'd need an async-friendly lock. The IEnumerable<T> implementations of these both use the C# lock keyword, and call _source.MoveNext() while holding the lock. That wouldn't work in an async implementation because you'd need to use await _source.MoveNextAsync(), and you can't await inside a lock block. So we'd need something like the AsyncLock that Rx has. (AsyncLock is actually a public type of System.Reactive (although I kind of wish it wasn't) but I don't want to force a dependency on System.Reactive just so we can use it.)

I see. That makes sense. And I saw your discussion with the .NET team on this regard in another issue. I agree with you that it would make the most sense that this async lock be hosted in BCL and not in a library. And I would understand if you wanted to wait for that to happen before tackling this particular work here.

This does make me wonder if this was why these got left out: holding locks in async code is potentially problematic. And this in turn leads to the question of whether just producing an async version of exactly the same code would really be good enough. In your scenario, there won't be any contention so a naive approach to locking would be fine. But the existing non-async Publish is designed to support multiple concurrently active enumerators, and an async equivalent should do the same, but expectations around concurrent usage tend to be different with async code. We'd need to think carefully about how to handle locking for such scenarios.

Totally fair. I would not like to see any drastic differences in behavior between the sync and async counterparts of the same operator even if in my particular case, the more limited feature-set would suffice. I'm also with you here.

The only other thing that came to mind while reading your answer describing the features of Publish was whether there might be a more basic or "core" behavior that could be a separate operator: something simpler and tailored to the situation I described, where you want to keep iterating from where the last iteration left off but don't need multiple consumers. Perhaps such a simplified version of Publish (likely called something else) would be doable under the current constraints?

I opened this issue right after working through a feature on our codebase that would benefit from it, but this is not the first time I've wanted to "continue iterating from the last iteration position"; the other times I needed it, I ended up changing the logic to work around it somehow.

@idg10
Collaborator

idg10 commented Aug 29, 2024

you have a fairly constrained scenario with not too many people to work on this, right?

Yes. It's mostly me, and not full time.

I would not like to see any drastic differences in behavior between the sync and async counterparts of the same operator

I wasn't envisaging differences in logical behaviour. It was more that I'm thinking it might need a different implementation strategy.

This is actually an instance of a larger problem with introducing async in Rx and Ix. In the classic non-async Rx the usual assumption is that calls to OnNext should complete pretty quickly. It is possible to push literally millions of messages per second through Rx processing chains, and this is possible because the basic assumption in all of the operators is that messages will be processed pretty quickly. This enables us to use pretty simple locking approaches that assume that contention is not the normal case, and they perform well when there is minimal contention. But if someone wants an async-capable version of Rx, it implies that they're expecting to perform operations that can't complete quickly. So the usage patterns are most likely going to be different. The locking strategies used inside Rx (and Ix for that matter) might no longer be a good fit.

There's a sort of related issue over in Reaqtor. We have no async support there, and it's mainly because in that world we need to be able to checkpoint the state of operators (to provide reliable persistence, including the ability to migrate an Rx subscription from one machine to another). The way that works today is that we are able to bring everything to a complete halt briefly by temporarily stopping processing input. Because there's no async, and because the basic assumption pervading everything in Reaqtor is that we avoid blocking as much as possible, everything comes to a halt very quickly, we can make our checkpoint, and then resume without an obvious gap in service. But once you introduce async, it seems likely that this strategy won't work (because if you need async it's presumably because you can't actually avoid blocking).

(If you're familiar with Reaqtor this might have you raising an eyebrow because it does in fact define various async versions of the Rx interfaces. However, we actually rewrite those subscriptions. We support async because when a client outside of the Reaqtor query engine kicks off a subscription, that's a remote operation so it needs to be async, but when we materialize the subscription inside the query engine, we actually rewrite it at runtime to be a non-async one.)

The libraries in this repo don't need to worry about that checkpointing problem. However, there are plenty of cases where we currently hold locks in the non-async version while performing steps that need to be awaited in the async world, and with each of these, it's important to ask whether the expectations user code is likely to have (as a result of us offering async versions) mean that holding locks while waiting for those operations to complete will no longer be viable.

That's not to say I'm not going to do this, just that it might not be good enough to copy in the existing Publish implementation and just sprinkle await throughout. (That alone won't work because the compiler will refuse to compile code that performs an await inside a lock, but just replacing all of those with some kind of async lock won't necessarily be good enough.)

@julealgon
Author

Yes. It's mostly me, and not full time.

I see. Yeah... that's harsh. You are in an even worse position than the OData team then. I wish you the best and hope more folks will join you on this project if only for the selfish reason that I love Rx in general 😆.

I would not like to see any drastic differences in behavior between the sync and async counterparts of the same operator

I wasn't envisaging differences in logical behaviour. It was more that I'm thinking it might need a different implementation strategy.

Oh, to be clear, I wasn't implying you'd end up with different behaviors. I was basically agreeing with your initial statement that the operations should work consistently.

(If you're familiar with Reaqtor this might have you raising an eyebrow because it does in fact define various async versions of the Rx interfaces. However, we actually rewrite those subscriptions. We support async because when a client outside of the Reaqtor query engine kicks off a subscription, that's a remote operation so it needs to be async, but when we materialize the subscription inside the query engine, we actually rewrite it at runtime to be a non-async one.)

This is fascinating to hear. Even though I'm not super familiar with Reaqtor, I'm aware of its existence, how complex it is, and what it can be used for. Thanks for sharing this interesting tidbit. It makes me wonder if perhaps what it needs is some sort of "durable task" shenanigans like Azure Durable Functions use, which basically "override" the way async/await works. But that's probably off-topic here.

Thanks for your insights.
