no more failing tests
coastalwhite committed Sep 5, 2024
1 parent 2aca59a commit 06161e2
Showing 11 changed files with 122 additions and 165 deletions.
9 changes: 5 additions & 4 deletions crates/polars-lazy/src/scan/ndjson.rs
@@ -128,10 +128,11 @@ impl LazyFileListReader for LazyJsonLineReader {
row_index: self.row_index,
rechunk: self.rechunk,
file_counter: 0,
hive_options: {
let mut options = HiveOptions::default();
options.enabled = Some(false);
options
hive_options: HiveOptions {
enabled: Some(false),
hive_start_idx: 0,
schema: None,
try_parse_dates: true,
},
glob: true,
include_file_paths: self.include_file_paths,
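On the ndjson change above: the reader previously built its `HiveOptions` by mutating a `Default` value and now uses an explicit struct literal, so every field is spelled out at the construction site (a later field addition becomes a compile error here instead of silently taking its default). A small self-contained sketch of the two styles, using a simplified stand-in for `HiveOptions` (the real struct lives in polars and carries a schema reference rather than a `String`):

```rust
// Simplified stand-in mirroring the fields shown in the diff; not the real polars type.
#[derive(Debug, Default)]
struct HiveOptions {
    enabled: Option<bool>,
    hive_start_idx: usize,
    schema: Option<String>, // placeholder; the real field holds a schema reference
    try_parse_dates: bool,
}

fn main() {
    // Old style (removed): start from Default and mutate one field.
    let before = {
        let mut options = HiveOptions::default();
        options.enabled = Some(false);
        options
    };

    // New style (added): an explicit struct literal, so adding a field to the
    // struct later forces this construction site to be revisited.
    let after = HiveOptions {
        enabled: Some(false),
        hive_start_idx: 0,
        schema: None,
        try_parse_dates: true,
    };

    println!("{before:?}\n{after:?}");
}
```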
99 changes: 74 additions & 25 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -60,40 +60,94 @@ impl ParquetExec {
let mut result = vec![];

let step = std::cmp::min(POOL.current_num_threads(), 128);
let slice_info = match self.file_options.slice {
None => ScanSourceSliceInfo {
item_slice: 0..usize::MAX,
source_slice: 0..self.sources.len(),
},
Some(slice) => {
self.sources
.collect_slice_information(slice, |source| match source {
ScanSourceRef::File(path) => {
ParquetReader::new(std::fs::File::open(path)?).num_rows()
},
ScanSourceRef::Buffer(buff) => {
ParquetReader::new(std::io::Cursor::new(buff)).num_rows()
},
})?
},
// Modified if we have a negative slice
let mut first_source = 0;

// (offset, end)
let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice {
if slice.0 >= 0 {
(slice.0 as usize, slice.1.saturating_add(slice.0 as usize))
} else {
// Walk the files in reverse until we find the first file, and then translate the
// slice into a positive-offset equivalent.
let slice_start_as_n_from_end = -slice.0 as usize;
let mut cum_rows = 0;
let chunk_size = 8;
POOL.install(|| {
for path_indexes in (0..self.sources.len())
.rev()
.collect::<Vec<_>>()
.chunks(chunk_size)
{
let row_counts = path_indexes
.into_par_iter()
.map(|&i| {
let memslice = match self.sources.at(i) {
ScanSourceRef::File(path) => {
let file = std::fs::File::open(path)?;
MemSlice::from_mmap(Arc::new(unsafe {
memmap::Mmap::map(&file).unwrap()
}))
},
ScanSourceRef::Buffer(buff) => {
MemSlice::from_bytes(buff.clone())
},
};

ParquetReader::new(std::io::Cursor::new(memslice)).num_rows()
})
.collect::<PolarsResult<Vec<_>>>()?;

for (path_idx, rc) in path_indexes.iter().zip(row_counts) {
cum_rows += rc;

if cum_rows >= slice_start_as_n_from_end {
first_source = *path_idx;
break;
}
}

if first_source > 0 {
break;
}
}

PolarsResult::Ok(())
})?;

let (start, len) = if slice_start_as_n_from_end > cum_rows {
// We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
// rows should only give the first 25 rows.
let first_file_position = slice_start_as_n_from_end - cum_rows;
(0, slice.1.saturating_sub(first_file_position))
} else {
(cum_rows - slice_start_as_n_from_end, slice.1)
};

let end = start.saturating_add(len);

(start, end)
}
} else {
(0, usize::MAX)
};

let mut current_offset = 0;
let base_row_index = self.file_options.row_index.take();
// Limit no. of files at a time to prevent open file limits.

for i in slice_info.source_slice.step_by(step) {
for i in (first_source..self.sources.len()).step_by(step) {
let end = std::cmp::min(i.saturating_add(step), self.sources.len());
let hive_parts = self.hive_parts.as_ref().map(|x| &x[i..end]);

if current_offset >= slice_info.item_slice.end && !result.is_empty() {
if current_offset >= slice_end && !result.is_empty() {
return Ok(result);
}

// First initialize the readers, predicates and metadata.
// This will be used to determine the slices. That way we can actually read all the
// files in parallel even if we add row index columns or slices.
let iter = (0..self.sources.len()).into_par_iter().map(|i| {
let iter = (i..end).into_par_iter().map(|i| {
let source = self.sources.at(i);
let hive_partitions = hive_parts.map(|x| x[i].materialize_partition_columns());

@@ -141,12 +195,7 @@ impl ParquetExec {
let cum_rows = *current_offset_ref;
(
cum_rows,
split_slice_at_file(
current_offset_ref,
*num_rows,
slice_info.item_slice.start,
slice_info.item_slice.end,
),
split_slice_at_file(current_offset_ref, *num_rows, slice_offset, slice_end),
)
})
.collect::<Vec<_>>();
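The ParquetExec change above inlines the negative-slice handling that previously lived in `ScanSources::collect_slice_information` (removed further down in crates/polars-plan/src/plans/ir/mod.rs): the files are walked in reverse, their row counts accumulated until they cover `-offset` rows from the end, and the slice is then rewritten as a positive `[start, end)` range relative to the first file that can contribute rows. A minimal, self-contained sketch of that arithmetic (a hypothetical `normalize_slice` helper, not a polars API; the real code additionally reads the row counts from parquet metadata in parallel and stops early once the first file is found):

```rust
/// Given per-source row counts and a slice (offset, len) where offset may be
/// negative (counted from the end of the concatenated sources), return
/// (first_source, start, end): the first source that can contain slice rows,
/// and the slice as a positive [start, end) measured from that source onward.
fn normalize_slice(row_counts: &[usize], slice: (i64, usize)) -> (usize, usize, usize) {
    let (offset, len) = slice;
    if offset >= 0 {
        // Positive offsets need no row counting at all.
        return (0, offset as usize, len.saturating_add(offset as usize));
    }

    // Walk the sources in reverse, accumulating rows until they cover
    // `-offset` rows from the end.
    let slice_start_as_n_from_end = -offset as usize;
    let mut first_source = 0;
    let mut cum_rows = 0;
    for (i, rc) in row_counts.iter().enumerate().rev() {
        cum_rows += *rc;
        if cum_rows >= slice_start_as_n_from_end {
            first_source = i;
            break;
        }
    }

    // Translate into a positive offset relative to `first_source`, trimming
    // when the slice starts before the data does (e.g. offset -100 on 50 rows).
    let (start, len) = if slice_start_as_n_from_end > cum_rows {
        let missing = slice_start_as_n_from_end - cum_rows;
        (0, len.saturating_sub(missing))
    } else {
        (cum_rows - slice_start_as_n_from_end, len)
    };

    (first_source, start, start.saturating_add(len))
}

fn main() {
    // Three files of 50 rows each; take 75 rows starting 100 rows from the end:
    // the slice begins at the first row of the middle file.
    assert_eq!(normalize_slice(&[50, 50, 50], (-100, 75)), (1, 0, 75));
    // A slice reaching past the start of the data gets trimmed, matching the
    // comment in the hunk: offset -100, len 75 on 50 rows keeps the first 25.
    assert_eq!(normalize_slice(&[50], (-100, 75)), (0, 0, 25));
    println!("ok");
}
```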
1 change: 1 addition & 0 deletions crates/polars-parquet/src/parquet/encoding/uleb128.rs
@@ -1,5 +1,6 @@
// Reads an uleb128 encoded integer with at most 56 bits (8 bytes with 7 bits worth of payload each).
/// Returns the integer and the number of bytes that made up this integer.
///
/// If the returned length is bigger than 8 this means the integer required more than 8 bytes and the remaining bytes need to be read sequentially and combined with the return value.
///
/// # Safety
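The uleb128 hunk above is documentation only, but it pins down the contract: each byte carries 7 payload bits, the high bit marks continuation, and this particular reader handles at most 8 bytes (56 payload bits), signalling that more bytes are needed by returning a length greater than 8. A standalone sketch of the encoding itself (a hypothetical `uleb128_decode`, not the polars-parquet implementation; it simply stops after 8 bytes instead of reporting an over-long length):

```rust
/// Decode an unsigned LEB128 integer from `bytes`, reading at most 8 bytes
/// (7 payload bits per byte, high bit = "more bytes follow").
/// Returns the decoded value and how many bytes were consumed.
fn uleb128_decode(bytes: &[u8]) -> (u64, usize) {
    let mut value: u64 = 0;
    let mut consumed = 0;
    for (i, &byte) in bytes.iter().take(8).enumerate() {
        value |= u64::from(byte & 0x7F) << (7 * i);
        consumed = i + 1;
        if byte & 0x80 == 0 {
            break;
        }
    }
    (value, consumed)
}

fn main() {
    // 300 = 0b1_0010_1100 encodes as [0xAC, 0x02].
    assert_eq!(uleb128_decode(&[0xAC, 0x02]), (300, 2));
    // Values below 128 fit in a single byte.
    assert_eq!(uleb128_decode(&[0x7F]), (127, 1));
    println!("ok");
}
```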
10 changes: 5 additions & 5 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -192,7 +192,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut ConversionContext) -> PolarsResult<No
let mut owned = None;

hive_partitions_from_paths(
&sources.as_paths(),
sources.as_paths(),
file_options.hive_options.hive_start_idx,
file_options.hive_options.schema.clone(),
match resolved_file_info.reader_schema.as_ref().unwrap() {
@@ -897,19 +897,19 @@ impl DslScanSources {
let expanded_sources = match &scan_type {
#[cfg(feature = "parquet")]
FileScan::Parquet { cloud_options, .. } => {
expand_scan_paths_with_hive_update(&paths, file_options, cloud_options)?
expand_scan_paths_with_hive_update(paths, file_options, cloud_options)?
},
#[cfg(feature = "ipc")]
FileScan::Ipc { cloud_options, .. } => {
expand_scan_paths_with_hive_update(&paths, file_options, cloud_options)?
expand_scan_paths_with_hive_update(paths, file_options, cloud_options)?
},
#[cfg(feature = "csv")]
FileScan::Csv { cloud_options, .. } => {
expand_paths(&paths, file_options.glob, cloud_options.as_ref())?
expand_paths(paths, file_options.glob, cloud_options.as_ref())?
},
#[cfg(feature = "json")]
FileScan::NDJson { cloud_options, .. } => {
expand_paths(&paths, file_options.glob, cloud_options.as_ref())?
expand_paths(paths, file_options.glob, cloud_options.as_ref())?
},
FileScan::Anonymous { .. } => unreachable!(), // Invariant: Anonymous scans are already expanded.
};
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/ir/format.rs
@@ -243,7 +243,7 @@ impl<'a> IRDisplay<'a> {
write_scan(
f,
scan_type.into(),
&sources,
sources,
indent,
n_columns,
file_info.schema.len(),
111 changes: 3 additions & 108 deletions crates/polars-plan/src/plans/ir/mod.rs
@@ -13,7 +13,6 @@ pub use format::{ExprIRDisplay, IRDisplay};
use hive::HivePartitions;
use polars_core::error::feature_gated;
use polars_core::prelude::*;
use polars_core::POOL;
use polars_io::file_cache::FileCacheEntry;
use polars_utils::idx_vec::UnitVec;
use polars_utils::mmap::MemSlice;
@@ -103,7 +102,7 @@ impl ScanSources {
}
pub fn as_paths(&self) -> &[PathBuf] {
match self {
Self::Files(paths) => &paths,
Self::Files(paths) => paths,
Self::Buffers(_) => unimplemented!(),
}
}
@@ -138,7 +137,7 @@ impl ScanSources {

pub fn is_cloud_url(&self) -> bool {
match self {
Self::Files(paths) => paths.first().map_or(false, |p| polars_io::is_cloud_url(p)),
Self::Files(paths) => paths.first().map_or(false, polars_io::is_cloud_url),
Self::Buffers(_) => false,
}
}
@@ -171,114 +170,10 @@ impl ScanSources {
}
}

/// Normalize the slice and collect information as to what rows and parts of the source are
/// used in this slice.
pub fn collect_slice_information(
&self,
slice: (i64, usize),
map_to_num_rows: impl Fn(ScanSourceRef) -> PolarsResult<usize> + Send + Sync,
) -> PolarsResult<ScanSourceSliceInfo> {
fn slice_to_start_end(
offset: i64,
length: usize,
num_rows: usize,
) -> std::ops::Range<usize> {
if offset < 0 {
let slice_start_as_n_from_end = -offset as usize;
let (start, len) = if slice_start_as_n_from_end > num_rows {
// We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
// rows should only give the first 25 rows.
let start_position = slice_start_as_n_from_end - num_rows;
(0, length.saturating_sub(start_position))
} else {
(num_rows - slice_start_as_n_from_end, length)
};

let end = start.saturating_add(len);

start..end
} else {
let offset = offset as usize;
offset.min(num_rows)..(offset + length).min(num_rows)
}
}

let (offset, length) = slice;

if self.is_empty() {
return Ok(ScanSourceSliceInfo {
item_slice: 0..0,
source_slice: 0..0,
});
}

if self.len() == 1 {
let num_rows = map_to_num_rows(self.get(0).unwrap())?;
let item_slice = slice_to_start_end(offset, length, num_rows);
let source_slice = if item_slice.is_empty() { 0..0 } else { 0..1 };

Ok(ScanSourceSliceInfo {
item_slice,
source_slice,
})
} else {
use rayon::prelude::*;

// Walk the files in reverse until we find the first file, and then translate the
// slice into a positive-offset equivalent.
const CHUNK_SIZE: usize = 8;
let mut row_counts = Vec::with_capacity(self.len());

POOL.install(|| {
for idx_end in (0..self.len()).step_by(CHUNK_SIZE) {
let idx_start = idx_end.saturating_sub(CHUNK_SIZE);

row_counts.extend(
(idx_start..=idx_end)
.into_par_iter()
.map(|i| map_to_num_rows(self.at(i)))
.collect::<PolarsResult<Vec<_>>>()?
.into_iter()
.rev(),
);
}

PolarsResult::Ok(())
})?;

let num_rows = row_counts.iter().sum::<usize>();

let item_slice = slice_to_start_end(offset, length, num_rows);

let mut source_start = self.len() - 1;
let mut source_end = 0;

let mut sum = 0;
for (i, row_count) in row_counts.iter().rev().enumerate() {
if sum < item_slice.end {
source_end = usize::max(source_end, i);
}

sum += row_count;

if sum >= item_slice.start {
source_start = usize::min(source_start, i);
}
}

let source_slice = source_start..source_end + 1;

Ok(ScanSourceSliceInfo {
item_slice,
source_slice,
})
}
}

pub fn get(&self, idx: usize) -> Option<ScanSourceRef> {
match self {
ScanSources::Files(paths) => paths.get(idx).map(|p| ScanSourceRef::File(p)),
ScanSources::Buffers(buffers) => buffers.get(idx).map(|b| ScanSourceRef::Buffer(b)),
ScanSources::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
}
}

8 changes: 5 additions & 3 deletions py-polars/polars/io/csv/functions.py
@@ -1232,9 +1232,11 @@ def with_column_names(cols: list[str]) -> list[str]:

if isinstance(source, (str, Path)):
source = normalize_filepath(source, check_not_directory=False)
elif isinstance(source, (IO, BytesIO)):
pass
elif isinstance(source, list) and isinstance(source[0], BytesIO):
elif (
isinstance(source, (IO, BytesIO))
or isinstance(source, list)
and isinstance(source[0], BytesIO)
):
pass
else:
source = [
9 changes: 8 additions & 1 deletion py-polars/polars/io/ndjson.py
@@ -166,7 +166,14 @@ def read_ndjson(
@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
def scan_ndjson(
source: str | Path | IO[str] | IO[bytes] | list[str] | list[Path] | list[IO[str]] | list[IO[bytes]],
source: str
| Path
| IO[str]
| IO[bytes]
| list[str]
| list[Path]
| list[IO[str]]
| list[IO[bytes]],
*,
schema: SchemaDefinition | None = None,
schema_overrides: SchemaDefinition | None = None,
4 changes: 3 additions & 1 deletion py-polars/polars/io/parquet/functions.py
@@ -422,7 +422,9 @@ def scan_parquet(

if isinstance(source, (str, Path)):
source = normalize_filepath(source, check_not_directory=False)
elif isinstance(source, io.BytesIO) or (isinstance(source, list) and isinstance(source[0], io.BytesIO)):
elif isinstance(source, io.BytesIO) or (
isinstance(source, list) and isinstance(source[0], io.BytesIO)
):
pass
else:
source = [
6 changes: 2 additions & 4 deletions py-polars/tests/unit/io/test_parquet.py
@@ -12,7 +12,7 @@
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest
from hypothesis import HealthCheck, given, settings
from hypothesis import given
from hypothesis import strategies as st

import polars as pl
@@ -1559,9 +1559,7 @@ def test_predicate_filtering(
offset=st.integers(0, 100),
length=st.integers(0, 100),
)
def test_slice_roundtrip(
df: pl.DataFrame, offset: int, length: int
) -> None:
def test_slice_roundtrip(df: pl.DataFrame, offset: int, length: int) -> None:
offset %= df.height + 1
length %= df.height - offset + 1
