diff --git a/.gitattributes b/.gitattributes index bcdeffc09a11..84b47a6fc56e 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,3 +1,4 @@ .github/ export-ignore +datafusion/core/tests/data/newlines_in_values.csv text eol=lf datafusion/proto/src/generated/prost.rs linguist-generated datafusion/proto/src/generated/pbjson.rs linguist-generated diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b46b002baac0..3cbe14cb558e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -184,6 +184,16 @@ config_namespace! { /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE` /// if not specified explicitly in the statement. pub has_header: bool, default = false + + /// Specifies whether newlines in (quoted) CSV values are supported. + /// + /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` + /// if not specified explicitly in the statement. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + pub newlines_in_values: bool, default = false } } @@ -1593,6 +1603,14 @@ config_namespace! { pub quote: u8, default = b'"' pub escape: Option, default = None pub double_quote: Option, default = None + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. + pub newlines_in_values: Option, default = None pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED pub schema_infer_max_rec: usize, default = 100 pub date_format: Option, default = None @@ -1665,6 +1683,18 @@ impl CsvOptions { self } + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. + pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self { + self.newlines_in_values = Some(newlines_in_values); + self + } + /// Set a `CompressionTypeVariant` of CSV /// - defaults to `CompressionTypeVariant::UNCOMPRESSED` pub fn with_file_compression_type( diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 5b648c5c9d1d..8a3cfa153606 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -246,6 +246,18 @@ impl CsvFormat { self } + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. + pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self { + self.options.newlines_in_values = Some(newlines_in_values); + self + } + /// Set a `FileCompressionType` of CSV /// - defaults to `FileCompressionType::UNCOMPRESSED` pub fn with_file_compression_type( @@ -343,6 +355,9 @@ impl FileFormat for CsvFormat { self.options.quote, self.options.escape, self.options.comment, + self.options + .newlines_in_values + .unwrap_or(state.config_options().catalog.newlines_in_values), self.options.compression.into(), ); Ok(Arc::new(exec)) @@ -1065,6 +1080,41 @@ mod tests { Ok(()) } + #[rstest(n_partitions, case(1), case(2), case(3), case(4))] + #[tokio::test] + async fn test_csv_parallel_newlines_in_values(n_partitions: usize) -> Result<()> { + let config = SessionConfig::new() + .with_repartition_file_scans(true) + .with_repartition_file_min_size(0) + .with_target_partitions(n_partitions); + let csv_options = CsvReadOptions::default() + .has_header(true) + .newlines_in_values(true); + let ctx = SessionContext::new_with_config(config); + let testdata = arrow_test_data(); + ctx.register_csv( + "aggr", + &format!("{testdata}/csv/aggregate_test_100.csv"), + csv_options, + ) + .await?; + + let query = "select sum(c3) from aggr;"; + let query_result = ctx.sql(query).await?.collect().await?; + let actual_partitions = count_query_csv_partitions(&ctx, query).await?; + + #[rustfmt::skip] + let expected = ["+--------------+", + "| sum(aggr.c3) |", + "+--------------+", + "| 781 |", + "+--------------+"]; + assert_batches_eq!(expected, &query_result); + assert_eq!(1, actual_partitions); // csv won't be scanned in parallel when newlines_in_values is set + + Ok(()) + } + /// Read a single empty csv file in parallel /// /// empty_0_byte.csv: diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index c6d143ed6749..552977baba17 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -63,6 +63,14 @@ pub struct CsvReadOptions<'a> { pub escape: Option, /// If enabled, lines beginning with this byte are ignored. pub comment: Option, + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. + pub newlines_in_values: bool, /// An optional schema representing the CSV files. If None, CSV reader will try to infer it /// based on data in file. pub schema: Option<&'a Schema>, @@ -95,6 +103,7 @@ impl<'a> CsvReadOptions<'a> { delimiter: b',', quote: b'"', escape: None, + newlines_in_values: false, file_extension: DEFAULT_CSV_EXTENSION, table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, @@ -133,6 +142,18 @@ impl<'a> CsvReadOptions<'a> { self } + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. + pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self { + self.newlines_in_values = newlines_in_values; + self + } + /// Specify the file extension for CSV file selection pub fn file_extension(mut self, file_extension: &'a str) -> Self { self.file_extension = file_extension; @@ -490,6 +511,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_delimiter(self.delimiter) .with_quote(self.quote) .with_escape(self.escape) + .with_newlines_in_values(self.newlines_in_values) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 327fbd976e87..fb0e23c6c164 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -59,6 +59,7 @@ pub struct CsvExec { quote: u8, escape: Option, comment: Option, + newlines_in_values: bool, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Compression type of the file associated with CsvExec @@ -68,6 +69,7 @@ pub struct CsvExec { impl CsvExec { /// Create a new CSV reader execution plan provided base and specific configurations + #[allow(clippy::too_many_arguments)] pub fn new( base_config: FileScanConfig, has_header: bool, @@ -75,6 +77,7 @@ impl CsvExec { quote: u8, escape: Option, comment: Option, + newlines_in_values: bool, file_compression_type: FileCompressionType, ) -> Self { let (projected_schema, projected_statistics, projected_output_ordering) = @@ -91,6 +94,7 @@ impl CsvExec { delimiter, quote, escape, + newlines_in_values, metrics: ExecutionPlanMetricsSet::new(), file_compression_type, cache, @@ -126,6 +130,17 @@ impl CsvExec { self.escape } + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. + pub fn newlines_in_values(&self) -> bool { + self.newlines_in_values + } + fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning { Partitioning::UnknownPartitioning(file_scan_config.file_groups.len()) } @@ -196,15 +211,15 @@ impl ExecutionPlan for CsvExec { /// Redistribute files across partitions according to their size /// See comments on [`FileGroupPartitioner`] for more detail. /// - /// Return `None` if can't get repartitioned(empty/compressed file). + /// Return `None` if can't get repartitioned (empty, compressed file, or `newlines_in_values` set). fn repartitioned( &self, target_partitions: usize, config: &ConfigOptions, ) -> Result>> { let repartition_file_min_size = config.optimizer.repartition_file_min_size; - // Parallel execution on compressed CSV file is not supported yet. - if self.file_compression_type.is_compressed() { + // Parallel execution on compressed CSV files or files that must support newlines in values is not supported yet. + if self.file_compression_type.is_compressed() || self.newlines_in_values { return Ok(None); } @@ -589,6 +604,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -658,6 +674,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -727,6 +744,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -793,6 +811,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(14, csv.base_config.file_schema.fields().len()); @@ -858,6 +877,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -953,6 +973,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index afed5dd37535..9791f23f963e 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1472,6 +1472,7 @@ pub(crate) mod tests { b'"', None, None, + false, FileCompressionType::UNCOMPRESSED, )) } @@ -1496,6 +1497,7 @@ pub(crate) mod tests { b'"', None, None, + false, FileCompressionType::UNCOMPRESSED, )) } @@ -3770,6 +3772,7 @@ pub(crate) mod tests { b'"', None, None, + false, compression_type, )), vec![("a".to_string(), "a".to_string())], diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 84f898431762..d0d0c985b8b6 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -186,6 +186,7 @@ fn try_swapping_with_csv( csv.quote(), csv.escape(), csv.comment(), + csv.newlines_in_values(), csv.file_compression_type, )) as _ }) @@ -1700,6 +1701,7 @@ mod tests { 0, None, None, + false, FileCompressionType::UNCOMPRESSED, )) } @@ -1723,6 +1725,7 @@ mod tests { 0, None, None, + false, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 013155b8400a..6565e3e7d0d2 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1503,6 +1503,7 @@ mod tests { b'"', None, None, + false, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index e8550a79cb0e..5cb1b6ea7017 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result for CsvOptions { quote: proto_opts.quote[0], escape: proto_opts.escape.first().copied(), double_quote: proto_opts.has_header.first().map(|h| *h != 0), + newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0), compression: proto_opts.compression().into(), schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize, date_format: (!proto_opts.date_format.is_empty()) diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index be3cc58b23df..4b34660ae2ef 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1884,6 +1884,9 @@ impl serde::Serialize for CsvOptions { if !self.double_quote.is_empty() { len += 1; } + if !self.newlines_in_values.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1936,6 +1939,10 @@ impl serde::Serialize for CsvOptions { #[allow(clippy::needless_borrow)] struct_ser.serialize_field("doubleQuote", pbjson::private::base64::encode(&self.double_quote).as_str())?; } + if !self.newlines_in_values.is_empty() { + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("newlinesInValues", pbjson::private::base64::encode(&self.newlines_in_values).as_str())?; + } struct_ser.end() } } @@ -1969,6 +1976,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "comment", "double_quote", "doubleQuote", + "newlines_in_values", + "newlinesInValues", ]; #[allow(clippy::enum_variant_names)] @@ -1987,6 +1996,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { NullValue, Comment, DoubleQuote, + NewlinesInValues, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -2022,6 +2032,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "nullValue" | "null_value" => Ok(GeneratedField::NullValue), "comment" => Ok(GeneratedField::Comment), "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), + "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -2055,6 +2066,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut null_value__ = None; let mut comment__ = None; let mut double_quote__ = None; + let mut newlines_in_values__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::HasHeader => { @@ -2155,6 +2167,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } + GeneratedField::NewlinesInValues => { + if newlines_in_values__.is_some() { + return Err(serde::de::Error::duplicate_field("newlinesInValues")); + } + newlines_in_values__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -2172,6 +2192,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { null_value: null_value__.unwrap_or_default(), comment: comment__.unwrap_or_default(), double_quote: double_quote__.unwrap_or_default(), + newlines_in_values: newlines_in_values__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index b0674ff28d75..9a2770997f15 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -633,6 +633,9 @@ pub struct CsvOptions { /// Indicates if quotes are doubled #[prost(bytes = "vec", tag = "14")] pub double_quote: ::prost::alloc::vec::Vec, + /// Indicates if newlines are supported in values + #[prost(bytes = "vec", tag = "15")] + pub newlines_in_values: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 705a479e0178..9dcb65444a47 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -900,6 +900,9 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { quote: vec![opts.quote], escape: opts.escape.map_or_else(Vec::new, |e| vec![e]), double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]), + newlines_in_values: opts + .newlines_in_values + .map_or_else(Vec::new, |h| vec![h as u8]), compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec as u64, date_format: opts.date_format.clone().unwrap_or_default(), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index dc551778c5fb..49d9f2dde67f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1007,6 +1007,7 @@ message CsvScanExecNode { oneof optional_comment { string comment = 6; } + bool newlines_in_values = 7; } message AvroScanExecNode { diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index b0674ff28d75..9a2770997f15 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -633,6 +633,9 @@ pub struct CsvOptions { /// Indicates if quotes are doubled #[prost(bytes = "vec", tag = "14")] pub double_quote: ::prost::alloc::vec::Vec, + /// Indicates if newlines are supported in values + #[prost(bytes = "vec", tag = "15")] + pub newlines_in_values: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 8f77c24bd911..25f6646d2a9a 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3605,6 +3605,9 @@ impl serde::Serialize for CsvScanExecNode { if !self.quote.is_empty() { len += 1; } + if self.newlines_in_values { + len += 1; + } if self.optional_escape.is_some() { len += 1; } @@ -3624,6 +3627,9 @@ impl serde::Serialize for CsvScanExecNode { if !self.quote.is_empty() { struct_ser.serialize_field("quote", &self.quote)?; } + if self.newlines_in_values { + struct_ser.serialize_field("newlinesInValues", &self.newlines_in_values)?; + } if let Some(v) = self.optional_escape.as_ref() { match v { csv_scan_exec_node::OptionalEscape::Escape(v) => { @@ -3654,6 +3660,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "hasHeader", "delimiter", "quote", + "newlines_in_values", + "newlinesInValues", "escape", "comment", ]; @@ -3664,6 +3672,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { HasHeader, Delimiter, Quote, + NewlinesInValues, Escape, Comment, } @@ -3691,6 +3700,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "hasHeader" | "has_header" => Ok(GeneratedField::HasHeader), "delimiter" => Ok(GeneratedField::Delimiter), "quote" => Ok(GeneratedField::Quote), + "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), "escape" => Ok(GeneratedField::Escape), "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -3716,6 +3726,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { let mut has_header__ = None; let mut delimiter__ = None; let mut quote__ = None; + let mut newlines_in_values__ = None; let mut optional_escape__ = None; let mut optional_comment__ = None; while let Some(k) = map_.next_key()? { @@ -3744,6 +3755,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { } quote__ = Some(map_.next_value()?); } + GeneratedField::NewlinesInValues => { + if newlines_in_values__.is_some() { + return Err(serde::de::Error::duplicate_field("newlinesInValues")); + } + newlines_in_values__ = Some(map_.next_value()?); + } GeneratedField::Escape => { if optional_escape__.is_some() { return Err(serde::de::Error::duplicate_field("escape")); @@ -3763,6 +3780,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { has_header: has_header__.unwrap_or_default(), delimiter: delimiter__.unwrap_or_default(), quote: quote__.unwrap_or_default(), + newlines_in_values: newlines_in_values__.unwrap_or_default(), optional_escape: optional_escape__, optional_comment: optional_comment__, }) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 605c56fa946a..ba288fe3d1b8 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1542,6 +1542,8 @@ pub struct CsvScanExecNode { pub delimiter: ::prost::alloc::string::String, #[prost(string, tag = "4")] pub quote: ::prost::alloc::string::String, + #[prost(bool, tag = "7")] + pub newlines_in_values: bool, #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")] pub optional_escape: ::core::option::Option, #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")] diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 1220f42ded83..9e17c19ecbc5 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -211,6 +211,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + scan.newlines_in_values, FileCompressionType::UNCOMPRESSED, ))), #[cfg(feature = "parquet")] @@ -1579,6 +1580,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + newlines_in_values: exec.newlines_in_values(), }, )), }); diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index ca3bebe79f27..f7f5aa54dd0d 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -293,3 +293,45 @@ id0 "value0" id1 "value1" id2 "value2" id3 "value3" + +# Handling of newlines in values + +statement ok +SET datafusion.optimizer.repartition_file_min_size = 1; + +statement ok +CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_unsafe ( +col1 TEXT, +col2 TEXT +) STORED AS CSV +LOCATION '../core/tests/data/newlines_in_values.csv'; + +statement error incorrect number of fields +select * from stored_table_with_newlines_in_values_unsafe; + +statement ok +CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_safe ( +col1 TEXT, +col2 TEXT +) STORED AS CSV +LOCATION '../core/tests/data/newlines_in_values.csv' +OPTIONS ('format.newlines_in_values' 'true'); + +query TT +select * from stored_table_with_newlines_in_values_safe; +---- +id message +1 +01)hello +02)world +2 +01)something +02)else +3 +01) +02)many +03)lines +04)make +05)good test +4 unquoted +value end diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index f7b755b01911..c8c0d1d45b97 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -168,6 +168,7 @@ datafusion.catalog.format NULL datafusion.catalog.has_header false datafusion.catalog.information_schema true datafusion.catalog.location NULL +datafusion.catalog.newlines_in_values false datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true @@ -252,6 +253,7 @@ datafusion.catalog.format NULL Type of `TableProvider` to use when loading `defa datafusion.catalog.has_header false Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. datafusion.catalog.information_schema true Should DataFusion provide access to `information_schema` virtual tables for displaying schema information datafusion.catalog.location NULL Location scanned to load tables for `default` schema +datafusion.catalog.newlines_in_values false Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 8d3ecbc98544..5e5de016e375 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -44,6 +44,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | | datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | | datafusion.catalog.has_header | false | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | +| datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | | datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | | datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | | datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files |