Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into avro-struct-union
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 29, 2023
2 parents 8ecb471 + de15917 commit 48a6b04
Show file tree
Hide file tree
Showing 52 changed files with 4,557 additions and 3,679 deletions.
77 changes: 20 additions & 57 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,24 @@ jobs:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Cache Cargo
uses: actions/cache@v3
with:
# these represent dependencies downloaded by cargo
# and thus do not depend on the OS, arch nor rust version.
path: /github/home/.cargo
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable

- name: Cache Cargo
uses: actions/cache@v3
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
./target/
./datafusion-cli/target/
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-benchmark-${{ hashFiles('datafusion/**/Cargo.toml', 'benchmarks/Cargo.toml', 'datafusion-cli/Cargo.toml') }}

- name: Check workspace without default features
run: cargo check --no-default-features -p datafusion

Expand All @@ -84,12 +90,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand All @@ -109,12 +109,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand Down Expand Up @@ -211,12 +205,6 @@ jobs:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand All @@ -241,9 +229,14 @@ jobs:
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
./target/
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
key: cargo-cache-benchmark-${{ hashFiles('datafusion/**/Cargo.toml', 'benchmarks/Cargo.toml') }}
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand Down Expand Up @@ -377,12 +370,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- uses: actions/setup-python@v4
with:
python-version: "3.8"
Expand Down Expand Up @@ -480,12 +467,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand All @@ -506,12 +487,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand All @@ -531,12 +506,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand All @@ -563,12 +532,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ default = ["parquet"]
pyarrow = ["pyo3", "arrow/pyarrow"]

[dependencies]
apache-avro = { version = "0.15", default-features = false, features = ["snappy"], optional = true }
apache-avro = { version = "0.16", default-features = false, features = ["snappy"], optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
chrono = { workspace = true }
Expand Down
104 changes: 104 additions & 0 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use std::borrow::{Borrow, Cow};
use std::cmp::Ordering;
use std::collections::HashSet;
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -429,6 +430,64 @@ pub mod datafusion_strsim {
}
}

/// Merges collections `first` and `second`, removes duplicates and sorts the
/// result, returning it as a [`Vec`].
pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
first: impl IntoIterator<Item = T>,
second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
let mut result: Vec<_> = first
.into_iter()
.map(|e| *e.borrow())
.chain(second.into_iter().map(|e| *e.borrow()))
.collect::<HashSet<_>>()
.into_iter()
.collect();
result.sort();
result
}

/// Calculates the set difference between sequences `first` and `second`,
/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
first: impl IntoIterator<Item = T>,
second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
first
.into_iter()
.map(|e| *e.borrow())
.filter(|e| !set.contains(e))
.collect()
}

/// Checks whether the given index sequence is monotonically non-decreasing.
pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) -> bool {
// TODO: Remove this function when `is_sorted` graduates from Rust nightly.
let mut previous = 0;
for item in sequence.into_iter() {
let current = *item.borrow();
if current < previous {
return false;
}
previous = current;
}
true
}

/// Find indices of each element in `targets` inside `items`. If one of the
/// elements is absent in `items`, returns an error.
pub fn find_indices<T: PartialEq, S: Borrow<T>>(
items: &[T],
targets: impl IntoIterator<Item = S>,
) -> Result<Vec<usize>> {
targets
.into_iter()
.map(|target| items.iter().position(|e| target.borrow().eq(e)))
.collect::<Option<_>>()
.ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
}

#[cfg(test)]
mod tests {
use crate::ScalarValue;
Expand Down Expand Up @@ -747,4 +806,49 @@ mod tests {
"cloned `Arc` should point to same data as the original"
);
}

#[test]
fn test_merge_and_order_indices() {
assert_eq!(
merge_and_order_indices([0, 3, 4], [1, 3, 5]),
vec![0, 1, 3, 4, 5]
);
// Result should be ordered, even if inputs are not
assert_eq!(
merge_and_order_indices([3, 0, 4], [5, 1, 3]),
vec![0, 1, 3, 4, 5]
);
}

#[test]
fn test_set_difference() {
assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
// return value should have same ordering with the in1
assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
}

#[test]
fn test_is_sorted() {
assert!(is_sorted::<usize>([]));
assert!(is_sorted([0]));
assert!(is_sorted([0, 3, 4]));
assert!(is_sorted([0, 1, 2]));
assert!(is_sorted([0, 1, 4]));
assert!(is_sorted([0usize; 0]));
assert!(is_sorted([1, 2]));
assert!(!is_sorted([3, 2]));
}

#[test]
fn test_find_indices() -> Result<()> {
assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
Ok(())
}
}
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusi

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.15", optional = true }
apache-avro = { version = "0.16", optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,15 +1287,16 @@ impl TableProvider for DataFrameTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut expr = LogicalPlanBuilder::from(self.plan.clone());
if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// Add filter when given
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(filter) = filter {
expr = expr.filter(filter)?
}

if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// add a limit if given
if let Some(l) = limit {
expr = expr.limit(0, Some(l))?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use apache_avro::{
types::Value,
AvroResult, Error as AvroError, Reader as AvroReader,
};
use arrow::array::{BinaryArray, GenericListArray};
use arrow::array::{BinaryArray, FixedSizeBinaryArray, GenericListArray};
use arrow::datatypes::{Fields, SchemaRef};
use arrow::error::ArrowError::SchemaError;
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -734,6 +734,15 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.collect::<BinaryArray>(),
)
as ArrayRef,
DataType::FixedSizeBinary(ref size) => {
Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
rows.iter().map(|row| {
let maybe_value = self.field_lookup(field.name(), row);
maybe_value.and_then(|v| resolve_fixed(v, *size as usize))
}),
*size,
)?) as ArrayRef
}
DataType::List(ref list_field) => {
match list_field.data_type() {
DataType::Dictionary(ref key_ty, _) => {
Expand Down Expand Up @@ -899,6 +908,7 @@ fn resolve_string(v: &Value) -> ArrowResult<Option<String>> {
Value::Bytes(bytes) => String::from_utf8(bytes.to_vec())
.map_err(AvroError::ConvertToUtf8)
.map(Some),
Value::Enum(_, s) => Ok(Some(s.clone())),
Value::Null => Ok(None),
other => Err(AvroError::GetString(other.into())),
}
Expand Down Expand Up @@ -941,6 +951,20 @@ fn resolve_bytes(v: &Value) -> Option<Vec<u8>> {
})
}

fn resolve_fixed(v: &Value, size: usize) -> Option<Vec<u8>> {
let v = if let Value::Union(_, b) = v { b } else { v };
match v {
Value::Fixed(n, bytes) => {
if *n == size {
Some(bytes.clone())
} else {
None
}
}
_ => None,
}
}

fn resolve_boolean(value: &Value) -> Option<bool> {
let v = if let Value::Union(_, b) = value {
b
Expand Down
Loading

0 comments on commit 48a6b04

Please sign in to comment.