feat: Parallel collecting parquet files statistics #7573 (#7595)
* feat: parallel collecting parquet files statistics #7573

* fix: cargo clippy format

* docs: add doc for execution.meta_fetch_concurrency

* feat: change the default value to 32 for execution.meta_fetch_concurrency
hengfeiyang committed Sep 21, 2023
1 parent bcdda39 commit c7347ce
Showing 5 changed files with 41 additions and 33 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
@@ -251,6 +251,9 @@ config_namespace! {
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
+
+        /// Number of files to read in parallel when inferring schema and statistics
+        pub meta_fetch_concurrency: usize, default = 32
    }
}
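The new option can be changed per session, either in SQL (`SET datafusion.execution.meta_fetch_concurrency = 64;`) or programmatically. A minimal Rust sketch, assuming the `SessionConfig::options_mut` accessor available in DataFusion releases of this period:

```rust
use datafusion::prelude::*;

fn main() {
    // Raise the metadata fetch concurrency above the default of 32
    // before registering any listing tables (the value 64 is illustrative).
    let mut config = SessionConfig::new();
    config.options_mut().execution.meta_fetch_concurrency = 64;
    let _ctx = SessionContext::with_config(config);
}
```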

5 changes: 1 addition & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -70,9 +70,6 @@ use crate::physical_plan::{
    Statistics,
};

-/// The number of files to read in parallel when inferring schema
-const SCHEMA_INFERENCE_CONCURRENCY: usize = 32;
-
/// The Apache Parquet `FileFormat` implementation
///
/// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -176,7 +173,7 @@ impl FileFormat for ParquetFormat {
        let schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint))
            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
-            .buffered(SCHEMA_INFERENCE_CONCURRENCY)
+            .buffered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;
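For reference, the hunk above relies on the bounded-concurrency combinators from the `futures` crate. A self-contained sketch of the same pattern, with a hypothetical `fetch_object` standing in for `fetch_schema`:

```rust
use futures::{StreamExt, TryStreamExt};

// Hypothetical stand-in for fetch_schema: any fallible async fetch works.
async fn fetch_object(id: u32) -> std::io::Result<String> {
    Ok(format!("metadata for object {id}"))
}

async fn fetch_all(ids: Vec<u32>, concurrency: usize) -> std::io::Result<Vec<String>> {
    futures::stream::iter(ids)
        .map(fetch_object)
        // Box the stream to erase its opaque type; this is the same
        // https://github.com/rust-lang/rust/issues/64552 workaround as above.
        .boxed()
        // At most `concurrency` fetches in flight; output preserves input order.
        .buffered(concurrency)
        // Collect into Vec<String>, short-circuiting on the first error.
        .try_collect()
        .await
}
```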

64 changes: 35 additions & 29 deletions datafusion/core/src/datasource/listing/table.rs
@@ -937,38 +937,44 @@ impl ListingTable {
        let file_list = stream::iter(file_list).flatten();

        // collect the statistics if required by the config
-        let files = file_list.then(|part_file| async {
-            let part_file = part_file?;
-            let mut statistics_result = Statistics::default();
-            if self.options.collect_stat {
-                let statistics_cache = self.collected_statistics.clone();
-                match statistics_cache.get_with_extra(
-                    &part_file.object_meta.location,
-                    &part_file.object_meta,
-                ) {
-                    Some(statistics) => statistics_result = statistics.as_ref().clone(),
-                    None => {
-                        let statistics = self
-                            .options
-                            .format
-                            .infer_stats(
-                                ctx,
-                                &store,
-                                self.file_schema.clone(),
-                                &part_file.object_meta,
-                            )
-                            .await?;
-                        statistics_cache.put_with_extra(
-                            &part_file.object_meta.location,
-                            statistics.clone().into(),
-                            &part_file.object_meta,
-                        );
-                        statistics_result = statistics;
-                    }
-                }
-            }
-            Ok((part_file, statistics_result)) as Result<(PartitionedFile, Statistics)>
-        });
+        let files = file_list
+            .map(|part_file| async {
+                let part_file = part_file?;
+                let mut statistics_result = Statistics::default();
+                if self.options.collect_stat {
+                    let statistics_cache = self.collected_statistics.clone();
+                    match statistics_cache.get_with_extra(
+                        &part_file.object_meta.location,
+                        &part_file.object_meta,
+                    ) {
+                        Some(statistics) => {
+                            statistics_result = statistics.as_ref().clone()
+                        }
+                        None => {
+                            let statistics = self
+                                .options
+                                .format
+                                .infer_stats(
+                                    ctx,
+                                    &store,
+                                    self.file_schema.clone(),
+                                    &part_file.object_meta,
+                                )
+                                .await?;
+                            statistics_cache.put_with_extra(
+                                &part_file.object_meta.location,
+                                statistics.clone().into(),
+                                &part_file.object_meta,
+                            );
+                            statistics_result = statistics;
+                        }
+                    }
+                }
+                Ok((part_file, statistics_result))
+                    as Result<(PartitionedFile, Statistics)>
+            })
+            .boxed()
+            .buffered(ctx.config_options().execution.meta_fetch_concurrency);

        let (files, statistics) =
            get_statistics_with_limit(files, self.schema(), limit).await?;
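The substantive change in this hunk is the switch from `then` to `map` plus `buffered`: `StreamExt::then` awaits each statistics future before polling the next file, so collection was effectively serial, while `buffered(n)` keeps up to `n` futures in flight and still yields results in input order. A toy illustration of the difference (not DataFusion code; assumes the `futures` crate and `tokio` with its timer and macro features):

```rust
use futures::StreamExt;
use std::time::{Duration, Instant};

// Simulates a slow metadata fetch.
async fn slow(i: u32) -> u32 {
    tokio::time::sleep(Duration::from_millis(100)).await;
    i
}

#[tokio::main]
async fn main() {
    // `then` drives one future at a time: roughly 8 x 100ms.
    let start = Instant::now();
    let _seq: Vec<u32> = futures::stream::iter(0..8).then(slow).collect().await;
    println!("then:     {:?}", start.elapsed());

    // `map` + `buffered(32)` overlaps all eight sleeps: roughly 100ms total.
    let start = Instant::now();
    let _par: Vec<u32> = futures::stream::iter(0..8)
        .map(slow)
        .buffered(32)
        .collect()
        .await;
    println!("buffered: {:?}", start.elapsed());
}
```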
1 change: 1 addition & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -150,6 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
+datafusion.execution.meta_fetch_concurrency 32
datafusion.execution.parquet.allow_single_file_parallelism false
datafusion.execution.parquet.bloom_filter_enabled false
datafusion.execution.parquet.bloom_filter_fpp NULL
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -76,6 +76,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
+| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
