Skip to content

Commit

Permalink
Remove single_file_output option from FileSinkConfig and Copy statement (#9041)

Browse files Browse the repository at this point in the history

* initial: remove single file config in all related files

* judge according to collections

* feat: change test cases

* fix test cases

* test case and doc

* format doc using prettier

* regen proto files
  • Loading branch information
yyy1000 committed Jan 30, 2024
1 parent 228c88f commit 4d389c2
Show file tree
Hide file tree
Showing 34 changed files with 53 additions and 142 deletions.
2 changes: 0 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,6 @@ impl DataFrame {
self.plan,
path.into(),
FileType::CSV,
options.single_file_output,
copy_options,
)?
.build()?;
Expand All @@ -1100,7 +1099,6 @@ impl DataFrame {
self.plan,
path.into(),
FileType::JSON,
options.single_file_output,
copy_options,
)?
.build()?;
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl DataFrame {
self.plan,
path.into(),
FileType::PARQUET,
options.single_file_output,
copy_options,
)?
.build()?;
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ impl DataSink for ArrowFileSink {
part_col,
self.config.table_paths[0].clone(),
"arrow".into(),
self.config.single_file_output,
);

let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,6 @@ impl DataSink for ParquetSink {
part_col,
self.config.table_paths[0].clone(),
"parquet".into(),
self.config.single_file_output,
);

let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/write/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ pub(crate) fn start_demuxer_task(
partition_by: Option<Vec<(String, DataType)>>,
base_output_path: ListingTableUrl,
file_extension: String,
single_file_output: bool,
) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let context = context.clone();
let single_file_output = !base_output_path.is_collection();
let task: JoinHandle<std::result::Result<(), DataFusionError>> = match partition_by {
Some(parts) => {
// There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ pub(crate) async fn stateless_multipart_put(
.runtime_env()
.object_store(&config.object_store_url)?;

let single_file_output = config.single_file_output;
let base_output_path = &config.table_paths[0];
let part_cols = if !config.table_partition_cols.is_empty() {
Some(config.table_partition_cols.clone())
Expand All @@ -234,7 +233,6 @@ pub(crate) async fn stateless_multipart_put(
part_cols,
base_output_path.clone(),
file_extension,
single_file_output,
);

let rb_buffer_size = &context
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,6 @@ impl TableProvider for ListingTable {
file_groups,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
single_file_output: false,
overwrite,
file_type_writer_options,
};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,8 @@ mod tests {
ctx.runtime_env().register_object_store(&local_url, local);

// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
let out_dir_url = "file://local/out/";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
df.write_csv(out_dir_url, DataFrameWriteOptions::new(), None)
.await?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,8 @@ mod tests {
ctx.runtime_env().register_object_store(&local_url, local);

// execute a simple query and write the results to JSON
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
let out_dir_url = "file://local/out/";
let df = ctx.sql("SELECT a, b FROM test").await?;
df.write_json(out_dir_url, DataFrameWriteOptions::new())
.await?;
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ pub struct FileSinkConfig {
/// A vector of column names and their corresponding data types,
/// representing the partitioning columns for the file
pub table_partition_cols: Vec<(String, DataType)>,
/// If true, it is assumed there is a single table_path which is a file to which all data should be written
/// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
/// to which each output partition is written to its own output file.
pub single_file_output: bool,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
/// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2089,8 +2089,8 @@ mod tests {
ctx.runtime_env().register_object_store(&local_url, local);

// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let out_dir_url = "file://local/out";
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
let out_dir_url = "file://local/out/";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
df.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
.await?;
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ impl DefaultPhysicalPlanner {
input,
output_url,
file_format,
single_file_output,
copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
Expand All @@ -588,7 +587,6 @@ impl DefaultPhysicalPlanner {
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols: vec![],
single_file_output: *single_file_output,
overwrite: false,
file_type_writer_options
};
Expand Down
2 changes: 0 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,12 @@ impl LogicalPlanBuilder {
input: LogicalPlan,
output_url: String,
file_format: FileType,
single_file_output: bool,
copy_options: CopyOptions,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
file_format,
single_file_output,
copy_options,
})))
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ pub struct CopyTo {
pub output_url: String,
/// The file format to output (explicitly defined or inferred from file extension)
pub file_format: FileType,
/// If false, it is assumed output_url is a file to which all data should be written
/// regardless of input partitioning. Otherwise, output_url is assumed to be a directory
/// to which each output partition is written to its own output file
pub single_file_output: bool,
/// Arbitrary options as tuples
pub copy_options: CopyOptions,
}
Expand Down
6 changes: 2 additions & 4 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,11 @@ impl LogicalPlan {
output_url,
file_format,
copy_options,
single_file_output,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
file_format: file_format.clone(),
single_file_output: *single_file_output,

copy_options: copy_options.clone(),
})),
LogicalPlan::Values(Values { schema, .. }) => {
Expand Down Expand Up @@ -1551,7 +1550,6 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
single_file_output,
copy_options,
}) => {
let op_str = match copy_options {
Expand All @@ -1565,7 +1563,7 @@ impl LogicalPlan {
CopyOptions::WriterOptions(_) => "".into(),
};

write!(f, "CopyTo: format={file_format} output_url={output_url} single_file_output={single_file_output} options: ({op_str})")
write!(f, "CopyTo: format={file_format} output_url={output_url} options: ({op_str})")
}
LogicalPlan::Ddl(ddl) => {
write!(f, "{}", ddl.display())
Expand Down
2 changes: 0 additions & 2 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ message DistinctOnNode {
message CopyToNode {
LogicalPlanNode input = 1;
string output_url = 2;
bool single_file_output = 3;
oneof CopyOptions {
SQLOptions sql_options = 4;
FileTypeWriterOptions writer_options = 5;
Expand Down Expand Up @@ -1267,7 +1266,6 @@ message FileSinkConfig {
repeated string table_paths = 3;
Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
bool single_file_output = 7;
bool overwrite = 8;
FileTypeWriterOptions file_type_writer_options = 9;
}
Expand Down
36 changes: 0 additions & 36 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,6 @@ impl AsLogicalPlan for LogicalPlanNode {
input: Arc::new(input),
output_url: copy.output_url.clone(),
file_format: FileType::from_str(&copy.file_type)?,
single_file_output: copy.single_file_output,
copy_options,
},
))
Expand Down Expand Up @@ -1640,7 +1639,6 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::Copy(dml::CopyTo {
input,
output_url,
single_file_output,
file_format,
copy_options,
}) => {
Expand Down Expand Up @@ -1723,7 +1721,6 @@ impl AsLogicalPlan for LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
protobuf::CopyToNode {
input: Some(Box::new(input)),
single_file_output: *single_file_output,
output_url: output_url.to_string(),
file_type: file_format.to_string(),
copy_options: copy_options_proto,
Expand Down
Loading

0 comments on commit 4d389c2

Please sign in to comment.