From 5ccb238d62e2e3e471718462714a1c61d97c72c3 Mon Sep 17 00:00:00 2001
From: nameexhaustion
Date: Tue, 10 Sep 2024 17:23:08 +1000
Subject: [PATCH] fix: Scanning hive partitioned files where hive columns are partially included in the file (#18626)

Co-authored-by: jinliu
---
 crates/polars-io/src/hive.rs                  | 71 ++++++++++++++-----
 .../src/plans/conversion/dsl_to_ir.rs         | 41 ++++++-----
 crates/polars-plan/src/plans/hive.rs          |  5 +-
 py-polars/tests/unit/io/test_hive.py          | 36 ++++++++++
 4 files changed, 112 insertions(+), 41 deletions(-)

diff --git a/crates/polars-io/src/hive.rs b/crates/polars-io/src/hive.rs
index b027e6d1d054..17ace26d6be7 100644
--- a/crates/polars-io/src/hive.rs
+++ b/crates/polars-io/src/hive.rs
@@ -5,6 +5,8 @@ use polars_core::series::Series;
 /// We have a special num_rows arg, as df can be empty when a projection contains
 /// only hive partition columns.
 ///
+/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`
+///
 /// # Safety
 ///
 /// num_rows equals the height of the df when the df height is non-zero.
@@ -15,27 +17,58 @@ pub(crate) fn materialize_hive_partitions(
     num_rows: usize,
 ) {
     if let Some(hive_columns) = hive_partition_columns {
-        let Some(first) = hive_columns.first() else {
+        // Insert these hive columns in the order they are stored in the file.
+        if hive_columns.is_empty() {
             return;
-        };
-
-        if reader_schema.index_of(first.name()).is_some() {
-            // Insert these hive columns in the order they are stored in the file.
-            for s in hive_columns {
-                let i = match df.get_columns().binary_search_by_key(
-                    &reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
-                    |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN),
-                ) {
-                    Ok(i) => i,
-                    Err(i) => i,
-                };
-
-                df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
-            }
-        } else {
-            for s in hive_columns {
-                unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
+        }
+
+        let hive_columns_iter = hive_columns.iter().map(|s| s.new_from_index(0, num_rows));
+
+        if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
+            // Fast-path - all hive columns are at the end
+            unsafe { df.get_columns_mut() }.extend(hive_columns_iter);
+            return;
+        }
+
+        let out_width: usize = df.width() + hive_columns.len();
+        let df_columns = df.get_columns();
+        let mut out_columns = Vec::with_capacity(out_width);
+
+        // We have a slightly involved algorithm here because `reader_schema` may contain extra
+        // columns that were excluded from a projection pushdown.
+
+        let hive_columns = hive_columns_iter.collect::<Vec<_>>();
+        // Safety: These are both non-empty at the start
+        let mut series_arr = [df_columns, hive_columns.as_slice()];
+        let mut schema_idx_arr = [
+            reader_schema.index_of(series_arr[0][0].name()).unwrap(),
+            reader_schema.index_of(series_arr[1][0].name()).unwrap(),
+        ];
+
+        loop {
+            let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] {
+                0
+            } else {
+                1
+            };
+
+            out_columns.push(series_arr[arg_min][0].clone());
+            series_arr[arg_min] = &series_arr[arg_min][1..];
+
+            if series_arr[arg_min].is_empty() {
+                break;
             }
+
+            let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else {
+                break;
+            };
+
+            schema_idx_arr[arg_min] = i;
         }
+
+        out_columns.extend_from_slice(series_arr[0]);
+        out_columns.extend_from_slice(series_arr[1]);
+
+        *unsafe { df.get_columns_mut() } = out_columns;
     }
 }
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index 658dc9989eb5..a908378e6f5c 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -1068,27 +1068,26 @@ pub(crate) fn maybe_init_projection_excluding_hive(
     // Update `with_columns` with a projection so that hive columns aren't loaded from the
     // file
     let hive_parts = hive_parts?;
     let hive_schema = hive_parts.schema();
-    let (first_hive_name, _) = hive_schema.get_at_index(0)?;
-
-    // TODO: Optimize this
-    let names = match reader_schema {
-        Either::Left(ref v) => v
-            .contains(first_hive_name.as_str())
-            .then(|| v.iter_names_cloned().collect::<Vec<_>>()),
-        Either::Right(ref v) => v
-            .contains(first_hive_name.as_str())
-            .then(|| v.iter_names_cloned().collect()),
-    };
-
-    let names = names?;
-
-    Some(
-        names
-            .into_iter()
-            .filter(|x| !hive_schema.contains(x))
-            .collect::<Arc<[_]>>(),
-    )
+    match &reader_schema {
+        Either::Left(reader_schema) => hive_schema
+            .iter_names()
+            .any(|x| reader_schema.contains(x))
+            .then(|| {
+                reader_schema
+                    .iter_names_cloned()
+                    .filter(|x| !hive_schema.contains(x))
+                    .collect::<Arc<[_]>>()
+            }),
+        Either::Right(reader_schema) => hive_schema
+            .iter_names()
+            .any(|x| reader_schema.contains(x))
+            .then(|| {
+                reader_schema
+                    .iter_names_cloned()
+                    .filter(|x| !hive_schema.contains(x))
+                    .collect::<Arc<[_]>>()
+            }),
+    }
 }
diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index 3fc7531ea2b3..a711aeb11848 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -57,6 +57,8 @@ impl HivePartitions {
     }
 }
 
+/// Note: Returned hive partitions are ordered by their position in the `reader_schema`
+///
 /// # Safety
 /// `hive_start_idx <= [min path length]`
 pub fn hive_partitions_from_paths(
@@ -198,10 +200,11 @@ pub fn hive_partitions_from_paths(
     }
 
     let mut hive_partitions = Vec::with_capacity(paths.len());
-    let buffers = buffers
+    let mut buffers = buffers
         .into_iter()
         .map(|x| x.into_series())
         .collect::<PolarsResult<Vec<_>>>()?;
+    buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));
 
     #[allow(clippy::needless_range_loop)]
     for i in 0..paths.len() {
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index ad285b82f3b3..a01a2ef6e59d 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -554,6 +554,42 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
     )
     assert_with_projections(lf, rhs)
 
+    # partial cols in file
+    partial_path = tmp_path / "a=1/b=2/partial_data.bin"
+    df = pl.DataFrame(
+        {"x": 1, "b": 2, "y": 1},
+        schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32},
+    )
+    write_func(df, partial_path)
+
+    rhs = rhs.select(
+        pl.col("x").cast(pl.Int32),
+        pl.col("b").cast(pl.Int16),
+        pl.col("y").cast(pl.Int32),
+        pl.col("a").cast(pl.Int64),
+    )
+
+    lf = scan_func(partial_path, hive_partitioning=True)  # type: ignore[call-arg]
+    assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs)
+    assert_with_projections(lf, rhs)
+
+    lf = scan_func(  # type: ignore[call-arg]
+        partial_path,
+        hive_schema={"a": pl.String, "b": pl.String},
+        hive_partitioning=True,
+    )
+    rhs = rhs.select(
+        pl.col("x").cast(pl.Int32),
+        pl.col("b").cast(pl.String),
+        pl.col("y").cast(pl.Int32),
+        pl.col("a").cast(pl.String),
+    )
+    assert_frame_equal(
+        lf.collect(projection_pushdown=projection_pushdown),
+        rhs,
+    )
+    assert_with_projections(lf, rhs)
+
 
 @pytest.mark.write_disk
 def test_hive_partition_dates(tmp_path: Path) -> None:
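
Note: a minimal end-to-end sketch of the scenario this patch fixes, mirroring the new test above. The directory path and column names are illustrative, and it assumes a polars build that includes this fix, with parquet support:

    from pathlib import Path

    import polars as pl

    # Hive-style layout: `a` and `b` are encoded in the path, but the file
    # itself also stores `b`, so the hive columns are only partially
    # included in the file.
    path = Path("/tmp/hive_demo") / "a=1" / "b=2" / "data.parquet"
    path.parent.mkdir(parents=True, exist_ok=True)

    pl.DataFrame(
        {"x": [1], "b": [2], "y": [1]},
        schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32},
    ).write_parquet(path)

    lf = pl.scan_parquet(path, hive_partitioning=True)

    # `b` keeps its position and dtype (Int16) from the file, while `a` is
    # materialized from the path and appended; before this fix, such scans
    # could place the hive columns incorrectly.
    print(lf.collect())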