
Make meta calculation for merge more efficient #284

Open
wants to merge 7 commits into main
Conversation

Collaborator

@phofl commented Aug 30, 2023

The non-empty nature of the meta creation is a bottleneck. Avoiding these repeated calculations cuts the optimization time of some of the TPC-H queries by 60 percent (excluding the part that reads from remote storage).

Sits on top of #283.

@phofl requested a review from rjzamora on August 30, 2023, 10:40
@@ -37,6 +37,7 @@ class Merge(Expr):
"suffixes",
"indicator",
"shuffle_backend",
"_meta",
Member

This makes me nervous. I think that it's a good principle to only have necessary operands. Any derived state should be computed.

For example, if an optimization were to change some parameter here, like suffixes or something, I wouldn't want to worry about also modifying meta at the same time. It's nice to be able to rely on this invariant across the project.

If we want to include some other state in a custom constructor I would be more ok with that (although still nervous). In that case I'd want to make sure that the constructor always satisfied type(self)(*self.operands) == self.
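
A minimal sketch of that invariant with a hypothetical stand-in class (MyExpr is illustrative, not the actual Expr):

    class MyExpr:
        # Hypothetical expression class: all of its state lives in `operands`.
        def __init__(self, *operands):
            self.operands = list(operands)

        def __eq__(self, other):
            return type(self) is type(other) and self.operands == other.operands


    # The invariant a custom constructor would need to preserve:
    expr = MyExpr("left", "right", ("_x", "_y"))
    assert type(expr)(*expr.operands) == expr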

Collaborator Author

The downside of this is that we are stuck with the repeated computation; caching objects and so on won't help here, since meta will change, i.e. caching is not useful. We genuinely change the object when we re-create it, which means that we will always trigger a fresh computation of meta. That by itself isn't bad, but non-empty meta computations are relatively expensive (empty meta won't work here).

For example, if an optimization were to change some parameter here, like suffixes or something, I wouldn't want to worry about also modifying meta at the same time. It's nice to be able to rely on this invariant across the project

We can simply not pass meta in this case, which would trigger a fresh computation; this is only a fast path.

Member

We genuinely change the object when we re-create it, which means that we will always trigger a fresh computation of meta

If the inputs to the pd.merge call are going to change, then it seems like we need to recompute meta anyway. If the inputs aren't changing but we're recomputing, then maybe that is a sign that we're caching on the wrong things. Maybe we should have a static function or something instead.

We can simply not pass meta in this case, which would trigger a fresh computation; this is only a fast path

Imagine an optimization which did something like the following:

def subs(obj, old, new):
    operands = [
        new if operand == old else operand  # substitute old for new in all operands
        for operand in obj.operands
    ]
    return type(obj)(*operands)

This fails if we store derived state in operands, because _meta is in there and really we should choose to not include it any more.

I'm ok keeping derived state on the class. I just don't think that it should be in operands. This probably requires custom constructors.
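
A toy illustration of the point above, reusing the subs pattern (the class and meta value are stand-ins, not dask-expr objects):

    # Derived state stored alongside the operands goes stale when another
    # operand is substituted.
    class Expr:
        def __init__(self, *operands):
            self.operands = list(operands)


    def subs(obj, old, new):
        operands = [new if operand == old else operand for operand in obj.operands]
        return type(obj)(*operands)


    meta_for_xy = "columns: a_x, a_y"            # stand-in for a real meta object
    expr = Expr(("_x", "_y"), meta_for_xy)       # suffixes plus derived meta
    rewritten = subs(expr, ("_x", "_y"), ("_l", "_r"))
    assert rewritten.operands[1] == meta_for_xy  # stale: still describes _x/_y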

Collaborator Author

There are optimisations where meta changes but we compute the necessary information anyway, so we can simply adjust meta if we want to; Projections are a good example of this.

Lower is an example where meta won't change in the case of merge, but you can't properly cache it either. We might introduce a shuffle, which means caching will fail in most cases.

I'll add a custom constructor here that should work as well. Then we can see how that looks.

Member

Lower is an example where meta won't change in the case of merge, but you can't properly cache it either. We might introduce a shuffle, which means caching will fail in most cases.

What about caching not on the object, but somewhere else? That way anyone that asks "what is the meta for these inputs?" will get the cached result, regardless of what object they're calling from? (this was my staticmethod suggestion from above)
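
A rough sketch of that idea, keyed only on the inputs rather than on any expression object (a hypothetical free function, not existing dask-expr code; the real computation would use non-empty frames):

    import functools

    import pandas as pd


    @functools.lru_cache(maxsize=None)
    def merge_meta(left_cols, right_cols, how, on):
        # Hypothetical helper: compute the merge meta once per unique set of
        # inputs. Because the cache key is the inputs themselves, any caller
        # asking about the same inputs reuses the result. Empty frames keep
        # the sketch short; the real computation would use non-empty dummies.
        left = pd.DataFrame({c: pd.Series(dtype="int64") for c in left_cols})
        right = pd.DataFrame({c: pd.Series(dtype="int64") for c in right_cols})
        return left.merge(right, how=how, on=on)


    meta = merge_meta(("a", "b"), ("a", "c"), "inner", "a")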

Collaborator Author
@phofl Aug 30, 2023

Sorry, my comment probably wasn't clear enough. That's how I understood what you were saying, but the problem is as follows:

  • That won't work if we introduce a Projection (which is something we could accept, although that wouldn't make me happy in this particular case)
  • Meta won't change when we lower the graph, but we will introduce a shuffle, so the inputs of the merge computation will change while lowering, which means that the cache wouldn't work anymore

Caching won't help in either of these two cases, which is where most of the time is spent, unfortunately.

Member

Feels like we're maybe talking past each other here. Happy to chat live if you're free later.

Comment on lines +52 to +54
def __init__(self, *args, _precomputed_meta=None, **kwargs):
super().__init__(*args, **kwargs)
self._precomputed_meta = _precomputed_meta
Member

I think I would strongly prefer if we use a global key-value cache in the same way we cache dataset info for parquet. In fact, we should probably formalize this caching approach to avoid repeating the same kind of logic in multiple places.

It seems like a unique meta depends on a token like...

    @functools.cached_property
    def _meta_cache_token(self):
        return _tokenize_deterministic(
            self.left._meta,
            self.right._meta,
            self.how,
            self.left_on,
            self.right_on,
            self.left_index,
            self.right_index,
            self.suffixes,
            self.indicator,
        )

If self.left._meta or self.right._meta were to change (due to column projection), we would need to recalculate meta anyway. However, if the Merge object was responsible for pushing down the column projection, we could always update the cache within the simplify logic (since we would already know how the meta needs to change).
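
A sketch of how such a token could feed a global key-value cache (all names here, including _compute_meta, are illustrative placeholders rather than existing dask-expr helpers):

    # Hypothetical module-level cache keyed by the token above, loosely
    # modelled on how parquet dataset info is cached.
    _MERGE_META_CACHE = {}


    def cached_merge_meta(expr):
        token = expr._meta_cache_token
        if token not in _MERGE_META_CACHE:
            # Placeholder for however Merge currently computes its meta.
            _MERGE_META_CACHE[token] = expr._compute_meta()
        return _MERGE_META_CACHE[token]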

Collaborator Author

I don’t see a global need for this yet. The slowdown in merge goes back to the nonempty meta objects, not the actual computation on empty objects.

Some of the operations in Lower have side effects, which makes adjusting the meta objects of left and right bothersome and complicated.

I am open to adjusting the implementation if we run into this in more places, but as long as we need it only for merge, I'd prefer this solution since we keep the complexity in here.

Collaborator Author

Fwiw, I am also not too big of a fan of relying on meta in hashes; there are too many things in pandas that might mutate it unexpectedly, which would break this.

Member

To clarify, I don't really mind if we implement stand-alone caching logic in _merge.py for now. The thing I'm unsure about in this PR is that we are overriding __init__ so that we can effectively cache _meta without adding _meta as an official operand.

It may be the case that this is exactly how we should be attacking this problem in Merge (and maybe everywhere). For example, maybe we will eventually have a special known_meta= kwarg in Expr, which all expression objects could leverage. However, since it is not a proper operand, this mechanism feels a bit confusing and fragile to me.

The slowdown in merge goes back to the nonempty meta objects, not the actual computation on empty objects.

I don't think I understand your point here. Either way we are effectively caching the output of _meta, no?

some of the operations in Lower have side effects, which makes adjusting the meta objects of left and right bothersome and complicated.

I don't see how this is any different for _precomputed_meta? In any case where you are confident defining _precomputed_meta, you could also just add the "lowered" object to the global cache before returning it.

Member
@rjzamora Aug 30, 2023

Fwiw, I am also not too big of a fan of relying on meta in hashes; there are too many things in pandas that might mutate it unexpectedly, which would break this

Interesting. I'd say that should be a fundamental concern for dask-expr then. What would then be the most reliable way to hash the schema?

Collaborator Author

I don't think I understand your point here. Either way we are effectively caching the output of _meta, no?

Yes but the nature of the slowdown makes me think that we need it only in merge and not in other places as of now. I am open to rewriting this here as well if this turns out differently.

I don't see how this is any different for _precomputed_meta? In any case where you are confident defining _precomputed_meta, you could also just add the "lowered" object to the global cache before returning it.

pandas has some caveats that might change your dtype in meta but not on the actual df. Relying on the initial meta seems safer to me. But this might also be totally wrong.

To clarify, I don't really mind if we implement stand-alone caching logic in _merge.py for now. The thing I'm unsure about in this PR is that we are overriding __init__ so that we can effectively cache _meta without adding _meta as an official operand.

@mrocklin and I chatted offline and landed on this solution. One motivating factor was the last part here: #284 (comment)

Collaborator Author

Interesting. I'd say that should be a fundamental concern for dask-expr then. What would then be the most reliable way to hash the schema?

I don't have a good answer for that. Meta is still the best bet, but it has some limitations. This will get more stable in the future since we are deprecating all of these caveats at the moment.

Member
@rjzamora Aug 30, 2023

Yes but the nature of the slowdown makes me think that we need it only in merge and not in other places as of now. I am open to rewriting this here as well if this turns out differently.

Okay, I think you are talking about the decision not to write general Expr-wide caching code here, which is all good with me. I was only thinking about the decision to use _precomputed_meta instead of a simple k/v cache.

Possible problems with the k/v cache approach:

  • The meta-hashing issue you mentioned
  • We would be keeping the cached meta in memory even after we no longer need it (also a problem for parquet)

Possible problems with _precomputed_meta:

  • I suppose we are breaking with convention a bit (seems okay)
  • Any substitute_parameters call will drop the information, even if you aren't changing information that is relevant to meta (see the sketch below)

One motivating factor was the last part here ...

Right, I agree that it would be a mistake to make _precomputed_meta a proper operand.
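
To illustrate the second point, a toy stand-in with a deliberately simplified substitute_parameters signature (not the real dask-expr implementation):

    # State passed outside of operands does not survive a rebuild from
    # operands, which is what substitute_parameters effectively does.
    class Merge:
        def __init__(self, *operands, _precomputed_meta=None):
            self.operands = list(operands)
            self._precomputed_meta = _precomputed_meta

        def substitute_parameters(self, new_operands):
            # Rebuilt from operands only, so _precomputed_meta is dropped.
            return type(self)(*new_operands)


    m = Merge("left", "right", "inner", _precomputed_meta="cached meta")
    m2 = m.substitute_parameters(["left", "right", "outer"])
    assert m2._precomputed_meta is None  # the cached meta did not carry over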

Collaborator Author

We would also have to patch meta of left and right in the HashJoin layer because that adds a column.

The substitute_parameters issue is annoying; we could override it, but that's not great either.

Member

Sorry for adding all this discussion. I do get that you are focusing on the Merge-specific meta issue at hand.

I'm just doing my best to keep the big picture in mind: it seems like we are going to keep running into cases where we would benefit from caching information outside the Expr object itself. Therefore, it would be nice if we could design a formal system where a collection of different caches can be managed in one place.

That said, I definitely don't think we need to do something like that right now.

We would also have to patch meta of left and right in the HashJoin layer because that adds a column

Right, HashJoinP2P._meta_cache_token does need to drop the hash columns.
