From 06161e236de219f6a8016c8ea056ee85cf9f8bf6 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Thu, 5 Sep 2024 16:41:26 +0200 Subject: [PATCH] no more failing tests --- crates/polars-lazy/src/scan/ndjson.rs | 9 +- .../src/executors/scan/parquet.rs | 99 ++++++++++++---- .../src/parquet/encoding/uleb128.rs | 1 + .../src/plans/conversion/dsl_to_ir.rs | 10 +- crates/polars-plan/src/plans/ir/format.rs | 2 +- crates/polars-plan/src/plans/ir/mod.rs | 111 +----------------- py-polars/polars/io/csv/functions.py | 8 +- py-polars/polars/io/ndjson.py | 9 +- py-polars/polars/io/parquet/functions.py | 4 +- py-polars/tests/unit/io/test_parquet.py | 6 +- py-polars/tests/unit/io/test_scan.py | 28 +++-- 11 files changed, 122 insertions(+), 165 deletions(-) diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index 8d71d9a585a2b..6d0492e170dcd 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -128,10 +128,11 @@ impl LazyFileListReader for LazyJsonLineReader { row_index: self.row_index, rechunk: self.rechunk, file_counter: 0, - hive_options: { - let mut options = HiveOptions::default(); - options.enabled = Some(false); - options + hive_options: HiveOptions { + enabled: Some(false), + hive_start_idx: 0, + schema: None, + try_parse_dates: true, }, glob: true, include_file_paths: self.include_file_paths, diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 509ea7ba8c551..7de67aff42849 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -60,40 +60,94 @@ impl ParquetExec { let mut result = vec![]; let step = std::cmp::min(POOL.current_num_threads(), 128); - let slice_info = match self.file_options.slice { - None => ScanSourceSliceInfo { - item_slice: 0..usize::MAX, - source_slice: 0..self.sources.len(), - }, - Some(slice) => { - self.sources - .collect_slice_information(slice, |source| match source { - ScanSourceRef::File(path) => { - ParquetReader::new(std::fs::File::open(path)?).num_rows() - }, - ScanSourceRef::Buffer(buff) => { - ParquetReader::new(std::io::Cursor::new(buff)).num_rows() - }, - })? - }, + // Modified if we have a negative slice + let mut first_source = 0; + + // (offset, end) + let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice { + if slice.0 >= 0 { + (slice.0 as usize, slice.1.saturating_add(slice.0 as usize)) + } else { + // Walk the files in reverse until we find the first file, and then translate the + // slice into a positive-offset equivalent. 
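+                // E.g. (illustrative numbers): three sources of 10 rows each with
+                // slice (-25, 20) walk sources 2, 1, 0 and accumulate cum_rows =
+                // 10, 20, 30; cum_rows first reaches 25 at source 0, so first_source
+                // becomes 0 and the slice resolves to the positive offset
+                // 30 - 25 = 5 with length 20, i.e. global rows 5..25.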
+                let slice_start_as_n_from_end = -slice.0 as usize;
+                let mut cum_rows = 0;
+                let chunk_size = 8;
+                POOL.install(|| {
+                    for path_indexes in (0..self.sources.len())
+                        .rev()
+                        .collect::<Vec<_>>()
+                        .chunks(chunk_size)
+                    {
+                        let row_counts = path_indexes
+                            .into_par_iter()
+                            .map(|&i| {
+                                let memslice = match self.sources.at(i) {
+                                    ScanSourceRef::File(path) => {
+                                        let file = std::fs::File::open(path)?;
+                                        MemSlice::from_mmap(Arc::new(unsafe {
+                                            memmap::Mmap::map(&file).unwrap()
+                                        }))
+                                    },
+                                    ScanSourceRef::Buffer(buff) => {
+                                        MemSlice::from_bytes(buff.clone())
+                                    },
+                                };
+
+                                ParquetReader::new(std::io::Cursor::new(memslice)).num_rows()
+                            })
+                            .collect::<PolarsResult<Vec<_>>>()?;
+
+                        for (path_idx, rc) in path_indexes.iter().zip(row_counts) {
+                            cum_rows += rc;
+
+                            if cum_rows >= slice_start_as_n_from_end {
+                                first_source = *path_idx;
+                                break;
+                            }
+                        }
+
+                        if first_source > 0 {
+                            break;
+                        }
+                    }
+
+                    PolarsResult::Ok(())
+                })?;
+
+                let (start, len) = if slice_start_as_n_from_end > cum_rows {
+                    // We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
+                    // rows should only give the first 25 rows.
+                    let first_file_position = slice_start_as_n_from_end - cum_rows;
+                    (0, slice.1.saturating_sub(first_file_position))
+                } else {
+                    (cum_rows - slice_start_as_n_from_end, slice.1)
+                };
+
+                let end = start.saturating_add(len);
+
+                (start, end)
+            }
+        } else {
+            (0, usize::MAX)
         };
 
         let mut current_offset = 0;
         let base_row_index = self.file_options.row_index.take();
         // Limit no. of files at a time to prevent open file limits.
-        for i in slice_info.source_slice.step_by(step) {
+        for i in (first_source..self.sources.len()).step_by(step) {
             let end = std::cmp::min(i.saturating_add(step), self.sources.len());
             let hive_parts = self.hive_parts.as_ref().map(|x| &x[i..end]);
 
-            if current_offset >= slice_info.item_slice.end && !result.is_empty() {
+            if current_offset >= slice_end && !result.is_empty() {
                 return Ok(result);
             }
 
             // First initialize the readers, predicates and metadata.
             // This will be used to determine the slices. That way we can actually read all the
             // files in parallel even if we add row index columns or slices.
-            let iter = (0..self.sources.len()).into_par_iter().map(|i| {
+            let iter = (i..end).into_par_iter().map(|i| {
                 let source = self.sources.at(i);
                 let hive_partitions = hive_parts.map(|x| x[i].materialize_partition_columns());
@@ -141,12 +195,7 @@ impl ParquetExec {
                     let cum_rows = *current_offset_ref;
                     (
                         cum_rows,
-                        split_slice_at_file(
-                            current_offset_ref,
-                            *num_rows,
-                            slice_info.item_slice.start,
-                            slice_info.item_slice.end,
-                        ),
+                        split_slice_at_file(current_offset_ref, *num_rows, slice_offset, slice_end),
                     )
                 })
                 .collect::<Vec<_>>();
diff --git a/crates/polars-parquet/src/parquet/encoding/uleb128.rs b/crates/polars-parquet/src/parquet/encoding/uleb128.rs
index 08459233961ce..0740c9575a151 100644
--- a/crates/polars-parquet/src/parquet/encoding/uleb128.rs
+++ b/crates/polars-parquet/src/parquet/encoding/uleb128.rs
@@ -1,5 +1,6 @@
 // Reads an uleb128 encoded integer with at most 56 bits (8 bytes with 7 bits worth of payload each).
 /// Returns the integer and the number of bytes that made up this integer.
+///
 /// If the returned length is bigger than 8 this means the integer required more than 8 bytes and the remaining bytes need to be read sequentially and combined with the return value.
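+/// For example, the bytes `[0x96, 0x01]` decode to `(150, 2)`: `0x96` contributes its low
+/// 7 bits (`22`) and `0x01` contributes `1 << 7 = 128`.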
 ///
 /// # Safety
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index cce5139c1233e..8bf5c4b118e2b 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -192,7 +192,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut ConversionContext) -> PolarsResult<Node> {
             FileScan::Parquet { cloud_options, .. } => {
-                expand_scan_paths_with_hive_update(&paths, file_options, cloud_options)?
+                expand_scan_paths_with_hive_update(paths, file_options, cloud_options)?
             },
             #[cfg(feature = "ipc")]
             FileScan::Ipc { cloud_options, .. } => {
-                expand_scan_paths_with_hive_update(&paths, file_options, cloud_options)?
+                expand_scan_paths_with_hive_update(paths, file_options, cloud_options)?
             },
             #[cfg(feature = "csv")]
             FileScan::Csv { cloud_options, .. } => {
-                expand_paths(&paths, file_options.glob, cloud_options.as_ref())?
+                expand_paths(paths, file_options.glob, cloud_options.as_ref())?
             },
             #[cfg(feature = "json")]
             FileScan::NDJson { cloud_options, .. } => {
-                expand_paths(&paths, file_options.glob, cloud_options.as_ref())?
+                expand_paths(paths, file_options.glob, cloud_options.as_ref())?
             },
             FileScan::Anonymous { .. } => unreachable!(), // Invariant: Anonymous scans are already expanded.
         };
diff --git a/crates/polars-plan/src/plans/ir/format.rs b/crates/polars-plan/src/plans/ir/format.rs
index 9a0af169d5563..d5607e07f6c80 100644
--- a/crates/polars-plan/src/plans/ir/format.rs
+++ b/crates/polars-plan/src/plans/ir/format.rs
@@ -243,7 +243,7 @@ impl<'a> IRDisplay<'a> {
                 write_scan(
                     f,
                     scan_type.into(),
-                    &sources,
+                    sources,
                     indent,
                     n_columns,
                     file_info.schema.len(),
diff --git a/crates/polars-plan/src/plans/ir/mod.rs b/crates/polars-plan/src/plans/ir/mod.rs
index 95a7a5aaf3740..328efce28be94 100644
--- a/crates/polars-plan/src/plans/ir/mod.rs
+++ b/crates/polars-plan/src/plans/ir/mod.rs
@@ -13,7 +13,6 @@ pub use format::{ExprIRDisplay, IRDisplay};
 use hive::HivePartitions;
 use polars_core::error::feature_gated;
 use polars_core::prelude::*;
-use polars_core::POOL;
 use polars_io::file_cache::FileCacheEntry;
 use polars_utils::idx_vec::UnitVec;
 use polars_utils::mmap::MemSlice;
@@ -103,7 +102,7 @@ impl ScanSources {
     }
 
     pub fn as_paths(&self) -> &[PathBuf] {
         match self {
-            Self::Files(paths) => &paths,
+            Self::Files(paths) => paths,
             Self::Buffers(_) => unimplemented!(),
         }
     }
@@ -138,7 +137,7 @@ impl ScanSources {
     pub fn is_cloud_url(&self) -> bool {
         match self {
-            Self::Files(paths) => paths.first().map_or(false, |p| polars_io::is_cloud_url(p)),
+            Self::Files(paths) => paths.first().map_or(false, polars_io::is_cloud_url),
             Self::Buffers(_) => false,
         }
     }
@@ -171,114 +170,10 @@ impl ScanSources {
         }
     }
 
-    /// Normalize the slice and collect information as to what rows and parts of the source are
-    /// used in this slice.
-    pub fn collect_slice_information(
-        &self,
-        slice: (i64, usize),
-        map_to_num_rows: impl Fn(ScanSourceRef) -> PolarsResult<usize> + Send + Sync,
-    ) -> PolarsResult<ScanSourceSliceInfo> {
-        fn slice_to_start_end(
-            offset: i64,
-            length: usize,
-            num_rows: usize,
-        ) -> std::ops::Range<usize> {
-            if offset < 0 {
-                let slice_start_as_n_from_end = -offset as usize;
-                let (start, len) = if slice_start_as_n_from_end > num_rows {
-                    // We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
-                    // rows should only give the first 25 rows.
-                    let start_position = slice_start_as_n_from_end - num_rows;
-                    (0, length.saturating_sub(start_position))
-                } else {
-                    (num_rows - slice_start_as_n_from_end, length)
-                };
-
-                let end = start.saturating_add(len);
-
-                start..end
-            } else {
-                let offset = offset as usize;
-                offset.min(num_rows)..(offset + length).min(num_rows)
-            }
-        }
-
-        let (offset, length) = slice;
-
-        if self.is_empty() {
-            return Ok(ScanSourceSliceInfo {
-                item_slice: 0..0,
-                source_slice: 0..0,
-            });
-        }
-
-        if self.len() == 1 {
-            let num_rows = map_to_num_rows(self.get(0).unwrap())?;
-            let item_slice = slice_to_start_end(offset, length, num_rows);
-            let source_slice = if item_slice.is_empty() { 0..0 } else { 0..1 };
-
-            Ok(ScanSourceSliceInfo {
-                item_slice,
-                source_slice,
-            })
-        } else {
-            use rayon::prelude::*;
-
-            // Walk the files in reverse until we find the first file, and then translate the
-            // slice into a positive-offset equivalent.
-            const CHUNK_SIZE: usize = 8;
-            let mut row_counts = Vec::with_capacity(self.len());
-
-            POOL.install(|| {
-                for idx_end in (0..self.len()).step_by(CHUNK_SIZE) {
-                    let idx_start = idx_end.saturating_sub(CHUNK_SIZE);
-
-                    row_counts.extend(
-                        (idx_start..=idx_end)
-                            .into_par_iter()
-                            .map(|i| map_to_num_rows(self.at(i)))
-                            .collect::<PolarsResult<Vec<_>>>()?
-                            .into_iter()
-                            .rev(),
-                    );
-                }
-
-                PolarsResult::Ok(())
-            })?;
-
-            let num_rows = row_counts.iter().sum::<usize>();
-
-            let item_slice = slice_to_start_end(offset, length, num_rows);
-
-            let mut source_start = self.len() - 1;
-            let mut source_end = 0;
-
-            let mut sum = 0;
-            for (i, row_count) in row_counts.iter().rev().enumerate() {
-                if sum < item_slice.end {
-                    source_end = usize::max(source_end, i);
-                }
-
-                sum += row_count;
-
-                if sum >= item_slice.start {
-                    source_start = usize::min(source_start, i);
-                }
-            }
-
-            let source_slice = source_start..source_end + 1;
-
-            Ok(ScanSourceSliceInfo {
-                item_slice,
-                source_slice,
-            })
-        }
-    }
-
     pub fn get(&self, idx: usize) -> Option<ScanSourceRef> {
         match self {
             ScanSources::Files(paths) => paths.get(idx).map(|p| ScanSourceRef::File(p)),
-            ScanSources::Buffers(buffers) => buffers.get(idx).map(|b| ScanSourceRef::Buffer(b)),
+            ScanSources::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
         }
     }
 
diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py
index 77cd73e0aa5f1..257522831cd30 100644
--- a/py-polars/polars/io/csv/functions.py
+++ b/py-polars/polars/io/csv/functions.py
@@ -1232,9 +1232,11 @@ def with_column_names(cols: list[str]) -> list[str]:
 
     if isinstance(source, (str, Path)):
         source = normalize_filepath(source, check_not_directory=False)
-    elif isinstance(source, (IO, BytesIO)):
-        pass
-    elif isinstance(source, list) and isinstance(source[0], BytesIO):
+    elif (
+        isinstance(source, (IO, BytesIO))
+        or isinstance(source, list)
+        and isinstance(source[0], BytesIO)
+    ):
         pass
     else:
         source = [
diff --git a/py-polars/polars/io/ndjson.py b/py-polars/polars/io/ndjson.py
index 63032b5dc688f..166e990ba25df 100644
--- a/py-polars/polars/io/ndjson.py
+++ b/py-polars/polars/io/ndjson.py
@@ -166,7 +166,14 @@ def read_ndjson(
 
 @deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
 @deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
 def scan_ndjson(
-    source: str | Path | IO[str] | IO[bytes] | list[str] | list[Path] | list[IO[str]] | list[IO[bytes]],
+    source: str
+    | Path
+    | IO[str]
+    | IO[bytes]
+    | list[str]
+    | list[Path]
+    | list[IO[str]]
+    | list[IO[bytes]],
     *,
     schema: SchemaDefinition | None = None,
schema_overrides: SchemaDefinition | None = None, diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 583b8fddf3265..2eda346e7c264 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -422,7 +422,9 @@ def scan_parquet( if isinstance(source, (str, Path)): source = normalize_filepath(source, check_not_directory=False) - elif isinstance(source, io.BytesIO) or (isinstance(source, list) and isinstance(source[0], io.BytesIO)): + elif isinstance(source, io.BytesIO) or ( + isinstance(source, list) and isinstance(source[0], io.BytesIO) + ): pass else: source = [ diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 65b4460104fd6..be51ef1a99e61 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -12,7 +12,7 @@ import pyarrow.dataset as ds import pyarrow.parquet as pq import pytest -from hypothesis import HealthCheck, given, settings +from hypothesis import given from hypothesis import strategies as st import polars as pl @@ -1559,9 +1559,7 @@ def test_predicate_filtering( offset=st.integers(0, 100), length=st.integers(0, 100), ) -def test_slice_roundtrip( - df: pl.DataFrame, offset: int, length: int -) -> None: +def test_slice_roundtrip(df: pl.DataFrame, offset: int, length: int) -> None: offset %= df.height + 1 length %= df.height - offset + 1 diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index cb33344a1fce9..a254daaeaa12e 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -1,5 +1,6 @@ from __future__ import annotations +import io from dataclasses import dataclass from functools import partial from math import ceil @@ -9,7 +10,6 @@ import pytest import polars as pl -import io from polars.testing.asserts.frame import assert_frame_equal if TYPE_CHECKING: @@ -699,39 +699,41 @@ def test_async_path_expansion_bracket_17629(tmp_path: Path) -> None: ) def test_scan_in_memory(method: str) -> None: f = io.BytesIO() - df = pl.DataFrame({ - 'a': [1, 2, 3], - 'b': ['x', 'y', 'z'], - }) + df = pl.DataFrame( + { + "a": [1, 2, 3], + "b": ["x", "y", "z"], + } + ) - (getattr(df, f'write_{method}'))(f) + (getattr(df, f"write_{method}"))(f) f.seek(0) - result = (getattr(pl, f'scan_{method}'))(f).collect() + result = (getattr(pl, f"scan_{method}"))(f).collect() assert_frame_equal(df, result) f.seek(0) - result = (getattr(pl, f'scan_{method}'))(f).slice(1, 2).collect() + result = (getattr(pl, f"scan_{method}"))(f).slice(1, 2).collect() assert_frame_equal(df.slice(1, 2), result) f.seek(0) - result = (getattr(pl, f'scan_{method}'))(f).slice(-1, 1).collect() + result = (getattr(pl, f"scan_{method}"))(f).slice(-1, 1).collect() assert_frame_equal(df.slice(-1, 1), result) g = io.BytesIO() - (getattr(df, f'write_{method}'))(g) + (getattr(df, f"write_{method}"))(g) f.seek(0) g.seek(0) - result = (getattr(pl, f'scan_{method}'))([f, g]).collect() + result = (getattr(pl, f"scan_{method}"))([f, g]).collect() assert_frame_equal(df.vstack(df), result) f.seek(0) g.seek(0) - result = (getattr(pl, f'scan_{method}'))([f, g]).slice(1, 2).collect() + result = (getattr(pl, f"scan_{method}"))([f, g]).slice(1, 2).collect() assert_frame_equal(df.vstack(df).slice(1, 2), result) f.seek(0) g.seek(0) - result = (getattr(pl, f'scan_{method}'))([f, g]).slice(-1, 1).collect() + result = (getattr(pl, f"scan_{method}"))([f, g]).slice(-1, 1).collect() 
assert_frame_equal(df.vstack(df).slice(-1, 1), result)
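
For context, a minimal end-to-end sketch of the behavior the new ParquetExec path supports,
written against the in-memory scan API exercised in test_scan_in_memory above (illustrative
only, not part of this patch):

    import io
    import polars as pl
    from polars.testing import assert_frame_equal

    df = pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    # Two in-memory parquet "files".
    f, g = io.BytesIO(), io.BytesIO()
    df.write_parquet(f)
    df.write_parquet(g)
    f.seek(0)
    g.seek(0)

    # A negative slice over multiple sources: the executor now walks the sources
    # in reverse to count rows and translates (-2, 2) into a positive offset.
    result = pl.scan_parquet([f, g]).slice(-2, 2).collect()
    assert_frame_equal(df.vstack(df).slice(-2, 2), result)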