From 66007da01456df73da2d83ae5d1bdb2095e2bff5 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo 
Date: Mon, 23 Oct 2023 16:54:45 -0400
Subject: [PATCH 1/5] merge main

---
 datafusion/common/src/config.rs               |  26 +-
 .../src/datasource/file_format/parquet.rs     | 525 ++++++++++++------
 .../test_files/information_schema.slt         |  12 +-
 docs/source/user-guide/configs.md             | 136 ++---
 4 files changed, 446 insertions(+), 253 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 6aefa4e05de2..5655d172e871 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -271,7 +271,7 @@ config_namespace! {
         /// for each output file being worked. Higher values can potentially
         /// give faster write performance at the cost of higher peak
         /// memory consumption
-        pub max_buffered_batches_per_output_file: usize, default = 2
+        pub max_buffered_batches_per_output_file: usize, default = 10
     }
 }
 
@@ -377,12 +377,24 @@ config_namespace! {
         pub bloom_filter_ndv: Option<u64>, default = None
 
         /// Controls whether DataFusion will attempt to speed up writing
-        /// large parquet files by first writing multiple smaller files
-        /// and then stitching them together into a single large file.
-        /// This will result in faster write speeds, but higher memory usage.
-        /// Also currently unsupported are bloom filters and column indexes
-        /// when single_file_parallelism is enabled.
-        pub allow_single_file_parallelism: bool, default = false
+        /// parquet files by serializing them in parallel. Each column
+        /// in each row group in each output file is serialized in parallel,
+        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
+        pub allow_single_file_parallelism: bool, default = true
+
+        /// If allow_single_file_parallelism=true, this setting allows
+        /// applying backpressure to prevent working on too many row groups in
+        /// parallel in case of limited memory or slow I/O speed causing
+        /// OOM errors. Lowering this number limits memory growth at the cost
+        /// of potentially slower write speeds.
+        pub maximum_parallel_row_group_writers: usize, default = 16
+
+        /// If allow_single_file_parallelism=true, this setting allows
+        /// applying backpressure to prevent too many RecordBatches building
+        /// up in memory in case the parallel writers cannot consume them fast
+        /// enough. Lowering this number limits memory growth at the cost
+        /// of potentially lower write speeds.
+        pub maximum_buffered_record_batches_per_stream: usize, default = 200
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index a16db9d43213..5943fa4c2181 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,11 +17,45 @@
 //! 
Parquet format abstractions +use arrow_array::RecordBatch; +use async_trait::async_trait; +use datafusion_common::stats::Precision; +use datafusion_physical_plan::metrics::MetricsSet; +use parquet::arrow::arrow_writer::{ + compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, + ArrowLeafColumn, +}; +use parquet::file::writer::SerializedFileWriter; +use rand::distributions::DistString; use std::any::Any; use std::fmt; use std::fmt::Debug; +use std::fs::File; use std::io::Write; use std::sync::Arc; +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::task::{JoinHandle, JoinSet}; + +use crate::datasource::file_format::file_compression_type::FileCompressionType; +use crate::datasource::statistics::create_max_min_accs; +use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Fields, Schema}; +use bytes::{BufMut, BytesMut}; +use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use futures::{StreamExt, TryStreamExt}; +use hashbrown::HashMap; +use object_store::{ObjectMeta, ObjectStore}; +use parquet::arrow::{ + arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, +}; +use parquet::file::footer::{decode_footer, decode_metadata}; +use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; +use parquet::file::statistics::Statistics as ParquetStatistics; +use rand::distributions::Alphanumeric; use super::write::demux::start_demuxer_task; use super::write::{create_writer, AbortableWrite, FileWriterMode}; @@ -32,12 +66,11 @@ use crate::arrow::array::{ use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; -use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::get_col_stats; use crate::datasource::physical_plan::{ FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter, }; -use crate::datasource::statistics::create_max_min_accs; + use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; @@ -47,29 +80,6 @@ use crate::physical_plan::{ Statistics, }; -use arrow::datatypes::{Fields, Schema, SchemaRef}; -use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; -use datafusion_execution::TaskContext; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; -use datafusion_physical_plan::metrics::MetricsSet; - -use async_trait::async_trait; -use bytes::{BufMut, BytesMut}; -use futures::{StreamExt, TryStreamExt}; -use hashbrown::HashMap; -use object_store::{ObjectMeta, ObjectStore}; -use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter}; -use parquet::column::writer::ColumnCloseResult; -use parquet::file::footer::{decode_footer, decode_metadata}; -use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::WriterProperties; -use parquet::file::statistics::Statistics as ParquetStatistics; -use parquet::file::writer::SerializedFileWriter; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio::task::{JoinHandle, JoinSet}; - /// The Apache Parquet `FileFormat` implementation /// /// Note it is recommended these are instead configured on the [`ConfigOptions`] @@ -675,13 +685,21 @@ impl ParquetSink { &self, 
num_partitions: usize, object_store: Arc, + single_file_output: bool, ) -> Result>>> { let mut writers = Vec::new(); - for _ in 0..num_partitions { - let file_path = self.config.table_paths[0].prefix(); + let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let file_path = if !single_file_output { + self.config.table_paths[0] + .prefix() + .child(format!("{}_{}.parquet", write_id, part_idx)) + } else { + self.config.table_paths[0].prefix().clone() + }; let object_meta = ObjectMeta { - location: file_path.clone(), + location: file_path, last_modified: chrono::offset::Utc::now(), size: 0, e_tag: None, @@ -726,29 +744,8 @@ impl DataSink for ParquetSink { .runtime_env() .object_store(&self.config.object_store_url)?; - let exec_options = &context.session_config().options().execution; - - let allow_single_file_parallelism = - exec_options.parquet.allow_single_file_parallelism; - - // This is a temporary special case until https://github.com/apache/arrow-datafusion/pull/7655 - // can be pulled in. - if allow_single_file_parallelism && self.config.single_file_output { - let object_store_writer = self - .create_object_store_writers(1, object_store) - .await? - .remove(0); - - let schema_clone = self.config.output_schema.clone(); - return output_single_parquet_file_parallelized( - object_store_writer, - vec![data], - schema_clone, - parquet_props, - ) - .await - .map(|r| r as u64); - } + let parquet_opts = &context.session_config().options().execution.parquet; + let allow_single_file_parallelism = parquet_opts.allow_single_file_parallelism; let part_col = if !self.config.table_partition_cols.is_empty() { Some(self.config.table_partition_cols.clone()) @@ -756,6 +753,12 @@ impl DataSink for ParquetSink { None }; + let parallel_options = ParallelParquetWriterOptions { + max_parallel_row_groups: parquet_opts.maximum_parallel_row_group_writers, + max_buffered_record_batches_per_stream: parquet_opts + .maximum_buffered_record_batches_per_stream, + }; + let (demux_task, mut file_stream_rx) = start_demuxer_task( data, context, @@ -768,8 +771,35 @@ impl DataSink for ParquetSink { let mut file_write_tasks: JoinSet> = JoinSet::new(); while let Some((path, mut rx)) = file_stream_rx.recv().await { - let mut writer = self - .create_async_arrow_writer( + if !allow_single_file_parallelism { + let mut writer = self + .create_async_arrow_writer( + ObjectMeta { + location: path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + } + .into(), + object_store.clone(), + parquet_props.clone(), + ) + .await?; + file_write_tasks.spawn(async move { + let mut row_count = 0; + while let Some(batch) = rx.recv().await { + row_count += batch.num_rows(); + writer.write(&batch).await?; + } + writer.close().await?; + Ok(row_count) + }); + } else { + let writer = create_writer( + FileWriterMode::PutMultipart, + // Parquet files as a whole are never compressed, since they + // manage compressed blocks themselves. 
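+                    // Each data page within a column chunk is still
+                    // compressed individually, per the codec configured
+                    // in WriterProperties.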
+                    FileCompressionType::UNCOMPRESSED,
                     ObjectMeta {
                         location: path,
                         last_modified: chrono::offset::Utc::now(),
@@ -778,19 +808,22 @@ impl DataSink for ParquetSink {
                     }
                     .into(),
                     object_store.clone(),
-                    parquet_props.clone(),
                 )
                 .await?;
-
-            file_write_tasks.spawn(async move {
-                let mut row_count = 0;
-                while let Some(batch) = rx.recv().await {
-                    row_count += batch.num_rows();
-                    writer.write(&batch).await?;
-                }
-                writer.close().await?;
-                Ok(row_count)
-            });
+                let schema = self.get_writer_schema();
+                let props = parquet_props.clone();
+                let parallel_options_clone = parallel_options.clone();
+                file_write_tasks.spawn(async move {
+                    Ok(output_single_parquet_file_parallelized(
+                        writer,
+                        rx,
+                        schema,
+                        &props,
+                        parallel_options_clone,
+                    )
+                    .await?)
+                });
+            }
         }
 
         let mut row_count = 0;
@@ -823,119 +856,226 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter]
+/// Once the channel is exhausted, returns the ArrowColumnWriter.
+async fn column_serializer_task(
+    mut rx: Receiver<ArrowLeafColumn>,
+    mut writer: ArrowColumnWriter,
+) -> Result<ArrowColumnWriter> {
+    while let Some(col) = rx.recv().await {
+        writer.write(&col)?;
+    }
+    Ok(writer)
+}
 
-/// Parallelizes the serialization of a single parquet file, by first serializing N
-/// independent RecordBatch streams in parallel to parquet files in memory. Another
-/// task then stitches these independent files back together and streams this large
-/// single parquet file to an ObjectStore in multiple parts.
-async fn output_single_parquet_file_parallelized(
-    mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
-    output_schema: Arc<Schema>,
-    parquet_props: &WriterProperties,
-) -> Result<usize> {
-    let mut row_count = 0;
-    // TODO decrease parallelism / buffering:
-    // https://github.com/apache/arrow-datafusion/issues/7591
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
-            }
-            let out = writer.into_inner()?;
-            Ok((out, inner_row_count))
-        }))
+type ColumnJoinHandle = JoinHandle<Result<ArrowColumnWriter>>;
+type ColSender = Sender<ArrowLeafColumn>;
+/// Spawns a parallel serialization task for each column
+/// Returns join handles for each column's serialization task along with a send channel
+/// to send arrow arrays to each serialization task.
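+/// The bounded channel capacity (max_buffer_size) applies backpressure:
+/// senders wait when a column's queue is full, which bounds the number of
+/// [ArrowLeafColumn]s buffered per column.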
+fn spawn_column_parallel_row_group_writer(
+    schema: Arc<Schema>,
+    parquet_props: Arc<WriterProperties>,
+    max_buffer_size: usize,
+) -> Result<(Vec<ColumnJoinHandle>, Vec<ColSender>)> {
+    let schema_desc = arrow_to_parquet_schema(&schema)?;
+    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
+    let num_columns = col_writers.len();
+
+    let mut col_writer_handles = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for writer in col_writers.into_iter() {
+        // Buffer size of this channel limits the number of arrays queued up for column level serialization
+        let (send_array, receive_array) =
+            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+        col_array_channels.push(send_array);
+        col_writer_handles
+            .push(tokio::spawn(column_serializer_task(receive_array, writer)))
+    }
+
+    Ok((col_writer_handles, col_array_channels))
+}
+
+/// Settings related to writing parquet files in parallel
+#[derive(Clone)]
+struct ParallelParquetWriterOptions {
+    max_parallel_row_groups: usize,
+    max_buffered_record_batches_per_stream: usize,
+}
+
+/// This is the return type of calling [ArrowColumnWriter].close() on each column
+/// i.e. the Vec of encoded columns which can be appended to a row group
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
+
+/// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective
+/// parallel column serializers.
+async fn send_arrays_to_col_writers(
+    col_array_channels: &[ColSender],
+    rb: &RecordBatch,
+    schema: Arc<Schema>,
+) -> Result<()> {
+    for (tx, array, field) in col_array_channels
+        .iter()
+        .zip(rb.columns())
+        .zip(schema.fields())
+        .map(|((a, b), c)| (a, b, c))
+    {
+        for c in compute_leaves(field, array)? {
+            tx.send(c).await.map_err(|_| {
+                DataFusionError::Internal("Unable to send array to writer!".into())
+            })?;
+        }
     }
-    let mut writer = None;
-    let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
-        tokio::sync::mpsc::unbounded_channel();
-    let (tx, mut rx) = endpoints;
-    let writer_join_handle: JoinHandle<
-        Result<
-            AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-            DataFusionError,
-        >,
-    > = tokio::task::spawn(async move {
-        while let Some(data) = rx.recv().await {
-            // TODO write incrementally
-            // https://github.com/apache/arrow-datafusion/issues/7591
-            object_store_writer.write_all(data.as_slice()).await?;
+
+    Ok(())
+}
+
+/// Spawns a tokio task which joins the parallel column writer tasks,
+/// and finalizes the row group.
+fn spawn_rg_join_and_finalize_task(
+    column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,
+    rg_rows: usize,
+) -> JoinHandle<RBStreamSerializeResult> {
+    tokio::spawn(async move {
+        let num_cols = column_writer_handles.len();
+        let mut finalized_rg = Vec::with_capacity(num_cols);
+        for handle in column_writer_handles.into_iter() {
+            match handle.await {
+                Ok(r) => {
+                    let w = r?;
+                    finalized_rg.push(w.close()?);
+                }
+                Err(e) => {
+                    if e.is_panic() {
+                        std::panic::resume_unwind(e.into_panic())
+                    } else {
+                        unreachable!()
+                    }
+                }
+            }
+        }
+
+        Ok((finalized_rg, rg_rows))
+    })
+}
+
+/// This task coordinates the serialization of a parquet file in parallel.
+/// As the query produces RecordBatches, these are written to a RowGroup
+/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
+/// row group is reached, the parallel tasks are joined on another separate task
+/// and sent to a concatenation task. This task immediately continues to work
+/// on the next row group in parallel. So, parquet serialization is parallelized
+/// across both columns and row groups, with a theoretical max number of parallel tasks
+/// given by n_columns * num_row_groups.
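+/// RecordBatches that straddle a row group boundary are sliced, so every
+/// finalized row group except possibly the last holds exactly
+/// max_row_group_size rows.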
+fn spawn_parquet_parallel_serialization_task( + mut data: Receiver, + serialize_tx: Sender>, + schema: Arc, + writer_props: Arc, + parallel_options: ParallelParquetWriterOptions, +) -> JoinHandle> { + tokio::spawn(async move { + // This is divided by 2 because we move RecordBatches between two channels so the effective + // buffer limit is the sum of the size of each buffer, i.e. when both buffers are full. + let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream / 2; + let max_row_group_rows = writer_props.max_row_group_size(); + let (mut column_writer_handles, mut col_array_channels) = + spawn_column_parallel_row_group_writer( + schema.clone(), + writer_props.clone(), + max_buffer_rb, + )?; + let mut current_rg_rows = 0; + + while let Some(rb) = data.recv().await { + if current_rg_rows + rb.num_rows() < max_row_group_rows { + send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone()) + .await?; + current_rg_rows += rb.num_rows(); + } else { + let rows_left = max_row_group_rows - current_rg_rows; + let a = rb.slice(0, rows_left); + send_arrays_to_col_writers(&col_array_channels, &a, schema.clone()) + .await?; + + // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup + // on a separate task, so that we can immediately start on the next RG before waiting + // for the current one to finish. + drop(col_array_channels); + let finalize_rg_task = spawn_rg_join_and_finalize_task( + column_writer_handles, + max_row_group_rows, + ); + + serialize_tx.send(finalize_rg_task).await.map_err(|_| { + DataFusionError::Internal( + "Unable to send closed RG to concat task!".into(), + ) + })?; + + let b = rb.slice(rows_left, rb.num_rows() - rows_left); + (column_writer_handles, col_array_channels) = + spawn_column_parallel_row_group_writer( + schema.clone(), + writer_props.clone(), + max_buffer_rb, + )?; + send_arrays_to_col_writers(&col_array_channels, &b, schema.clone()) + .await?; + current_rg_rows = b.num_rows(); + } } - Ok(object_store_writer) - }); + + // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows + drop(col_array_channels); + let finalize_rg_task = + spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows); + + serialize_tx.send(finalize_rg_task).await.map_err(|_| { + DataFusionError::Internal("Unable to send closed RG to concat task!".into()) + })?; + + Ok(()) + }) +} + +/// Consume RowGroups serialized by other parallel tasks and concatenate them in +/// to the final parquet file, while flushing finalized bytes to an [ObjectStore] +async fn concatenate_parallel_row_groups( + mut serialize_rx: Receiver>, + schema: Arc, + writer_props: Arc, + mut object_store_writer: AbortableWrite>, +) -> Result { let merged_buff = SharedBuffer::new(1048576); - for handle in join_handles { + + let schema_desc = arrow_to_parquet_schema(schema.as_ref())?; + let mut parquet_writer = SerializedFileWriter::new( + merged_buff.clone(), + schema_desc.root_schema_ptr(), + writer_props, + )?; + + let mut row_count = 0; + + while let Some(handle) = serialize_rx.recv().await { let join_result = handle.await; match join_result { Ok(result) => { - let (out, num_rows) = result?; - let reader = bytes::Bytes::from(out); - row_count += num_rows; - //let reader = File::open(buffer)?; - let metadata = parquet::file::footer::parse_metadata(&reader)?; - let schema = metadata.file_metadata().schema(); - writer = match writer { - Some(writer) => Some(writer), - None => Some(SerializedFileWriter::new( - 
merged_buff.clone(), - Arc::new(schema.clone()), - Arc::new(parquet_props.clone()), - )?), - }; - - match &mut writer{ - Some(w) => { - // Note: cannot use .await within this loop as RowGroupMetaData is not Send - // Instead, use a non-blocking channel to send bytes to separate worker - // which will write to ObjectStore. - for rg in metadata.row_groups() { - let mut rg_out = w.next_row_group()?; - for column in rg.columns() { - let result = ColumnCloseResult { - bytes_written: column.compressed_size() as _, - rows_written: rg.num_rows() as _, - metadata: column.clone(), - // TODO need to populate the indexes when writing final file - // see https://github.com/apache/arrow-datafusion/issues/7589 - bloom_filter: None, - column_index: None, - offset_index: None, - }; - rg_out.append_column(&reader, result)?; - let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); - if buff_to_flush.len() > 1024000{ - let bytes: Vec = buff_to_flush.drain(..).collect(); - tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?; - - } - } - rg_out.close()?; - let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); - if buff_to_flush.len() > 1024000{ - let bytes: Vec = buff_to_flush.drain(..).collect(); - tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?; - } - } - }, - None => unreachable!("Parquet writer should always be initialized in first iteration of loop!") + let mut rg_out = parquet_writer.next_row_group()?; + let (serialized_columns, cnt) = result?; + row_count += cnt; + for chunk in serialized_columns { + chunk.append_to_row_group(&mut rg_out)?; + let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); + if buff_to_flush.len() > 1024000 { + object_store_writer + .write_all(buff_to_flush.as_slice()) + .await?; + buff_to_flush.clear(); + } } + rg_out.close()?; } Err(e) => { if e.is_panic() { @@ -946,14 +1086,51 @@ async fn output_single_parquet_file_parallelized( } } } - let inner_writer = writer.unwrap().into_inner()?; + + let inner_writer = parquet_writer.into_inner()?; let final_buff = inner_writer.buffer.try_lock().unwrap(); - // Explicitly drop tx to signal to rx we are done sending data - drop(tx); + object_store_writer.write_all(final_buff.as_slice()).await?; + object_store_writer.shutdown().await?; + + Ok(row_count) +} - let mut object_store_writer = match writer_join_handle.await { - Ok(r) => r?, +/// Parallelizes the serialization of a single parquet file, by first serializing N +/// independent RecordBatch streams in parallel to RowGroups in memory. Another +/// task then stitches these independent RowGroups together and streams this large +/// single parquet file to an ObjectStore in multiple parts. 
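+/// The channel between the serialization and concatenation tasks is bounded
+/// by maximum_parallel_row_group_writers, which caps how many finalized row
+/// groups can be in flight at once.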
+async fn output_single_parquet_file_parallelized( + object_store_writer: AbortableWrite>, + data: Receiver, + output_schema: Arc, + parquet_props: &WriterProperties, + parallel_options: ParallelParquetWriterOptions, +) -> Result { + let max_rowgroups = parallel_options.max_parallel_row_groups; + // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel + let (serialize_tx, serialize_rx) = + mpsc::channel::>(max_rowgroups); + + let arc_props = Arc::new(parquet_props.clone()); + let launch_serialization_task = spawn_parquet_parallel_serialization_task( + data, + serialize_tx, + output_schema.clone(), + arc_props.clone(), + parallel_options, + ); + let row_count = concatenate_parallel_row_groups( + serialize_rx, + output_schema.clone(), + arc_props.clone(), + object_store_writer, + ) + .await?; + + match launch_serialization_task.await { + Ok(Ok(_)) => (), + Ok(Err(e)) => return Err(e), Err(e) => { if e.is_panic() { std::panic::resume_unwind(e.into_panic()) @@ -962,8 +1139,6 @@ async fn output_single_parquet_file_parallelized( } } }; - object_store_writer.write_all(final_buff.as_slice()).await?; - object_store_writer.shutdown().await?; Ok(row_count) } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 4a2b6220fd85..ef51a8198922 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -150,10 +150,10 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false -datafusion.execution.max_buffered_batches_per_output_file 2 +datafusion.execution.max_buffered_batches_per_output_file 10 datafusion.execution.meta_fetch_concurrency 32 datafusion.execution.minimum_parallel_output_files 4 -datafusion.execution.parquet.allow_single_file_parallelism false +datafusion.execution.parquet.allow_single_file_parallelism true datafusion.execution.parquet.bloom_filter_enabled false datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL @@ -168,6 +168,8 @@ datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.max_statistics_size NULL +datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 200 +datafusion.execution.parquet.maximum_parallel_row_group_writers 16 datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false @@ -220,10 +222,10 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. 
The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files -datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption +datafusion.execution.max_buffered_batches_per_output_file 10 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. -datafusion.execution.parquet.allow_single_file_parallelism false Controls whether DataFusion will attempt to speed up writing large parquet files by first writing multiple smaller files and then stitching them together into a single large file. This will result in faster write speeds, but higher memory usage. Also currently unsupported are bloom filters and column indexes when single_file_parallelism is enabled. +datafusion.execution.parquet.allow_single_file_parallelism true Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter is enabled for any column datafusion.execution.parquet.bloom_filter_fpp NULL Sets bloom filter false positive probability. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_ndv NULL Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting @@ -238,6 +240,8 @@ datafusion.execution.parquet.enable_page_index true If true, reads the Parquet d datafusion.execution.parquet.encoding NULL Sets default encoding for any column Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.max_row_group_size 1048576 Sets maximum number of rows in a row group datafusion.execution.parquet.max_statistics_size NULL Sets max statistics size for any column. If NULL, uses default parquet writer setting +datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 200 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. Lowering this number limits memory growth at the cost of potentially lower write speeds. +datafusion.execution.parquet.maximum_parallel_row_group_writers 16 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. 
Lowering this number limits memory growth at the cost of potentially slower write speeds. datafusion.execution.parquet.metadata_size_hint NULL If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer datafusion.execution.parquet.pruning true If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 3476118ca645..4279e0850060 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,70 +35,72 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. -| key | default | description | -| ---------------------------------------------------------- | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | -| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | -| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | -| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | -| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | -| datafusion.catalog.has_header | false | If the file has a header | -| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | -| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. 
The target batch size is determined by the configuration setting | -| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | -| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | -| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | -| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | -| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | -| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | -| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.parquet.data_pagesize_limit | 1048576 | Sets best effort maximum size of data page in bytes | -| datafusion.execution.parquet.write_batch_size | 1024 | Sets write_batch_size in bytes | -| datafusion.execution.parquet.writer_version | 1.0 | Sets parquet writer version valid values are "1.0" and "2.0" | -| datafusion.execution.parquet.compression | zstd(3) | Sets default parquet compression codec Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.dictionary_enabled | NULL | Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | Sets best effort maximum dictionary page size, in bytes | -| datafusion.execution.parquet.statistics_enabled | NULL | Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.max_statistics_size | NULL | Sets max statistics size for any column. 
If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.max_row_group_size | 1048576 | Sets maximum number of rows in a row group | -| datafusion.execution.parquet.created_by | datafusion version 32.0.0 | Sets "created by" property | -| datafusion.execution.parquet.column_index_truncate_length | NULL | Sets column index truncate length | -| datafusion.execution.parquet.data_page_row_count_limit | 18446744073709551615 | Sets best effort maximum number of rows in data page | -| datafusion.execution.parquet.encoding | NULL | Sets default encoding for any column Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.bloom_filter_enabled | false | Sets if bloom filter is enabled for any column | -| datafusion.execution.parquet.bloom_filter_fpp | NULL | Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.bloom_filter_ndv | NULL | Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.allow_single_file_parallelism | false | Controls whether DataFusion will attempt to speed up writing large parquet files by first writing multiple smaller files and then stitching them together into a single large file. This will result in faster write speeds, but higher memory usage. Also currently unsupported are bloom filters and column indexes when single_file_parallelism is enabled. | -| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | -| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | -| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | -| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. 
Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | -| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | -| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | -| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | -| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | -| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | -| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. | -| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. 
With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | -| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. | -| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | -| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | -| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | -| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | -| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | -| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. | +| key | default | description | +| ----------------------------------------------------------------------- | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. 
| +| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | +| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | +| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | +| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | +| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | +| datafusion.catalog.has_header | false | If the file has a header | +| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | +| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | +| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | +| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | +| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | +| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | +| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | +| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | +| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | +| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | +| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. 
If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.data_pagesize_limit | 1048576 | Sets best effort maximum size of data page in bytes | +| datafusion.execution.parquet.write_batch_size | 1024 | Sets write_batch_size in bytes | +| datafusion.execution.parquet.writer_version | 1.0 | Sets parquet writer version valid values are "1.0" and "2.0" | +| datafusion.execution.parquet.compression | zstd(3) | Sets default parquet compression codec Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.dictionary_enabled | NULL | Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | Sets best effort maximum dictionary page size, in bytes | +| datafusion.execution.parquet.statistics_enabled | NULL | Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.max_statistics_size | NULL | Sets max statistics size for any column. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.max_row_group_size | 1048576 | Sets maximum number of rows in a row group | +| datafusion.execution.parquet.created_by | datafusion version 32.0.0 | Sets "created by" property | +| datafusion.execution.parquet.column_index_truncate_length | NULL | Sets column index truncate length | +| datafusion.execution.parquet.data_page_row_count_limit | 18446744073709551615 | Sets best effort maximum number of rows in data page | +| datafusion.execution.parquet.encoding | NULL | Sets default encoding for any column Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.bloom_filter_enabled | false | Sets if bloom filter is enabled for any column | +| datafusion.execution.parquet.bloom_filter_fpp | NULL | Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.bloom_filter_ndv | NULL | Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.allow_single_file_parallelism | true | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | +| datafusion.execution.parquet.maximum_parallel_row_group_writers | 16 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. Lowering this number limits memory growth at the cost of potentially slower write speeds. | +| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 200 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. 
Lowering this number limits memory growth at the cost of potentially lower write speeds. |
+| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
+| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly used to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
+| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
+| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
+| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
+| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
+| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
+| datafusion.execution.max_buffered_batches_per_output_file | 10 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
+| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
+| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
+| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
+| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total file size in bytes to perform file scan repartitioning. |
+| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering. If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long-running execution, all types of joins may encounter out-of-memory errors. |
+| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. |
+| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partition keys to execute window functions in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. When this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` |
+| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. |
+| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail |
+| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
+| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
+| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
+| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin that will be collected into a single partition |
+| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
+| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
+| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
+| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type |
+| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) |
+| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. |
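Editorial aside: the writer options added in this first patch are ordinary session settings. The sketch below is illustrative only and not part of the patch series; it assumes the `SessionConfig::set_bool`/`set_usize` builder helpers and the `SessionContext::new_with_config` constructor available in DataFusion around this time.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Build a session with the parallel parquet writer options tuned up front.
    let config = SessionConfig::new()
        .set_bool(
            "datafusion.execution.parquet.allow_single_file_parallelism",
            true,
        )
        .set_usize(
            "datafusion.execution.parquet.maximum_parallel_row_group_writers",
            4,
        )
        .set_usize(
            "datafusion.execution.parquet.maximum_buffered_record_batches_per_stream",
            32,
        );
    let ctx = SessionContext::new_with_config(config);

    // The same options can also be changed per-session through SQL.
    ctx.sql("SET datafusion.execution.parquet.maximum_parallel_row_group_writers = 2")
        .await?;
    Ok(())
}
```

Options set either way flow through `ConfigOptions` into the `ParquetSink` at execution time, which is why the later patches adjust only defaults and documentation rather than any plumbing.
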
From a2149370c08e3bd2bc3fae33d2f2672f68547391 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Mon, 23 Oct 2023 17:01:56 -0400
Subject: [PATCH 2/5] fixes and cmt

---
 .../src/datasource/file_format/parquet.rs | 46 +------------------
 1 file changed, 2 insertions(+), 44 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 5943fa4c2181..0739e449e71f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,11 +26,9 @@ use parquet::arrow::arrow_writer::{
     ArrowLeafColumn,
 };
 use parquet::file::writer::SerializedFileWriter;
-use rand::distributions::DistString;
 use std::any::Any;
 use std::fmt;
 use std::fmt::Debug;
-use std::fs::File;
 use std::io::Write;
 use std::sync::Arc;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
@@ -55,7 +53,6 @@ use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics as ParquetStatistics;
-use rand::distributions::Alphanumeric;
 
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, AbortableWrite, FileWriterMode};
@@ -678,45 +675,6 @@ impl ParquetSink {
             }
         }
     }
-
-    /// Creates an object store writer for each output partition
-    /// This is used when parallelizing individual parquet file writes.
-    async fn create_object_store_writers(
-        &self,
-        num_partitions: usize,
-        object_store: Arc<dyn ObjectStore>,
-        single_file_output: bool,
-    ) -> Result<Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>> {
-        let mut writers = Vec::new();
-
-        let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
-        for part_idx in 0..num_partitions {
-            let file_path = if !single_file_output {
-                self.config.table_paths[0]
-                    .prefix()
-                    .child(format!("{}_{}.parquet", write_id, part_idx))
-            } else {
-                self.config.table_paths[0].prefix().clone()
-            };
-            let object_meta = ObjectMeta {
-                location: file_path,
-                last_modified: chrono::offset::Utc::now(),
-                size: 0,
-                e_tag: None,
-            };
-            writers.push(
-                create_writer(
-                    FileWriterMode::PutMultipart,
-                    FileCompressionType::UNCOMPRESSED,
-                    object_meta.into(),
-                    object_store.clone(),
-                )
-                .await?,
-            );
-        }
-
-        Ok(writers)
-    }
 }
 
 #[async_trait]
@@ -814,14 +772,14 @@ impl DataSink for ParquetSink {
             let props = parquet_props.clone();
             let parallel_options_clone = parallel_options.clone();
             file_write_tasks.spawn(async move {
-                Ok(output_single_parquet_file_parallelized(
+                output_single_parquet_file_parallelized(
                     writer,
                     rx,
                     schema,
                     &props,
                     parallel_options_clone,
                 )
-                .await?)
+                .await
             });
         }
     }

From af03d972cae85aa4b49482e06ef50fbffc9fcdc1 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Tue, 24 Oct 2023 19:28:56 -0400
Subject: [PATCH 3/5] review comments, tuning parameters, updating docs

---
 datafusion/common/src/config.rs | 14 +++++++++-----
 .../core/src/datasource/file_format/parquet.rs | 16 ++++++++--------
 .../test_files/information_schema.slt | 12 ++++++------
 docs/source/user-guide/configs.md | 6 +++---
 4 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 5655d172e871..75294daca6a2 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -271,7 +271,7 @@ config_namespace! {
         /// for each output file being worked. Higher values can potentially
         /// give faster write performance at the cost of higher peak
         /// memory consumption
-        pub max_buffered_batches_per_output_file: usize, default = 10
+        pub max_buffered_batches_per_output_file: usize, default = 2
     }
 }
@@ -386,15 +386,19 @@ config_namespace! {
         /// applying backpressure to prevent working on too many row groups in
         /// parallel in case of limited memory or slow I/O speed causing
         /// OOM errors. Lowering this number limits memory growth at the cost
-        /// of potentially slower write speeds.
-        pub maximum_parallel_row_group_writers: usize, default = 16
+        /// of potentially slower write speeds.  Boosting this number may
+        /// help performance when batches can be produced very fast,
+        /// such as from an in-memory table.
+        pub maximum_parallel_row_group_writers: usize, default = 2
 
         /// If allow_single_file_parallelism=true, this setting allows
         /// applying backpressure to prevent too many RecordBatches building
         /// up in memory in case the parallel writers cannot consume them fast
         /// enough. Lowering this number limits memory growth at the cost
-        /// of potentially lower write speeds.
-        pub maximum_buffered_record_batches_per_stream: usize, default = 200
+        /// of potentially lower write speeds. Boosting this number may
+        /// help performance when batches can be produced very fast, such
+        /// as from an in-memory table.
+        pub maximum_buffered_record_batches_per_stream: usize, default = 128
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 0739e449e71f..07acd88560ab 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -933,9 +933,7 @@ fn spawn_parquet_parallel_serialization_task(
     parallel_options: ParallelParquetWriterOptions,
 ) -> JoinHandle<Result<()>> {
     tokio::spawn(async move {
-        // This is divided by 2 because we move RecordBatches between two channels so the effective
-        // buffer limit is the sum of the size of each buffer, i.e. when both buffers are full.
-        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream / 2;
+        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
         let max_row_group_rows = writer_props.max_row_group_size();
         let (mut column_writer_handles, mut col_array_channels) =
             spawn_column_parallel_row_group_writer(
@@ -984,14 +982,16 @@ fn spawn_parquet_parallel_serialization_task(
             }
         }
 
-        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
         drop(col_array_channels);
-        let finalize_rg_task =
+        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
+        if current_rg_rows > 0{
+            let finalize_rg_task =
                 spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
 
-        serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-            DataFusionError::Internal("Unable to send closed RG to concat task!".into())
-        })?;
+            serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                DataFusionError::Internal("Unable to send closed RG to concat task!".into())
+            })?;
+        }
 
         Ok(())
     })
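Editorial aside: the mechanism in the hunk above is backpressure through bounded channels. The serialization task pulls RecordBatches from a `tokio::sync::mpsc` channel whose capacity is `max_buffered_record_batches_per_stream`, so a fast producer suspends instead of buffering without limit. The standalone sketch below is not the patch's code; it is a simplified illustration of that pattern, with `Batch` standing in for a real Arrow `RecordBatch`.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Simplified stand-in for an Arrow RecordBatch.
type Batch = Vec<u64>;

/// Spawn a serializer that pulls batches from a bounded channel.
/// The channel capacity is what provides the backpressure: once
/// `capacity` batches are in flight, `send().await` suspends the
/// producer rather than letting memory grow without bound.
fn spawn_serializer(mut rx: mpsc::Receiver<Batch>) -> JoinHandle<usize> {
    tokio::spawn(async move {
        let mut rows = 0;
        while let Some(batch) = rx.recv().await {
            rows += batch.len(); // stand-in for column serialization work
        }
        rows
    })
}

#[tokio::main]
async fn main() {
    // Capacity plays the role of maximum_buffered_record_batches_per_stream.
    let (tx, rx) = mpsc::channel::<Batch>(2);
    let worker = spawn_serializer(rx);

    for i in 0..10u64 {
        // Suspends here whenever the worker falls 2 batches behind.
        tx.send(vec![i; 1024]).await.expect("worker hung up");
    }
    drop(tx); // close the channel so the worker can finish

    let rows = worker.await.expect("join failed");
    assert_eq!(rows, 10 * 1024);
}
```

This is also why the `/ 2` adjustment could be deleted above: once a single bounded channel carries the batches, its capacity alone is the effective buffer limit.
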
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index ef51a8198922..c202cabad90a 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -150,7 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
 datafusion.execution.batch_size 8192
 datafusion.execution.coalesce_batches true
 datafusion.execution.collect_statistics false
-datafusion.execution.max_buffered_batches_per_output_file 10
+datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.meta_fetch_concurrency 32
 datafusion.execution.minimum_parallel_output_files 4
 datafusion.execution.parquet.allow_single_file_parallelism true
@@ -168,8 +168,8 @@ datafusion.execution.parquet.enable_page_index true
 datafusion.execution.parquet.encoding NULL
 datafusion.execution.parquet.max_row_group_size 1048576
 datafusion.execution.parquet.max_statistics_size NULL
-datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 200
-datafusion.execution.parquet.maximum_parallel_row_group_writers 16
+datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 128
+datafusion.execution.parquet.maximum_parallel_row_group_writers 2
 datafusion.execution.parquet.metadata_size_hint NULL
 datafusion.execution.parquet.pruning true
 datafusion.execution.parquet.pushdown_filters false
@@ -222,7 +222,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f
 datafusion.execution.batch_size 8192 Default batch size while creating new batches; it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
 datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
 datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
-datafusion.execution.max_buffered_batches_per_output_file 10 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
+datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
 datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
 datafusion.execution.parquet.allow_single_file_parallelism true Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file is serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
@@ -240,8 +240,8 @@ datafusion.execution.parquet.enable_page_index true If true, reads the Parquet d
 datafusion.execution.parquet.encoding NULL Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.max_row_group_size 1048576 Sets maximum number of rows in a row group
 datafusion.execution.parquet.max_statistics_size NULL Sets max statistics size for any column. If NULL, uses default parquet writer setting
-datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 200 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. Lowering this number limits memory growth at the cost of potentially lower write speeds.
-datafusion.execution.parquet.maximum_parallel_row_group_writers 16 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. Lowering this number limits memory growth at the cost of potentially slower write speeds.
+datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 128 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. Lowering this number limits memory growth at the cost of potentially lower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table.
+datafusion.execution.parquet.maximum_parallel_row_group_writers 2 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. Lowering this number limits memory growth at the cost of potentially slower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table.
 datafusion.execution.parquet.metadata_size_hint NULL If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer
 datafusion.execution.parquet.pruning true If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
 datafusion.execution.parquet.pushdown_filters false If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded
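Editorial aside: the expectations above are simply the contents of DataFusion's settings table, and the same values can be inspected from a running session rather than a test file. A minimal sketch, assuming the standard `information_schema` support and the `df_settings` view exercised by this test:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // information_schema must be enabled for df_settings to exist.
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    // Show the parquet writer knobs discussed in this series.
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name LIKE 'datafusion.execution.parquet.maximum%'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```
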
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 4279e0850060..f5dbdd7e3588 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -72,8 +72,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.bloom_filter_fpp | NULL | Sets bloom filter false positive probability. If NULL, uses default parquet writer setting |
 | datafusion.execution.parquet.bloom_filter_ndv | NULL | Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting |
 | datafusion.execution.parquet.allow_single_file_parallelism | true | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file is serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |
-| datafusion.execution.parquet.maximum_parallel_row_group_writers | 16 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. Lowering this number limits memory growth at the cost of potentially slower write speeds. |
-| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 200 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. Lowering this number limits memory growth at the cost of potentially lower write speeds. |
+| datafusion.execution.parquet.maximum_parallel_row_group_writers | 2 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. Lowering this number limits memory growth at the cost of potentially slower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table. |
+| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 128 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. Lowering this number limits memory growth at the cost of potentially lower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table. |
 | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
 | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly used to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
@@ -81,7 +81,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
 | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
 | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
-| datafusion.execution.max_buffered_batches_per_output_file | 10 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
+| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
 | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |

From fd52457b313c775e2374452c113928c16f3dcb53 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Tue, 24 Oct 2023 19:29:17 -0400
Subject: [PATCH 4/5] cargo fmt

---
 datafusion/common/src/config.rs | 2 +-
 datafusion/core/src/datasource/file_format/parquet.rs | 8 +++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 75294daca6a2..f0612f005f8d 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -386,7 +386,7 @@ config_namespace! {
         /// applying backpressure to prevent working on too many row groups in
         /// parallel in case of limited memory or slow I/O speed causing
         /// OOM errors. Lowering this number limits memory growth at the cost
-        /// of potentially slower write speeds.  Boosting this number may
+        /// of potentially slower write speeds. Boosting this number may
         /// help performance when batches can be produced very fast,
         /// such as from an in-memory table.
         pub maximum_parallel_row_group_writers: usize, default = 2
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07acd88560ab..62867c0e2b38 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -984,12 +984,14 @@ fn spawn_parquet_parallel_serialization_task(
         drop(col_array_channels);
 
         // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
-        if current_rg_rows > 0{
+        if current_rg_rows > 0 {
             let finalize_rg_task =
-                spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
+                    spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
 
             serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                DataFusionError::Internal("Unable to send closed RG to concat task!".into())
+                DataFusionError::Internal(
+                    "Unable to send closed RG to concat task!".into(),
+                )
             })?;
         }
 
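Editorial aside: the final patch below deliberately tunes the defaults for streaming plans, so writes of already-materialized data may benefit from raising both knobs per session, as the commit text suggests. A hedged sketch of that workflow follows; the `DataFrameWriteOptions`-based `write_parquet` signature and the output path are assumptions from this era of the API and may differ in other versions.

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Trade memory for parallelism when the input is already in memory.
    ctx.sql("SET datafusion.execution.parquet.maximum_parallel_row_group_writers = 8")
        .await?;
    ctx.sql("SET datafusion.execution.parquet.maximum_buffered_record_batches_per_stream = 64")
        .await?;

    // A cached DataFrame produces batches as fast as the writers can take them.
    let df = ctx
        .sql("SELECT * FROM (VALUES (1), (2), (3)) AS t(x)")
        .await?
        .cache()
        .await?;

    // Assumed era signature: write_parquet(path, write options, writer properties).
    df.write_parquet("/tmp/out.parquet", DataFrameWriteOptions::new(), None)
        .await?;
    Ok(())
}
```
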
From d120a03221ccacf12086f4d9440bda4756caf45e Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Wed, 25 Oct 2023 08:58:01 -0400
Subject: [PATCH 5/5] reduce default buffer size to 2 and update docs

---
 datafusion/common/src/config.rs | 38 ++++++++++---------
 .../test_files/information_schema.slt | 8 ++--
 docs/source/user-guide/configs.md | 4 +-
 3 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index f0612f005f8d..403241fcce58 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -382,23 +382,27 @@ config_namespace! {
         /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
         pub allow_single_file_parallelism: bool, default = true
 
-        /// If allow_single_file_parallelism=true, this setting allows
-        /// applying backpressure to prevent working on too many row groups in
-        /// parallel in case of limited memory or slow I/O speed causing
-        /// OOM errors. Lowering this number limits memory growth at the cost
-        /// of potentially slower write speeds. Boosting this number may
-        /// help performance when batches can be produced very fast,
-        /// such as from an in-memory table.
-        pub maximum_parallel_row_group_writers: usize, default = 2
-
-        /// If allow_single_file_parallelism=true, this setting allows
-        /// applying backpressure to prevent too many RecordBatches building
-        /// up in memory in case the parallel writers cannot consume them fast
-        /// enough. Lowering this number limits memory growth at the cost
-        /// of potentially lower write speeds. Boosting this number may
-        /// help performance when batches can be produced very fast, such
-        /// as from an in-memory table.
-        pub maximum_buffered_record_batches_per_stream: usize, default = 128
+        /// By default the parallel parquet writer is tuned for minimum
+        /// memory usage in a streaming execution plan. You may see
+        /// a performance benefit when writing large parquet files
+        /// by increasing maximum_parallel_row_group_writers and
+        /// maximum_buffered_record_batches_per_stream if your system
+        /// has idle cores and can tolerate additional memory usage.
+        /// Boosting these values is likely worthwhile when
+        /// writing out already in-memory data, such as from a cached
+        /// data frame.
+        pub maximum_parallel_row_group_writers: usize, default = 1
+
+        /// By default the parallel parquet writer is tuned for minimum
+        /// memory usage in a streaming execution plan. You may see
+        /// a performance benefit when writing large parquet files
+        /// by increasing maximum_parallel_row_group_writers and
+        /// maximum_buffered_record_batches_per_stream if your system
+        /// has idle cores and can tolerate additional memory usage.
+        /// Boosting these values is likely worthwhile when
+        /// writing out already in-memory data, such as from a cached
+        /// data frame.
+        pub maximum_buffered_record_batches_per_stream: usize, default = 2
     }
 }
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index c202cabad90a..ed85f54a39aa 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -168,8 +168,8 @@ datafusion.execution.parquet.enable_page_index true
 datafusion.execution.parquet.encoding NULL
 datafusion.execution.parquet.max_row_group_size 1048576
 datafusion.execution.parquet.max_statistics_size NULL
-datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 128
-datafusion.execution.parquet.maximum_parallel_row_group_writers 2
+datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2
+datafusion.execution.parquet.maximum_parallel_row_group_writers 1
 datafusion.execution.parquet.metadata_size_hint NULL
 datafusion.execution.parquet.pruning true
 datafusion.execution.parquet.pushdown_filters false
@@ -240,8 +240,8 @@ datafusion.execution.parquet.enable_page_index true If true, reads the Parquet d
 datafusion.execution.parquet.encoding NULL Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.max_row_group_size 1048576 Sets maximum number of rows in a row group
 datafusion.execution.parquet.max_statistics_size NULL Sets max statistics size for any column. If NULL, uses default parquet writer setting
-datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 128 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. Lowering this number limits memory growth at the cost of potentially lower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table.
-datafusion.execution.parquet.maximum_parallel_row_group_writers 2 If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. Lowering this number limits memory growth at the cost of potentially slower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table.
+datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 By default the parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
+datafusion.execution.parquet.maximum_parallel_row_group_writers 1 By default the parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
 datafusion.execution.parquet.metadata_size_hint NULL If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer
 datafusion.execution.parquet.pruning true If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
 datafusion.execution.parquet.pushdown_filters false If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index f5dbdd7e3588..c8ff1b06d609 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -72,8 +72,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.bloom_filter_fpp | NULL | Sets bloom filter false positive probability. If NULL, uses default parquet writer setting |
 | datafusion.execution.parquet.bloom_filter_ndv | NULL | Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting |
 | datafusion.execution.parquet.allow_single_file_parallelism | true | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file is serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |
-| datafusion.execution.parquet.maximum_parallel_row_group_writers | 2 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent working on too many row groups in parallel in case of limited memory or slow I/O speed causing OOM errors. Lowering this number limits memory growth at the cost of potentially slower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table. |
-| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 128 | If allow_single_file_parallelism=true, this setting allows applying backpressure to prevent too many RecordBatches building up in memory in case the parallel writers cannot consume them fast enough. Lowering this number limits memory growth at the cost of potentially lower write speeds. Boosting this number may help performance when batches can be produced very fast, such as from an in-memory table. |
+| datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | By default the parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
+| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | By default the parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
 | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
 | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly used to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |