Merge remote-tracking branch 'apache/main' into mvhsh2common
alamb committed Sep 29, 2023
2 parents 8d5aa69 + de15917 commit 502e5f2
Showing 19 changed files with 867 additions and 223 deletions.
77 changes: 20 additions & 57 deletions .github/workflows/rust.yml
@@ -47,18 +47,24 @@ jobs:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Cache Cargo
uses: actions/cache@v3
with:
# these represent dependencies downloaded by cargo
# and thus do not depend on the OS, arch nor rust version.
path: /github/home/.cargo
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable

- name: Cache Cargo
uses: actions/cache@v3
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
./target/
./datafusion-cli/target/
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-benchmark-${{ hashFiles('datafusion/**/Cargo.toml', 'benchmarks/Cargo.toml', 'datafusion-cli/Cargo.toml') }}

- name: Check workspace without default features
run: cargo check --no-default-features -p datafusion

@@ -84,12 +90,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -109,12 +109,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -211,12 +205,6 @@ jobs:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -241,9 +229,14 @@ jobs:
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
./target/
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
key: cargo-cache-benchmark-${{ hashFiles('datafusion/**/Cargo.toml', 'benchmarks/Cargo.toml') }}
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -377,12 +370,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- uses: actions/setup-python@v4
with:
python-version: "3.8"
@@ -480,12 +467,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -506,12 +487,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -531,12 +506,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -563,12 +532,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
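The net effect of this hunk, together with the deletions in the jobs below, is to consolidate Cargo caching: the old per-job `actions/cache` steps that cached `/github/home/.cargo` under a static `cargo-cache-` key are removed, and the jobs that still cache now list the cargo and target directories explicitly with a `hashFiles`-based key, so the cache is invalidated whenever dependencies change. Assembled from the hunk above — a sketch of the new step, not the verbatim workflow file:

```yaml
# Sketch of the new cache step, assembled from the diff hunk above.
# Keying on the Cargo.toml files invalidates the cache when dependencies
# change; the old static `cargo-cache-` key never did.
- name: Cache Cargo
  uses: actions/cache@v3
  with:
    path: |
      ~/.cargo/bin/
      ~/.cargo/registry/index/
      ~/.cargo/registry/cache/
      ~/.cargo/git/db/
      ./target/
      ./datafusion-cli/target/
    # this key equals the ones on `linux-build-lib` for re-use
    key: cargo-cache-benchmark-${{ hashFiles('datafusion/**/Cargo.toml', 'benchmarks/Cargo.toml', 'datafusion-cli/Cargo.toml') }}
```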
60 changes: 35 additions & 25 deletions datafusion/core/src/datasource/listing/table.rs
@@ -20,16 +20,8 @@
use std::str::FromStr;
use std::{any::Any, sync::Arc};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion_common::FileTypeWriterOptions;
use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema};
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use futures::{future, stream, StreamExt, TryStreamExt};
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use super::PartitionedFile;

use crate::datasource::file_format::file_compression_type::{
FileCompressionType, FileTypeExt,
@@ -54,13 +46,21 @@ use crate::{
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};
use datafusion_common::FileType;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use datafusion_common::{
internal_err, plan_err, project_schema, FileType, FileTypeWriterOptions, SchemaExt,
ToDFSchema,
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};

use super::PartitionedFile;

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};

/// Configuration for creating a [`ListingTable`]
#[derive(Debug, Clone)]
@@ -996,6 +996,9 @@ impl ListingTable {

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs::File;

use super::*;
use crate::datasource::{provider_as_source, MemTable};
use crate::execution::options::ArrowReadOptions;
@@ -1010,14 +1013,13 @@ mod tests {
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
};

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::assert_contains;
use datafusion_common::GetExt;
use datafusion_expr::LogicalPlanBuilder;
use datafusion_common::{assert_contains, GetExt, ScalarValue};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};

use rstest::*;
use std::collections::HashMap;
use std::fs::File;
use tempfile::TempDir;

/// It creates a dummy file and checks if it can create unbounded input executors.
@@ -2048,6 +2050,7 @@
}
None => SessionContext::new(),
};
let target_partition_number = session_ctx.state().config().target_partitions();

// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new(
@@ -2056,6 +2059,12 @@
false,
)]));

let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
Box::new(Expr::Column("column1".into())),
Operator::GtEq,
Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
));

// Create a new batch of data to insert into the table
let batch = RecordBatch::try_new(
schema.clone(),
@@ -2136,8 +2145,10 @@
let source = provider_as_source(source_table);
// Create a table scan logical plan to read from the source table
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
.repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))?
.filter(filter_predicate)?
.build()?;
// Since logical plan contains a filter, increasing parallelism is helpful.
// Therefore, we will have 8 partitions in the final plan.
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
@@ -2146,7 +2157,6 @@
.state()
.create_physical_plan(&insert_into_table)
.await?;

// Execute the physical plan and collect the results
let res = collect(plan, session_ctx.task_ctx()).await?;
// Insert returns the number of rows written, in our case this would be 6.
@@ -2178,9 +2188,9 @@
// Assert that the batches read from the file match the expected result.
assert_batches_eq!(expected, &batches);

// Assert that 6 files were added to the table
// Assert that `target_partition_number` many files were added to the table.
let num_files = tmp_dir.path().read_dir()?.count();
assert_eq!(num_files, 6);
assert_eq!(num_files, target_partition_number);

// Create a physical plan from the insert plan
let plan = session_ctx
@@ -2221,9 +2231,9 @@
// Assert that the batches read from the file after the second append match the expected result.
assert_batches_eq!(expected, &batches);

// Assert that another 6 files were added to the table
// Assert that another `target_partition_number` many files were added to the table.
let num_files = tmp_dir.path().read_dir()?.count();
assert_eq!(num_files, 12);
assert_eq!(num_files, 2 * target_partition_number);

// Return Ok if the function completed successfully
Ok(())
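For reference, a minimal sketch of the predicate the updated test now builds — `column1 >= 0` expressed as a raw `Expr::BinaryExpr`, mirroring the hunk above; crate paths are taken from the diff and assumed to match this commit:

```rust
// Sketch of the test's filter predicate (`column1 >= 0`), mirroring the
// diff above; crate versions are assumed to match commit 502e5f2.
use datafusion_common::ScalarValue;
use datafusion_expr::{BinaryExpr, Expr, Operator};

fn filter_predicate() -> Expr {
    Expr::BinaryExpr(BinaryExpr::new(
        Box::new(Expr::Column("column1".into())),
        Operator::GtEq,
        Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
    ))
}
```

The same expression can usually be written more compactly with the `col`/`lit` helpers from `datafusion_expr` (`col("column1").gt_eq(lit(0i32))`); spelling it out as above pins the exact `Operator` and `ScalarValue` types the test exercises.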
(Diff for the remaining 17 changed files not shown.)
