Parallelize Serialization of Columns within Parquet RowGroups #7655

Merged
merged 5 commits on Oct 25, 2023
Changes from 2 commits
26 changes: 19 additions & 7 deletions datafusion/common/src/config.rs
@@ -271,7 +271,7 @@ config_namespace! {
/// for each output file being worked. Higher values can potentially
/// give faster write performance at the cost of higher peak
/// memory consumption
pub max_buffered_batches_per_output_file: usize, default = 2
pub max_buffered_batches_per_output_file: usize, default = 10
Contributor

This seems very high

Contributor Author

I lowered this one back to 2 with no noticeable impact on performance.


}
}
@@ -377,12 +377,24 @@ config_namespace! {
pub bloom_filter_ndv: Option<u64>, default = None

/// Controls whether DataFusion will attempt to speed up writing
/// large parquet files by first writing multiple smaller files
/// and then stitching them together into a single large file.
/// This will result in faster write speeds, but higher memory usage.
/// Also currently unsupported are bloom filters and column indexes
/// when single_file_parallelism is enabled.
pub allow_single_file_parallelism: bool, default = false
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file is serialized in parallel
/// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
pub allow_single_file_parallelism: bool, default = true

/// If allow_single_file_parallelism=true, this setting allows
/// applying backpressure to prevent working on too many row groups in
/// parallel in case of limited memory or slow I/O speed causing
/// OOM errors. Lowering this number limits memory growth at the cost
/// of potentially slower write speeds.
pub maximum_parallel_row_group_writers: usize, default = 16
Contributor

Am I correct in thinking this is still bounded by the input parallelism? Is it worth noting this?

Contributor Author

It can be limited based on the queue sizes as described in my other comment.


/// If allow_single_file_parallelism=true, this setting allows
/// applying backpressure to prevent too many RecordBatches building
/// up in memory in case the parallel writers cannot consume them fast
/// enough. Lowering this number limits memory growth at the cost
/// of potentially lower write speeds.
pub maximum_buffered_record_batches_per_stream: usize, default = 200
Contributor

This seems extraordinarily high, in other places we buffer up to 1

Contributor Author
@devinjdangelo Oct 24, 2023

This one does have a significant impact on performance if lowered significantly. I spent some time testing and tuning the exact values. Setting max parallel row groups to 2 and maximum_buffered_record_batches_per_stream to 128 allows two row groups to run in parallel. If this is set too low, backpressure will kick in well before a second row group can be spawned, and everything will wait on just one row group to write.

                 ┌─────────────────┐        ┌────────────────────────┐
 ExecPlan───────►│ RowGroup1 Queue ├───────►│Parallel Col Serializers│
   │             └─────────────────┘        └────────────────────────┘
   │
   │
   │
   │             ┌─────────────────┐        ┌────────────────────────┐
   └────────────►│RowGroup2 Queue  ├───────►│Parallel Col Serializers│
                 └─────────────────┘        └────────────────────────┘
Once max_rowgroup_rows have been sent to RowGroup1 Queue, spawn a new Queue with its own parallel writers.

RowGroup2 Queue won't be created until RowGroup1 Queue has received the desired number of rows. The goal is to have two row groups serializing in parallel if RecordBatches are being produced fast enough. For a streaming plan reading from disk, we probably never need more than 2 in parallel. If we are writing already in-memory data on a system with many cores, it is highly beneficial to boost these queue sizes even more so we could have an arbitrarily large number of row groups serializing in parallel.
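
The flow above can be sketched with bounded tokio channels. This is only an illustration of the idea, not the PR's actual code: Batch stands in for an Arrow RecordBatch, and demux_into_row_groups / serialize_row_group are made-up names.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Stand-in for an Arrow RecordBatch.
struct Batch {
    num_rows: usize,
}

/// Drains one row group's queue; in the PR each column of the batch would be
/// handed to its own parallel column serializer here.
async fn serialize_row_group(mut rx: mpsc::Receiver<Batch>) {
    while let Some(batch) = rx.recv().await {
        let _ = batch.num_rows; // placeholder for per-column serialization
    }
}

/// Routes incoming batches into per-row-group queues, spawning a new queue
/// (and its own writers) once the current one has received enough rows.
async fn demux_into_row_groups(
    mut input: mpsc::Receiver<Batch>,
    max_buffered_batches: usize, // maximum_buffered_record_batches_per_stream
    max_rowgroup_rows: usize,
) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    let (mut tx, rx) = mpsc::channel::<Batch>(max_buffered_batches);
    tasks.push(tokio::spawn(serialize_row_group(rx)));
    let mut rows_in_current = 0;

    while let Some(batch) = input.recv().await {
        rows_in_current += batch.num_rows;
        // A full queue makes `send` wait, which is the backpressure that
        // bounds memory growth.
        if tx.send(batch).await.is_err() {
            break;
        }
        if rows_in_current >= max_rowgroup_rows {
            // Start a fresh queue (and row group) with its own writers.
            let (next_tx, next_rx) = mpsc::channel::<Batch>(max_buffered_batches);
            tasks.push(tokio::spawn(serialize_row_group(next_rx)));
            tx = next_tx;
            rows_in_current = 0;
        }
    }
    drop(tx);
    for task in tasks {
        let _ = task.await;
    }
}
```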

Contributor
@tustvold Oct 25, 2023

Could we instead rely on input partitioning? Relying on being able to buffer an entire row group in memory as arrow data has caused major issues in the past. I don't feel comfortable merging this as is, as it will lead to a major regression in memory usage.

Edit: do we even really need row group parallelism, we already have parallelism at the file and column level, it seems a tad unnecessary tbh, and comes with major drawbacks

Contributor Author

Lowering the buffer size to 2 would save a few hundred MB of memory, but at the cost of more than doubling the serialization time. The decision comes down to which is the better default behavior. Imo, most users can spare the memory for the performance benefit, and those that can't can always lower the buffer size. We could instead default to a low buffer size (favoring minimum memory usage over execution time) and I could update the docs to suggest increasing the buffer for significant performance gains on systems with many cores. Here are the numbers I gathered using the script in the description:

Parallel Parquet Writer, Varying Row Group Buffer Sizes

                         Buffer Size=2   Buffer Size=64   Buffer Size=128
Execution Time (s)            62.6            35.6             25.02
Peak Memory Usage (MB)       495.0           606.8            712.05

For comparison, the non-parallelized writer takes 155s and peak memory usage is 449.7MB for the same task.

Contributor
@tustvold Oct 25, 2023

Is the memory usage of these buffered RecordBatch tracked with a MemoryReservation at all?

For context apache/arrow-rs#3871 was the issue from back when ArrowWriter did something similar, and this caused operational problems for us as writers could easily consume GB of memory when writing large files.

Contributor Author

I haven't added anything special to track with a MemoryReservation. It is just a bounded channel size. Memory usage can't grow without bound, but it can grow up to however large 128 RecordBatches is in memory. So with a particularly wide table or an extra large batch_size setting, I could see it climbing into GBs of memory. If we are concerned about that, we could use buffer_size=2 as the default and leave it up to users to decide whether the memory/performance tradeoff is worth increasing the buffer size.
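
As a rough back-of-the-envelope for that worst case (the column count and value width here are illustrative assumptions, not numbers from this PR), using the default batch_size of 8192 rows:

```rust
/// Rough upper bound on the bytes a bounded batch queue can hold.
/// All inputs are illustrative assumptions, not measurements from the PR.
fn approx_buffered_bytes(
    batch_size: usize,       // rows per RecordBatch (DataFusion default: 8192)
    num_columns: usize,      // e.g. a wide table with 100 columns
    bytes_per_value: usize,  // e.g. 8 for Float64
    buffered_batches: usize, // maximum_buffered_record_batches_per_stream
) -> usize {
    batch_size * num_columns * bytes_per_value * buffered_batches
}

// approx_buffered_bytes(8192, 100, 8, 128) ≈ 839 MB for a single stream,
// before any per-array overhead, so GBs are plausible for wider tables.
```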

It is also true that the numbers above are gathered from a fairly extreme case of writing ~250 million rows to a single parquet file, and you could instead just write 2 or more files in parallel to close the performance gap. For a more reasonably sized ~50 million rows and 1.5GB, the gap is smaller, but it is still there:

Parallel Parquet Writer, Varying Row Group Buffer Sizes

                         Buffer Size=2   Buffer Size=64   Buffer Size=128
Execution Time (s)           19.76           13.41             13.68
Peak Memory Usage (MB)       272.2           277.2             281.8

Contributor Author

Ok, I pushed a commit to change the buffer size back down to 2. I think this is a good default in most cases. I called out in the docs that if you have memory to spare, and in particular if you are writing out cached in-memory data (like a cached DataFrame), then you will likely benefit significantly from boosting the buffer sizes.

This optimization could perhaps be automatic at some point if we could automatically set the buffer size based on a known memory budget.
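
A hedged usage sketch of boosting these settings on a SessionConfig (the field paths follow the config_namespace in this diff; the exact values are illustrative, not recommendations):

```rust
use datafusion::prelude::SessionConfig;

/// Build a config that trades memory for parallel parquet write speed.
fn parallel_parquet_write_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    let parquet = &mut config.options_mut().execution.parquet;
    // Keep the parallel writer enabled (the default proposed in this PR).
    parquet.allow_single_file_parallelism = true;
    // Let more row groups serialize concurrently...
    parquet.maximum_parallel_row_group_writers = 16;
    // ...and buffer enough RecordBatches per stream to keep them fed.
    parquet.maximum_buffered_record_batches_per_stream = 128;
    config
}
```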

Contributor

Thanks, I think once you start buffering non-trivial numbers of batches, and therefore non-trivial amounts of data in memory, it is important that this is accounted for in the MemoryPool. Ideally we would account everywhere, but for practical reasons we just really need to get the places where non-trivial amounts can build up, so as to prevent OOMs.
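
A minimal sketch of what such accounting could look like, assuming DataFusion's MemoryConsumer/MemoryReservation API; the TrackedBatchBuffer type and where it would sit in the writer are hypothetical, not part of this PR:

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

/// Hypothetical buffer that charges its batches against the MemoryPool.
struct TrackedBatchBuffer {
    batches: Vec<RecordBatch>,
    reservation: MemoryReservation,
}

impl TrackedBatchBuffer {
    fn new(pool: &Arc<dyn MemoryPool>) -> Self {
        Self {
            batches: Vec::new(),
            // A named consumer makes the buffered bytes visible to the pool.
            reservation: MemoryConsumer::new("ParquetRowGroupBuffer").register(pool),
        }
    }

    /// Grow the reservation before buffering, so an exhausted pool surfaces
    /// as an error instead of an OOM.
    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        self.reservation.try_grow(batch.get_array_memory_size())?;
        self.batches.push(batch);
        Ok(())
    }

    /// Release the accounted bytes once a batch is handed to a serializer.
    fn pop(&mut self) -> Option<RecordBatch> {
        let batch = self.batches.pop()?;
        self.reservation.shrink(batch.get_array_memory_size());
        Some(batch)
    }
}
```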


}
}