Skip to content

Commit

Permalink
fix: Scanning hive partitioned files where hive columns are partially included in the file (#18626)
Browse files Browse the repository at this point in the history

Co-authored-by: jinliu <[email protected]>
  • Loading branch information
nameexhaustion and Veiasai committed Sep 10, 2024
1 parent 38b376c commit 5ccb238
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 41 deletions.
71 changes: 52 additions & 19 deletions crates/polars-io/src/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use polars_core::series::Series;
/// We have a special num_rows arg, as df can be empty when a projection contains
/// only hive partition columns.
///
/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`
///
/// # Safety
///
/// num_rows equals the height of the df when the df height is non-zero.
Expand All @@ -15,27 +17,58 @@ pub(crate) fn materialize_hive_partitions<D>(
num_rows: usize,
) {
if let Some(hive_columns) = hive_partition_columns {
let Some(first) = hive_columns.first() else {
// Insert these hive columns in the order they are stored in the file.
if hive_columns.is_empty() {
return;
};

if reader_schema.index_of(first.name()).is_some() {
// Insert these hive columns in the order they are stored in the file.
for s in hive_columns {
let i = match df.get_columns().binary_search_by_key(
&reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
|s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN),
) {
Ok(i) => i,
Err(i) => i,
};

df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
}
} else {
for s in hive_columns {
unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
}

let hive_columns_iter = hive_columns.iter().map(|s| s.new_from_index(0, num_rows));

if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
// Fast-path - all hive columns are at the end
unsafe { df.get_columns_mut() }.extend(hive_columns_iter);
return;
}

let out_width: usize = df.width() + hive_columns.len();
let df_columns = df.get_columns();
let mut out_columns = Vec::with_capacity(out_width);

// We have a slightly involved algorithm here because `reader_schema` may contain extra
// columns that were excluded from a projection pushdown.

let hive_columns = hive_columns_iter.collect::<Vec<_>>();
// Safety: These are both non-empty at the start
let mut series_arr = [df_columns, hive_columns.as_slice()];
let mut schema_idx_arr = [
reader_schema.index_of(series_arr[0][0].name()).unwrap(),
reader_schema.index_of(series_arr[1][0].name()).unwrap(),
];

loop {
let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] {
0
} else {
1
};

out_columns.push(series_arr[arg_min][0].clone());
series_arr[arg_min] = &series_arr[arg_min][1..];

if series_arr[arg_min].is_empty() {
break;
}

let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else {
break;
};

schema_idx_arr[arg_min] = i;
}

out_columns.extend_from_slice(series_arr[0]);
out_columns.extend_from_slice(series_arr[1]);

*unsafe { df.get_columns_mut() } = out_columns;
}
}
41 changes: 20 additions & 21 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,27 +1068,26 @@ pub(crate) fn maybe_init_projection_excluding_hive(
// Update `with_columns` with a projection so that hive columns aren't loaded from the
// file
let hive_parts = hive_parts?;

let hive_schema = hive_parts.schema();

let (first_hive_name, _) = hive_schema.get_at_index(0)?;

// TODO: Optimize this
let names = match reader_schema {
Either::Left(ref v) => v
.contains(first_hive_name.as_str())
.then(|| v.iter_names_cloned().collect::<Vec<_>>()),
Either::Right(ref v) => v
.contains(first_hive_name.as_str())
.then(|| v.iter_names_cloned().collect()),
};

let names = names?;

Some(
names
.into_iter()
.filter(|x| !hive_schema.contains(x))
.collect::<Arc<[_]>>(),
)
match &reader_schema {
Either::Left(reader_schema) => hive_schema
.iter_names()
.any(|x| reader_schema.contains(x))
.then(|| {
reader_schema
.iter_names_cloned()
.filter(|x| !hive_schema.contains(x))
.collect::<Arc<[_]>>()
}),
Either::Right(reader_schema) => hive_schema
.iter_names()
.any(|x| reader_schema.contains(x))
.then(|| {
reader_schema
.iter_names_cloned()
.filter(|x| !hive_schema.contains(x))
.collect::<Arc<[_]>>()
}),
}
}
5 changes: 4 additions & 1 deletion crates/polars-plan/src/plans/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ impl HivePartitions {
}
}

/// Note: Returned hive partitions are ordered by their position in the `reader_schema`
///
/// # Safety
/// `hive_start_idx <= [min path length]`
pub fn hive_partitions_from_paths(
Expand Down Expand Up @@ -198,10 +200,11 @@ pub fn hive_partitions_from_paths(
}

let mut hive_partitions = Vec::with_capacity(paths.len());
let buffers = buffers
let mut buffers = buffers
.into_iter()
.map(|x| x.into_series())
.collect::<PolarsResult<Vec<_>>>()?;
buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));

#[allow(clippy::needless_range_loop)]
for i in 0..paths.len() {
Expand Down
36 changes: 36 additions & 0 deletions py-polars/tests/unit/io/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,42 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
)
assert_with_projections(lf, rhs)

# partial cols in file
partial_path = tmp_path / "a=1/b=2/partial_data.bin"
df = pl.DataFrame(
{"x": 1, "b": 2, "y": 1},
schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32},
)
write_func(df, partial_path)

rhs = rhs.select(
pl.col("x").cast(pl.Int32),
pl.col("b").cast(pl.Int16),
pl.col("y").cast(pl.Int32),
pl.col("a").cast(pl.Int64),
)

lf = scan_func(partial_path, hive_partitioning=True) # type: ignore[call-arg]
assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs)
assert_with_projections(lf, rhs)

lf = scan_func( # type: ignore[call-arg]
partial_path,
hive_schema={"a": pl.String, "b": pl.String},
hive_partitioning=True,
)
rhs = rhs.select(
pl.col("x").cast(pl.Int32),
pl.col("b").cast(pl.String),
pl.col("y").cast(pl.Int32),
pl.col("a").cast(pl.String),
)
assert_frame_equal(
lf.collect(projection_pushdown=projection_pushdown),
rhs,
)
assert_with_projections(lf, rhs)


@pytest.mark.write_disk
def test_hive_partition_dates(tmp_path: Path) -> None:
Expand Down

0 comments on commit 5ccb238

Please sign in to comment.