[WIP][POC] Add ResourceBarrier expression to change resources within an expression graph #1116

Draft
wants to merge 6 commits into base: main
Conversation

@rjzamora (Member) commented Aug 2, 2024

Proposal

We should make it possible to define explicit resource barriers within a query.

Version 2 (Current)

For example:

# IO phase: Use CPU workers
df = dd.read_parquet("s3://my-bucket/my-dataset", resources={'CPU': 1})

# Compute phase: Convert to cudf and use GPU
# (Only using mean as a simple example)
df.resource_barrier({'GPU': 1}).to_backend("cudf").mean()

Here, we add a resources argument to IO operations (like read_parquet), and add a distinct resource_barrier API to apply resource-constraint changes after a collection has already been created.

Under the hood, we use a ResourceBarrier expression to ensure that the resource constraints won't get lost or broken during optimization (a rough sketch follows the list below). We also assume:

  • Resource constraints should be inherited by later operations
  • ResourceBarrier expressions should not be fused with adjacent expressions
  • Column projection and filtering are resource agnostic (thus allowing those expressions to pass through)
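
To make these assumptions concrete, here is a rough, self-contained sketch (not the code in this PR) of how forward inheritance could work on a toy expression graph. All class and function names (Expr, ReadParquet, ResourceBarrier, inherited_resources, etc.) are illustrative stand-ins, not dask-expr's real classes:

class Expr:
    def __init__(self, *dependencies):
        self.dependencies = list(dependencies)

class ReadParquet(Expr):
    # Root IO expression: may carry its own resource constraints
    def __init__(self, path, resources=None):
        super().__init__()
        self.path = path
        self.resources = resources or {}

class ResourceBarrier(Expr):
    # Marks the point where downstream expressions switch resources
    def __init__(self, frame, resources):
        super().__init__(frame)
        self.resources = resources

class ToBackend(Expr):
    def __init__(self, frame, backend):
        super().__init__(frame)
        self.backend = backend

class Mean(Expr):
    pass

def inherited_resources(expr):
    # Walk dependencies until we hit a ResourceBarrier (or a root IO
    # expression carrying its own resources) and inherit its constraints.
    if isinstance(expr, ResourceBarrier):
        return expr.resources
    if getattr(expr, "resources", None):
        return expr.resources
    for dep in expr.dependencies:
        found = inherited_resources(dep)
        if found:
            return found
    return {}

# Mirrors the Version 2 example above:
io = ReadParquet("s3://my-bucket/my-dataset", resources={"CPU": 1})
compute = Mean(ToBackend(ResourceBarrier(io, {"GPU": 1}), "cudf"))

assert inherited_resources(io) == {"CPU": 1}       # IO phase stays on CPU workers
assert inherited_resources(compute) == {"GPU": 1}  # everything after the barrier wants a GPU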

If this general approach is palatable, the to_backend API can be updated to cover the common case in which the backend and the resource constraints are changed at the same time. E.g.

df.to_backend("cudf", resources={'GPU': 1}).mean()

Version 1 (August 1, 2024)

For example:

# IO phase: Use CPU workers
df = dd.read_parquet("s3://my-bucket/my-dataset").resource_barrier({'CPU': 1})

# Compute phase: Convert to cudf and use GPU
# (Only using mean as a simple example)
df.to_backend("cudf").mean().resource_barrier({'GPU': 1})

In this case, calling resource_barrier will add an explicit ResourceBarrier expression to the expression graph. Then, when persist/compute is called, the corresponding resource requirement will be attached to all expressions before that point (stopping only at other ResourceBarrier expressions). This makes it possible to add arbitrary resource barriers within a query without adding any special resource/annotation code to any Expr classes.

The underlying ResourceBarrier expression also allows column-projection and filter operations to pass through without preventing the final resource annotations from being "optimized away."
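
For comparison with the Version 2 sketch above, here is a rough sketch of the Version 1 semantics (again with illustrative names only, reusing the toy Expr/ResourceBarrier classes from the earlier sketch): a barrier annotates everything it depends on, and another barrier switches the constraint.

def annotate_upstream(expr, annotations=None, resources=None):
    # Attach `resources` to expr and everything it depends on, switching the
    # constraint whenever another ResourceBarrier is encountered.
    annotations = {} if annotations is None else annotations
    if isinstance(expr, ResourceBarrier):
        resources = expr.resources          # barrier (re)defines the constraint
    if resources:
        annotations[id(expr)] = resources   # keyed by expression identity here
    for dep in expr.dependencies:
        annotate_upstream(dep, annotations, resources)
    return annotations

# In the Version 1 example, the trailing {'GPU': 1} barrier annotates the
# mean/to_backend expressions and stops at the earlier {'CPU': 1} barrier,
# which covers read_parquet.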

@fjetter (Member) commented Aug 2, 2024

df.to_backend("cudf").mean().resource_barrier({'GPU': 1})

Shouldn't this be the other way round? df.resource_barrier({"GPU": 1}).to_backend("cudf").mean()

@rjzamora (Member, Author) commented Aug 2, 2024

Shouldn't this be the other way round?

This is actually the way I tried to implement it at first. However, I realized that we would need to add an additional mechanism to set the resources for "root" IO expressions. We cannot rely on global annotations, because we may not want all dependencies to use the same resources.

To put it another way: The current implementation pushes down resources until there are no more dependencies, or another resource barrier is reached. If we want resources to be inherited in the other direction (which seems very reasonable to me), we will need to figure out how to deal with root IO tasks.

Possible solution: We add an optional roots= argument to resource_barrier to specify resource bindings that should be pushed down instead of inherited. E.g. df.resource_barrier({"GPU": 1}, roots={"CPU": 1}).to_backend("cudf").mean()

Any other ideas?

@fjetter (Member) commented Aug 2, 2024

I have to admit that I don't fully understand the problem you are describing. What is a "root IO expression"?

@phofl (Collaborator) commented Aug 2, 2024

I find the behaviour that a resource barrier defines the resource up until that point (i.e. for the past) very unintuitive.

One potential solution is that an IO root can consume a resource barrier if it is stacked directly on top of it, but I am not sure if I like this any better tbh.

@rjzamora (Member, Author) commented Aug 2, 2024

I have to admit that I don't fully understand the problem you are describing. What is a "root IO expression"?

Sorry, I'm sort of making up language as I go to fill in the gaps... Calling read_parquet will produce what I'm thinking of as a "root IO" expression. This expression does not have any dependencies. Therefore, it cannot come "after" a ResourceBarrier expression.

@rjzamora (Member, Author) commented Aug 2, 2024

I find the behaviour that a resource barrier defines the resource up until that point (i.e. for the past) very unintuitive.

I completely agree with this.

I do like the idea of using an expression to define the resource barrier, because this makes optimization behavior much easier to define. Perhaps we can make this work if we can come up with an intuitive (and maintainable) way to define the resources for an IO expression.

I think we are in agreement about the direction of resource inheritance. I'll keep thinking about the root-IO problem.

@rjzamora (Member, Author) commented Aug 2, 2024

Update: I revised the direction of resource inheritance to be more intuitive. I also decided that it would be relatively easy to add a simple ResourceBarrier dependency to IO operations (like read_parquet). I think the simplicity of the ResourceBarrier approach makes up for the need to add a resources argument to some IO functions, but I may be biased.

@fjetter (Member) commented Aug 5, 2024

I'm not entirely sold on the "resources as expr" approach.

  • Semantics are a little unintuitive
  • Expressions have so far been descriptions of what should be computed; there are no "meta expressions". Similar to a SQL expression, they describe what should be done, not how. I worry that introducing a "HOW" into the expression graph will make this more complicated down the line.
  • I currently don't see how this will make optimization behavior easier to define. I guess this strongly depends on the perspective and what the alternative would look like. Also, I currently don't have a good understanding of what behavior we would even have to implement. I feel like we should discuss how resources should behave before we iterate on an implementation.

@fjetter (Member) commented Aug 5, 2024

Another question I have is whether the Expression or the graph itself is even the best (or even just a good) place to define behavior like this.

In the end you want...

# The OR clauses are just different flavors of where and how this could be defined
if task.is_io_task() or isinstance(expr, IO) or isinstance(group, IOTask) or (stuff := collection.get_io_things()):
    dear_scheduler_please_try_to_run_this_on_CPUs(task, expr, group, stuff)
elif task.uses_gpu() ...:
    this_actually_requires_a_GPU(task, expr, group, collection)
...

We're used to thinking about this problem on a per-task or Layer basis and implemented the earlier version as part of the HLG infrastructure. We're now trying to move this to the expressions, but I wonder if this isn't the wrong approach. Especially the CPU vs GPU question doesn't feel like a "graph or expression problem" at all, but rather a "what function are we actually running" problem, so dask/dask#9969 or something similar might be a better choice.
For instance, there is already some detection in the worker (with dask/dask#11248 on the task itself) that distinguishes coroutine functions from ordinary functions, i.e. they encode a notion of "HOW".

I'm not saying that we should use task/runspecs to encode this knowledge, but I would like us to at least consider options that don't necessarily involve expressions and the optimizer.

@jacobtomlinson (Member) commented

One thing I've not seen mentioned here is repartitioning. A common use case we are keen to improve is reading data from S3 and performing heavy compute on it.

Cloud object stores are typically most performant when you chunk the data into partitions on the order of 100MB.

GPU compute is most performant when operating on larger partitions that can utilize GPU memory.

So to get the best throughput we want to have the CPU reading lots of small partitions from S3 and then concatenating them into larger partitions on the GPU ready for compute.

I wonder if the repartition step is a common place where resource barriers could be added?
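
If the resources=/resource_barrier API proposed in this PR were available, that pattern might look something like the following (a hypothetical sketch; the bucket path is a placeholder, and repartition(partition_size=...) is dask's existing option):

import dask.dataframe as dd

# Read many ~100MB partitions in parallel on CPU workers...
df = dd.read_parquet("s3://my-bucket/my-dataset", resources={"CPU": 1})

# ...then concatenate into ~1GB partitions for GPU compute after the barrier.
df = df.resource_barrier({"GPU": 1}).to_backend("cudf")
df = df.repartition(partition_size="1GB")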

@phofl (Collaborator) commented Aug 6, 2024

FWIW, dask-expr supports concatenating multiple partitions directly in the read-parquet step. This gets rid of a lot of network transfer that would be necessary if you used repartition, and it also hides the complexity from the scheduler.

You can configure this with dataframe.parquet.minimum-partition-size; the default is approximately 75MB.
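
For example, the threshold can be raised so that read_parquet fuses more files into each read task (a sketch; treat the exact value and units below as illustrative, since the accepted format may differ):

import dask

# Ask read_parquet to build ~1GB partitions by reading multiple files per task
dask.config.set({"dataframe.parquet.minimum-partition-size": 1_000_000_000})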

@jacobtomlinson (Member) commented

Thanks @phofl. If my parquet is stored as 100MB partitions and I want to group them into 1GB partitions on the GPU, does each native partition get read in parallel and then concatenated in the next step? Or are they read sequentially?

@phofl (Collaborator) commented Aug 6, 2024

They are read sequentially in a single task. So using this configuration doesn't make sense if the number of GPU partitions is smaller than the number of threads on the cluster. If the number of GPU partitions is quite a bit larger than the number of threads on the cluster, then this option will be a lot better than repartitioning.

@jacobtomlinson (Member) commented

They are read sequentially in a single task.

So this is something we want to avoid, because it will not saturate S3 particularly well when using large GPU partitions. We want all the CPU cores to read the native partition sizes in parallel, then concatenate them onto the GPU from main memory.

@rjzamora (Member, Author) commented

Thanks for the feedback here!

@fjetter - You are skeptical that we should use an expression (or graph) to define resource requirements. I like your idea of defining this kind of relationship at the function/task level instead. I also agree that a general resource specification describes "how" things should be computed more than "what" should be computed.

With that said, CPU<->GPU movement is typically initiated with a to_backend call (or similar). Therefore, the resource change I am focused on is absolutely both a "what" and a "how". In practice, we will typically want the CPU/GPU resources of a task to be the same as its dependencies unless the data has been explicitly moved to a different backend.

Given this perspective, maybe it is only the backend movement that should be defined at the expression level (similar to #1115)?

I currently don't see how this will make optimization behavior easier to define... Also, I currently don't have a good understanding of what behavior we would even have to implement.

It is not currently possible to read_parquet/read_csv with the "pandas" backend on many CPU-only workers and then convert to the "cudf" backend on a smaller number of GPU-bound Dask-CUDA workers without persisting in-between. You can consider this simple workflow to be my primary motivation. I essentially want to be able to change the resource requirements for all tasks created after a to_backend call. Note that I may only be using the pandas backend to perform IO, but I may use it for other lightweight ETL before moving to the "cudf" backend, and I may want to move back to the "pandas" backend for output IO.
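
For concreteness, here is a hypothetical end-to-end version of that workflow using the API proposed in this PR (paths, column names, and the groupby step are made up for illustration):

import dask.dataframe as dd

# IO and lightweight ETL on CPU-only workers
df = dd.read_parquet("s3://my-bucket/my-dataset", resources={"CPU": 1})
df = df[df.value > 0]  # resource-agnostic filtering can pass through the barrier

# Heavy compute on GPU-bound Dask-CUDA workers
result = df.resource_barrier({"GPU": 1}).to_backend("cudf").groupby("key").value.mean()

# Back to the pandas backend on CPU workers for output IO
out = result.to_frame().resource_barrier({"CPU": 1}).to_backend("pandas")
out.to_parquet("s3://my-bucket/my-output")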

They are read sequentially in a single task.

So this is something we want to avoid, because it will not saturate S3 particularly well when using large GPU partitions. We want all the CPU cores to read the native partition sizes in parallel, then concatenate them onto the GPU from main memory.

@jacobtomlinson - The optimization @phofl is describing improves overall ELT performance for TPC-H using the "pandas" backend, because they still have many threads reading from S3 at once (even though each thread is also iterating over multiple files to avoid network transfer after the fact). For dask-cudf, we only have one thread per GPU, and so we get lower S3 throughput. We are currently looking into a multi-threaded C++-level solution for this in RAPIDS, but using distinct resources in Dask also seems like a very practical solution to me.
