diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index fed0349dd505..c3f0861f2956 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -353,6 +353,15 @@ config_namespace! { /// Sets bloom filter number of distinct values. If NULL, uses /// default parquet writer setting pub bloom_filter_ndv: Option, default = None + + /// Controls whether DataFusion will attempt to speed up writing + /// large parquet files by first writing multiple smaller files + /// and then stitching them together into a single large file. + /// This will result in faster write speeds, but higher memory usage. + /// Also currently unsupported are bloom filters and column indexes + /// when single_file_parallelism is enabled. + pub allow_single_file_parallelism: bool, default = false + } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0200238c585b..3b15ccd17a84 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -17,18 +17,25 @@ //! Parquet format abstractions +use parquet::column::writer::ColumnCloseResult; +use parquet::file::writer::SerializedFileWriter; use rand::distributions::DistString; use std::any::Any; use std::fmt; use std::fmt::Debug; +use std::io::Write; use std::sync::Arc; -use tokio::task::JoinSet; +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::task::{JoinHandle, JoinSet}; use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; +use datafusion_common::{ + exec_err, not_impl_err, plan_err, DataFusionError, FileCompressionType, FileType, +}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; @@ -41,7 +48,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use rand::distributions::Alphanumeric; -use super::write::FileWriterMode; +use super::write::{create_writer, AbortableWrite, FileWriterMode}; use super::FileFormat; use super::FileScanConfig; use crate::arrow::array::{ @@ -605,8 +612,9 @@ impl ParquetSink { Self { config } } - // Create a write for parquet files - async fn create_writer( + /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore + /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized + async fn create_async_arrow_writer( &self, file_meta: FileMeta, object_store: Arc, @@ -639,26 +647,17 @@ impl ParquetSink { } } } -} -#[async_trait] -impl DataSink for ParquetSink { - async fn write_all( + /// Creates an AsyncArrowWriter for each partition to be written out + /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized + async fn create_all_async_arrow_writers( &self, - mut data: Vec, - context: &Arc, - ) -> Result { - let num_partitions = data.len(); - let parquet_props = self - .config - .file_type_writer_options - .try_into_parquet()? - .writer_options(); - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - + num_partitions: usize, + parquet_props: &WriterProperties, + object_store: Arc, + ) -> Result< + Vec>>, + > { // Construct writer for each file group let mut writers = vec![]; match self.config.writer_mode { @@ -689,7 +688,7 @@ impl DataSink for ParquetSink { e_tag: None, }; let writer = self - .create_writer( + .create_async_arrow_writer( object_meta.into(), object_store.clone(), parquet_props.clone(), @@ -707,7 +706,7 @@ impl DataSink for ParquetSink { e_tag: None, }; let writer = self - .create_writer( + .create_async_arrow_writer( object_meta.into(), object_store.clone(), parquet_props.clone(), @@ -719,55 +718,329 @@ impl DataSink for ParquetSink { } } + Ok(writers) + } + + /// Creates an object store writer for each output partition + /// This is used when parallelizing individual parquet file writes. + async fn create_object_store_writers( + &self, + num_partitions: usize, + object_store: Arc, + ) -> Result>>> { + let mut writers = Vec::new(); + + for _ in 0..num_partitions { + let file_path = self.config.table_paths[0].prefix(); + let object_meta = ObjectMeta { + location: file_path.clone(), + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + writers.push( + create_writer( + FileWriterMode::PutMultipart, + FileCompressionType::UNCOMPRESSED, + object_meta.into(), + object_store.clone(), + ) + .await?, + ); + } + + Ok(writers) + } +} + +#[async_trait] +impl DataSink for ParquetSink { + async fn write_all( + &self, + mut data: Vec, + context: &Arc, + ) -> Result { + let num_partitions = data.len(); + let parquet_props = self + .config + .file_type_writer_options + .try_into_parquet()? + .writer_options(); + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + let mut row_count = 0; + let allow_single_file_parallelism = context + .session_config() + .options() + .execution + .parquet + .allow_single_file_parallelism; + match self.config.single_file_output { false => { - let mut join_set: JoinSet> = - JoinSet::new(); - for (mut data_stream, mut writer) in - data.into_iter().zip(writers.into_iter()) - { - join_set.spawn(async move { - let mut cnt = 0; + let writers = self + .create_all_async_arrow_writers( + num_partitions, + parquet_props, + object_store.clone(), + ) + .await?; + // TODO parallelize individual parquet serialization when already outputting multiple parquet files + // e.g. if outputting 2 parquet files on a system with 32 threads, spawn 16 tasks for each individual + // file to be serialized. + row_count = output_multiple_parquet_files(writers, data).await?; + } + true => { + if !allow_single_file_parallelism || data.len() <= 1 { + let mut writer = self + .create_all_async_arrow_writers( + num_partitions, + parquet_props, + object_store.clone(), + ) + .await? + .remove(0); + for data_stream in data.iter_mut() { while let Some(batch) = data_stream.next().await.transpose()? { - cnt += batch.num_rows(); + row_count += batch.num_rows(); writer.write(&batch).await?; } - writer.close().await?; - Ok(cnt) - }); + } + + writer.close().await?; + } else { + let object_store_writer = self + .create_object_store_writers(1, object_store) + .await? + .remove(0); + row_count = output_single_parquet_file_parallelized( + object_store_writer, + data, + self.config.output_schema.clone(), + parquet_props, + ) + .await?; } - while let Some(result) = join_set.join_next().await { - match result { - Ok(res) => { - row_count += res?; - } // propagate DataFusion error - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); + } + } + + Ok(row_count as u64) + } +} + +/// This is the return type when joining subtasks which are serializing parquet files +/// into memory buffers. The first part of the tuple is the parquet bytes and the +/// second is how many rows were written into the file. +type ParquetFileSerializedResult = Result<(Vec, usize), DataFusionError>; + +/// Parallelizes the serialization of a single parquet file, by first serializing N +/// independent RecordBatch streams in parallel to parquet files in memory. Another +/// task then stitches these independent files back together and streams this large +/// single parquet file to an ObjectStore in multiple parts. +async fn output_single_parquet_file_parallelized( + mut object_store_writer: AbortableWrite>, + mut data: Vec, + output_schema: Arc, + parquet_props: &WriterProperties, +) -> Result { + let mut row_count = 0; + let parallelism = data.len(); + let mut join_handles: Vec> = + Vec::with_capacity(parallelism); + for _ in 0..parallelism { + let buffer: Vec = Vec::new(); + let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new( + buffer, + output_schema.clone(), + Some(parquet_props.clone()), + )?; + let mut data_stream = data.remove(0); + join_handles.push(tokio::spawn(async move { + let mut inner_row_count = 0; + while let Some(batch) = data_stream.next().await.transpose()? { + inner_row_count += batch.num_rows(); + writer.write(&batch)?; + } + let out = writer.into_inner()?; + Ok((out, inner_row_count)) + })) + } + + let mut writer = None; + let endpoints: (UnboundedSender>, UnboundedReceiver>) = + tokio::sync::mpsc::unbounded_channel(); + let (tx, mut rx) = endpoints; + let writer_join_handle: JoinHandle< + Result< + AbortableWrite>, + DataFusionError, + >, + > = tokio::task::spawn(async move { + while let Some(data) = rx.recv().await { + object_store_writer.write_all(data.as_slice()).await?; + } + Ok(object_store_writer) + }); + let merged_buff = SharedBuffer::new(1048576); + for handle in join_handles { + let join_result = handle.await; + match join_result { + Ok(result) => { + let (out, num_rows) = result?; + let reader = bytes::Bytes::from(out); + row_count += num_rows; + //let reader = File::open(buffer)?; + let metadata = parquet::file::footer::parse_metadata(&reader)?; + let schema = metadata.file_metadata().schema(); + writer = match writer { + Some(writer) => Some(writer), + None => Some(SerializedFileWriter::new( + merged_buff.clone(), + Arc::new(schema.clone()), + Arc::new(parquet_props.clone()), + )?), + }; + + match &mut writer{ + Some(w) => { + // Note: cannot use .await within this loop as RowGroupMetaData is not Send + // Instead, use a non-blocking channel to send bytes to separate worker + // which will write to ObjectStore. + for rg in metadata.row_groups() { + let mut rg_out = w.next_row_group()?; + for column in rg.columns() { + let result = ColumnCloseResult { + bytes_written: column.compressed_size() as _, + rows_written: rg.num_rows() as _, + metadata: column.clone(), + bloom_filter: None, + column_index: None, + offset_index: None, + }; + rg_out.append_column(&reader, result)?; + let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); + if buff_to_flush.len() > 1024000{ + let bytes: Vec = buff_to_flush.drain(..).collect(); + tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?; + + } + } + rg_out.close()?; + let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); + if buff_to_flush.len() > 1024000{ + let bytes: Vec = buff_to_flush.drain(..).collect(); + tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?; } } - } + }, + None => unreachable!("Parquet writer should always be initialized in first iteration of loop!") } } - true => { - let mut writer = writers.remove(0); - for data_stream in data.iter_mut() { - while let Some(batch) = data_stream.next().await.transpose()? { - row_count += batch.num_rows(); - // TODO cleanup all multipart writes when any encounters an error - writer.write(&batch).await?; - } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); } + } + } + } + let inner_writer = writer.unwrap().into_inner()?; + let final_buff = inner_writer.buffer.try_lock().unwrap(); + + // Explicitly drop tx to signal to rx we are done sending data + drop(tx); + + let mut object_store_writer = match writer_join_handle.await { + Ok(r) => r?, + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()) + } else { + unreachable!() + } + } + }; + object_store_writer.write_all(final_buff.as_slice()).await?; + object_store_writer.shutdown().await?; + println!("done!"); + + Ok(row_count) +} - writer.close().await?; +/// Serializes multiple parquet files independently in parallel from different RecordBatch streams. +/// AsyncArrowWriter is used to coordinate serialization and MultiPart puts to ObjectStore +/// Only a single CPU thread is used to serialize each individual parquet file, so write speed and overall +/// CPU utilization is dependent on the number of output files. +async fn output_multiple_parquet_files( + writers: Vec< + AsyncArrowWriter>, + >, + data: Vec, +) -> Result { + let mut row_count = 0; + let mut join_set: JoinSet> = JoinSet::new(); + for (mut data_stream, mut writer) in data.into_iter().zip(writers.into_iter()) { + join_set.spawn(async move { + let mut cnt = 0; + while let Some(batch) = data_stream.next().await.transpose()? { + cnt += batch.num_rows(); + writer.write(&batch).await?; + } + writer.close().await?; + Ok(cnt) + }); + } + while let Some(result) = join_set.join_next().await { + match result { + Ok(res) => { + row_count += res?; + } // propagate DataFusion error + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } } } + } - Ok(row_count as u64) + Ok(row_count) +} + +/// A buffer with interior mutability shared by the SerializedFileWriter and +/// ObjectStore writer +#[derive(Clone)] +struct SharedBuffer { + /// The inner buffer for reading and writing + /// + /// The lock is used to obtain internal mutability, so no worry about the + /// lock contention. + buffer: Arc>>, +} + +impl SharedBuffer { + pub fn new(capacity: usize) -> Self { + Self { + buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))), + } + } +} + +impl Write for SharedBuffer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::write(&mut *buffer, buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::flush(&mut *buffer) } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5d21afc937f4..58483cc8bd86 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -150,6 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false +datafusion.execution.parquet.allow_single_file_parallelism false datafusion.execution.parquet.bloom_filter_enabled false datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 22b1cc84086f..3633b7f6efe9 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -71,6 +71,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.bloom_filter_enabled | false | Sets if bloom filter is enabled for any column | | datafusion.execution.parquet.bloom_filter_fpp | NULL | Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.bloom_filter_ndv | NULL | Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.allow_single_file_parallelism | false | Controls whether DataFusion will attempt to speed up writing large parquet files by first writing multiple smaller files and then stitching them together into a single large file. This will result in faster write speeds, but higher memory usage. Also currently unsupported are bloom filters and column indexes when single_file_parallelism is enabled. | | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |