DataSink Dynamic Execution Time Demux (#7791)
* dynamic partition

* linting

* update docs

* fix all tests

* cargo doc fix

* test correct number of files

* add ascii art and reduce buffer size

* fix config tests

* review comments

* cleanup post rebase

* fix test
devinjdangelo committed Oct 18, 2023
1 parent 033b2ef commit 7c6fdcc
Showing 13 changed files with 720 additions and 561 deletions.
18 changes: 18 additions & 0 deletions datafusion/common/src/config.rs
@@ -254,6 +254,24 @@ config_namespace! {

/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32

/// Target number of rows in output files when writing multiple.
/// This is a soft max, so it can be exceeded slightly. There also
/// will be one file smaller than the limit if the total
/// number of rows written is not roughly divisible by the soft max
pub soft_max_rows_per_output_file: usize, default = 50000000

/// This is the maximum number of output files being written
/// in parallel. Higher values can potentially give faster write
/// performance at the cost of higher peak memory consumption.
pub max_parallel_ouput_files: usize, default = 8

/// This is the maximum number of RecordBatches buffered
/// for each output file being worked. Higher values can potentially
/// give faster write performance at the cost of higher peak
/// memory consumption
pub max_buffered_batches_per_output_file: usize, default = 2

}
}
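
The three options above are the tuning knobs for the new execution-time demux. As a hedged illustration (not part of this commit), they could be set programmatically through `SessionConfig`; the field names below are exactly those added in this hunk, including the `ouput` spelling in `max_parallel_ouput_files`, while the constructor name and the values are assumptions that may need adjusting for your DataFusion version.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Sketch only: tune the new write options before running an INSERT or COPY.
fn context_with_write_tuning() -> SessionContext {
    let mut config = SessionConfig::new();
    let exec = &mut config.options_mut().execution;
    // Roll over to a new output file after roughly 10M rows instead of the 50M default.
    exec.soft_max_rows_per_output_file = 10_000_000;
    // Cap the number of files written concurrently to bound peak memory.
    exec.max_parallel_ouput_files = 4; // field name spelled as in the diff
    // Buffer at most two RecordBatches per in-flight output file.
    exec.max_buffered_batches_per_output_file = 2;
    SessionContext::new_with_config(config) // older releases use `with_config`
}
```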

192 changes: 86 additions & 106 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,11 +23,10 @@ use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use super::write::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
};
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
@@ -51,7 +50,6 @@ use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use rand::distributions::{Alphanumeric, DistString};

/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
@@ -481,6 +479,82 @@ impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
Self { config }
}

async fn append_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
let compression = FileCompressionType::from(*compression);

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
let file_groups = &self.config.file_groups;

let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move |file_size| {
let inner_clone = builder_clone.clone();
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(false)
} else {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(options_clone.has_header)
});
serializer
};

stateless_append_all(
data,
context,
object_store,
file_groups,
self.config.unbounded_input,
compression,
Box::new(get_serializer),
)
.await
}
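
The `get_serializer` closure above encodes the append-mode rule spelled out in its comment: a header is written only when the target file is still empty. A tiny self-contained restatement of that rule (the helper name is hypothetical, not a DataFusion API):

```rust
/// Hypothetical helper mirroring the decision the `get_serializer` closure makes above:
/// in append mode a CSV header is emitted only when the target file is still empty,
/// and only if headers are enabled in the writer options at all.
fn should_write_header(existing_file_size: u64, options_has_header: bool) -> bool {
    existing_file_size == 0 && options_has_header
}

fn main() {
    assert!(should_write_header(0, true)); // fresh file: write the header
    assert!(!should_write_header(4_096, true)); // appending: never repeat the header
    assert!(!should_write_header(0, false)); // headers disabled in the writer options
}
```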

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let builder = &writer_options.writer_options;

let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move || {
let inner_clone = builder_clone.clone();
let serializer: Box<dyn BatchSerializer> = Box::new(
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(options_clone.has_header),
);
serializer
};

stateless_multipart_put(
data,
context,
"csv".into(),
Box::new(get_serializer),
&self.config,
writer_options.compression.into(),
)
.await
}
}
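
Both helpers delegate the actual fan-out to the shared write machinery (`stateless_append_all` / `stateless_multipart_put`), whose diff is not rendered on this page. The "dynamic execution time demux" in the title refers to splitting the single incoming stream across output files while it is being consumed, bounded by the new configuration options. The following is only a schematic sketch of such a row-count demux loop, with a placeholder batch type instead of Arrow's `RecordBatch` and none of the real serialization or object-store plumbing:

```rust
use tokio::sync::mpsc;
use tokio::task::JoinSet;

/// Stand-in for an Arrow RecordBatch; only the row count matters for the demux decision.
struct Batch {
    num_rows: usize,
}

/// Schematic row-count demux: read batches from one input channel and fan them out to
/// per-file channels, starting a new "file" once the soft row limit has been reached.
/// Each new receiver is handed to a writer task immediately, so the bounded per-file
/// buffer (`max_buffered_batches`) applies backpressure without blocking forever.
async fn demux_by_row_count(
    mut input: mpsc::Receiver<Batch>,
    soft_max_rows_per_file: usize,
    max_buffered_batches: usize,
) -> Vec<usize> {
    let mut writers: JoinSet<usize> = JoinSet::new();
    let spawn_file_writer = |writers: &mut JoinSet<usize>| {
        let (tx, mut rx) = mpsc::channel::<Batch>(max_buffered_batches);
        // A real writer would serialize batches and stream them to object storage;
        // this one just counts the rows it receives.
        writers.spawn(async move {
            let mut rows = 0;
            while let Some(batch) = rx.recv().await {
                rows += batch.num_rows;
            }
            rows
        });
        tx
    };

    let mut tx = spawn_file_writer(&mut writers);
    let mut rows_in_current_file = 0;
    while let Some(batch) = input.recv().await {
        // The limit is checked between batches, so a file can exceed it by at most one
        // batch -- matching the "soft max ... can be exceeded slightly" doc above.
        if rows_in_current_file >= soft_max_rows_per_file {
            tx = spawn_file_writer(&mut writers); // dropping the old sender closes that file
            rows_in_current_file = 0;
        }
        rows_in_current_file += batch.num_rows;
        tx.send(batch).await.expect("writer task dropped its receiver");
    }
    drop(tx);

    // Collect per-file row counts (in task-completion order).
    let mut rows_per_file = Vec::new();
    while let Some(result) = writers.join_next().await {
        rows_per_file.push(result.expect("writer task panicked"));
    }
    rows_per_file
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(async move {
        for _ in 0..10 {
            tx.send(Batch { num_rows: 300 }).await.unwrap();
        }
    });
    // Ten 300-row batches with a soft max of 1,000 rows land in three files.
    println!("rows per file: {:?}", demux_by_row_count(rx, 1_000, 2).await);
}
```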

#[async_trait]
@@ -495,116 +569,22 @@ impl DataSink for CsvSink {

async fn write_all(
&self,
data: Vec<SendableRecordBatchStream>,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = data.len();
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
let mut has_header = writer_options.has_header;
let compression = FileCompressionType::from(*compression);

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
// Construct serializer and writer for each file group
let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
for file_group in &self.config.file_groups {
let mut append_builder = builder.clone();
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
if file_group.object_meta.size != 0 {
has_header = false;
append_builder = append_builder.has_headers(false);
}
let serializer = CsvSerializer::new()
.with_builder(append_builder)
.with_header(has_header);
serializers.push(Box::new(serializer));

let file = file_group.clone();
let writer = create_writer(
self.config.writer_mode,
compression,
file.object_meta.clone().into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
FileWriterMode::Put => {
return not_impl_err!("Put Mode is not implemented for CSV Sink yet")
let total_count = self.append_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
match self.config.single_file_output {
false => {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id =
Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let serializer = CsvSerializer::new()
.with_builder(builder.clone())
.with_header(has_header);
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
compression,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
true => {
let serializer = CsvSerializer::new()
.with_builder(builder.clone())
.with_header(has_header);
serializers.push(Box::new(serializer));
let file_path = base_path.prefix();
let object_meta = ObjectMeta {
location: file_path.clone(),
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
compression,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::Put => {
return not_impl_err!("FileWriterMode::Put is not supported yet!")
}
}

stateless_serialize_and_write_files(
data,
serializers,
writers,
self.config.single_file_output,
self.config.unbounded_input,
)
.await
}
}
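
This hunk shows the central API change of the PR: `DataSink::write_all` now receives a single merged `SendableRecordBatchStream` rather than a `Vec` of per-partition streams, with the per-mode helpers (`append_all`, `multipartput_all`) handling the rest. Below is a hedged sketch of what a sink body does under the new signature, reduced to counting rows and written as a free function so that none of the trait's other required methods have to be assumed:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Sketch of a `write_all` body under the new single-stream signature: drain the merged
/// input stream and report how many rows were "written". A real sink would instead hand
/// `data` to the shared demux/serialize/write helpers, as `CsvSink` does above.
async fn write_all_sketch(
    mut data: SendableRecordBatchStream,
    _context: &Arc<TaskContext>,
) -> Result<u64> {
    let mut row_count: u64 = 0;
    while let Some(batch) = data.next().await {
        row_count += batch?.num_rows() as u64;
    }
    Ok(row_count)
}
```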

[Diff for the remaining 11 changed files is not shown.]
