Skip to content

Commit

Permalink
Merge branch 'up_fusion' into substr_index
Browse files Browse the repository at this point in the history
  • Loading branch information
Syleechan committed Nov 21, 2023
2 parents b296f19 + f310db3 commit f3abde6
Show file tree
Hide file tree
Showing 15 changed files with 1,625 additions and 493 deletions.
10 changes: 1 addition & 9 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,15 +388,7 @@ mod tests {
// Ensure that local files are also registered
let sql =
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();

if let DataFusionError::IoError(e) = err {
assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
} else {
return Err(err);
}
create_external_table_test(location, &sql).await.unwrap();

Ok(())
}
Expand Down
110 changes: 90 additions & 20 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ pub struct ListingTableUrl {
impl ListingTableUrl {
/// Parse a provided string as a `ListingTableUrl`
///
/// A URL can either refer to a single object, or a collection of objects with a
/// common prefix, with the presence of a trailing `/` indicating a collection.
///
/// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
/// `file:///foo/` refers to all the files under the directory `/foo` and its
/// subdirectories.
///
/// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
/// wherease `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
/// S3 bucket `BUCKET`
///
/// # URL Encoding
///
/// URL paths are expected to be URL-encoded. That is, the URL for a file named `bar%2Efoo`
Expand All @@ -58,29 +69,29 @@ impl ListingTableUrl {
/// # Paths without a Scheme
///
/// If no scheme is provided, or the string is an absolute filesystem path
/// as determined [`std::path::Path::is_absolute`], the string will be
/// as determined by [`std::path::Path::is_absolute`], the string will be
/// interpreted as a path on the local filesystem using the operating
/// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
///
/// If the path contains any of `'?', '*', '['`, it will be considered
/// a glob expression and resolved as described in the section below.
///
/// Otherwise, the path will be resolved to an absolute path, returning
/// an error if it does not exist, and converted to a [file URI]
/// Otherwise, the path will be resolved to an absolute path based on the current
/// working directory, and converted to a [file URI].
///
/// If you wish to specify a path that does not exist on the local
/// machine you must provide it as a fully-qualified [file URI]
/// e.g. `file:///myfile.txt`
/// If the path already exists in the local filesystem this will be used to determine if this
/// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
/// of a trailing path delimiter will be used to indicate a directory. For the avoidance
/// of ambiguity it is recommended users always include trailing `/` when intending to
/// refer to a directory.
///
/// ## Glob File Paths
///
/// If no scheme is provided, and the path contains a glob expression, it will
/// be resolved as follows.
///
/// The string up to the first path segment containing a glob expression will be extracted,
/// and resolved in the same manner as a normal scheme-less path. That is, resolved to
/// an absolute path on the local filesystem, returning an error if it does not exist,
/// and converted to a [file URI]
/// and resolved in the same manner as a normal scheme-less path above.
///
/// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
/// filter when listing files from object storage
Expand Down Expand Up @@ -130,7 +141,7 @@ impl ListingTableUrl {

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
let (path, glob) = match split_glob_expression(s) {
Some((prefix, glob)) => {
let glob = Pattern::new(glob)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Expand All @@ -139,15 +150,12 @@ impl ListingTableUrl {
None => (s, None),
};

let path = std::path::Path::new(prefix).canonicalize()?;
let url = if path.is_dir() {
Url::from_directory_path(path)
} else {
Url::from_file_path(path)
}
.map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?;
// TODO: Currently we do not have an IO-related error variant that accepts ()
// or a string. Once we have such a variant, change the error type above.
let url = url_from_filesystem_path(path).ok_or_else(|| {
DataFusionError::External(
format!("Failed to convert path to URL: {path}").into(),
)
})?;

Self::try_new(url, glob)
}

Expand All @@ -162,7 +170,10 @@ impl ListingTableUrl {
self.url.scheme()
}

/// Return the prefix from which to list files
/// Return the URL path not excluding any glob expression
///
/// If [`Self::is_collection`], this is the listing prefix
/// Otherwise, this is the path to the object
pub fn prefix(&self) -> &Path {
&self.prefix
}
Expand Down Expand Up @@ -249,6 +260,34 @@ impl ListingTableUrl {
}
}

/// Creates a file URL from a potentially relative filesystem path
fn url_from_filesystem_path(s: &str) -> Option<Url> {
let path = std::path::Path::new(s);
let is_dir = match path.exists() {
true => path.is_dir(),
// Fallback to inferring from trailing separator
false => std::path::is_separator(s.chars().last()?),
};

let from_absolute_path = |p| {
let first = match is_dir {
true => Url::from_directory_path(p).ok(),
false => Url::from_file_path(p).ok(),
}?;

// By default from_*_path preserve relative path segments
// We therefore parse the URL again to resolve these
Url::parse(first.as_str()).ok()
};

if path.is_absolute() {
return from_absolute_path(path);
}

let absolute = std::env::current_dir().ok()?.join(path);
from_absolute_path(&absolute)
}

impl AsRef<str> for ListingTableUrl {
fn as_ref(&self) -> &str {
self.url.as_ref()
Expand Down Expand Up @@ -349,6 +388,37 @@ mod tests {

let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

let url =
ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
assert_eq!(url.prefix.as_ref(), "baz/test.txt");

let workdir = std::env::current_dir().unwrap();
let t = workdir.join("non-existent");
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("non-existent").unwrap();
assert_eq!(a, b);
assert!(a.prefix.as_ref().ends_with("non-existent"));

let t = workdir.parent().unwrap();
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("..").unwrap();
assert_eq!(a, b);

let t = t.join("bar");
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("../bar").unwrap();
assert_eq!(a, b);
assert!(a.prefix.as_ref().ends_with("bar"));

let t = t.join(".").join("foo").join("..").join("baz");
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
assert_eq!(a, b);
assert!(a.prefix.as_ref().ends_with("bar/baz"));
}

#[test]
Expand Down
149 changes: 35 additions & 114 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1748,70 +1748,27 @@ pub fn array_ndims(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(result) as ArrayRef)
}

macro_rules! non_list_contains {
($ARRAY:expr, $SUB_ARRAY:expr, $ARRAY_TYPE:ident) => {{
let sub_array = downcast_arg!($SUB_ARRAY, $ARRAY_TYPE);
let mut boolean_builder = BooleanArray::builder($ARRAY.len());

for (arr, elem) in $ARRAY.iter().zip(sub_array.iter()) {
if let (Some(arr), Some(elem)) = (arr, elem) {
let arr = downcast_arg!(arr, $ARRAY_TYPE);
let res = arr.iter().dedup().flatten().any(|x| x == elem);
boolean_builder.append_value(res);
}
}
Ok(Arc::new(boolean_builder.finish()))
}};
}

/// Array_has SQL function
pub fn array_has(args: &[ArrayRef]) -> Result<ArrayRef> {
let array = as_list_array(&args[0])?;
let element = &args[1];

check_datatypes("array_has", &[array.values(), element])?;
match element.data_type() {
DataType::List(_) => {
let sub_array = as_list_array(element)?;
let mut boolean_builder = BooleanArray::builder(array.len());

for (arr, elem) in array.iter().zip(sub_array.iter()) {
if let (Some(arr), Some(elem)) = (arr, elem) {
let list_arr = as_list_array(&arr)?;
let res = list_arr.iter().dedup().flatten().any(|x| *x == *elem);
boolean_builder.append_value(res);
}
}
Ok(Arc::new(boolean_builder.finish()))
}
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
non_list_contains!(array, element, $ARRAY_TYPE)
};
}
call_array_function!(data_type, false)
}
}
}

macro_rules! array_has_any_non_list_check {
($ARRAY:expr, $SUB_ARRAY:expr, $ARRAY_TYPE:ident) => {{
let arr = downcast_arg!($ARRAY, $ARRAY_TYPE);
let sub_arr = downcast_arg!($SUB_ARRAY, $ARRAY_TYPE);
let mut boolean_builder = BooleanArray::builder(array.len());

let mut res = false;
for elem in sub_arr.iter().dedup() {
if let Some(elem) = elem {
res |= arr.iter().dedup().flatten().any(|x| x == elem);
} else {
return internal_err!(
"array_has_any does not support Null type for element in sub_array"
);
}
let converter = RowConverter::new(vec![SortField::new(array.value_type())])?;
let r_values = converter.convert_columns(&[element.clone()])?;
for (row_idx, arr) in array.iter().enumerate() {
if let Some(arr) = arr {
let arr_values = converter.convert_columns(&[arr])?;
let res = arr_values
.iter()
.dedup()
.any(|x| x == r_values.row(row_idx));
boolean_builder.append_value(res);
}
res
}};
}
Ok(Arc::new(boolean_builder.finish()))
}

/// Array_has_any SQL function
Expand All @@ -1820,55 +1777,27 @@ pub fn array_has_any(args: &[ArrayRef]) -> Result<ArrayRef> {

let array = as_list_array(&args[0])?;
let sub_array = as_list_array(&args[1])?;

let mut boolean_builder = BooleanArray::builder(array.len());

let converter = RowConverter::new(vec![SortField::new(array.value_type())])?;
for (arr, sub_arr) in array.iter().zip(sub_array.iter()) {
if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
let res = match arr.data_type() {
DataType::List(_) => {
let arr = downcast_arg!(arr, ListArray);
let sub_arr = downcast_arg!(sub_arr, ListArray);

let mut res = false;
for elem in sub_arr.iter().dedup().flatten() {
res |= arr.iter().dedup().flatten().any(|x| *x == *elem);
}
res
let arr_values = converter.convert_columns(&[arr])?;
let sub_arr_values = converter.convert_columns(&[sub_arr])?;

let mut res = false;
for elem in sub_arr_values.iter().dedup() {
res |= arr_values.iter().dedup().any(|x| x == elem);
if res {
break;
}
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
array_has_any_non_list_check!(arr, sub_arr, $ARRAY_TYPE)
};
}
call_array_function!(data_type, false)
}
};
}
boolean_builder.append_value(res);
}
}
Ok(Arc::new(boolean_builder.finish()))
}

macro_rules! array_has_all_non_list_check {
($ARRAY:expr, $SUB_ARRAY:expr, $ARRAY_TYPE:ident) => {{
let arr = downcast_arg!($ARRAY, $ARRAY_TYPE);
let sub_arr = downcast_arg!($SUB_ARRAY, $ARRAY_TYPE);

let mut res = true;
for elem in sub_arr.iter().dedup() {
if let Some(elem) = elem {
res &= arr.iter().dedup().flatten().any(|x| x == elem);
} else {
return internal_err!(
"array_has_all does not support Null type for element in sub_array"
);
}
}
res
}};
}

/// Array_has_all SQL function
pub fn array_has_all(args: &[ArrayRef]) -> Result<ArrayRef> {
check_datatypes("array_has_all", &[&args[0], &args[1]])?;
Expand All @@ -1877,28 +1806,20 @@ pub fn array_has_all(args: &[ArrayRef]) -> Result<ArrayRef> {
let sub_array = as_list_array(&args[1])?;

let mut boolean_builder = BooleanArray::builder(array.len());

let converter = RowConverter::new(vec![SortField::new(array.value_type())])?;
for (arr, sub_arr) in array.iter().zip(sub_array.iter()) {
if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
let res = match arr.data_type() {
DataType::List(_) => {
let arr = downcast_arg!(arr, ListArray);
let sub_arr = downcast_arg!(sub_arr, ListArray);

let mut res = true;
for elem in sub_arr.iter().dedup().flatten() {
res &= arr.iter().dedup().flatten().any(|x| *x == *elem);
}
res
}
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
array_has_all_non_list_check!(arr, sub_arr, $ARRAY_TYPE)
};
}
call_array_function!(data_type, false)
let arr_values = converter.convert_columns(&[arr])?;
let sub_arr_values = converter.convert_columns(&[sub_arr])?;

let mut res = true;
for elem in sub_arr_values.iter().dedup() {
res &= arr_values.iter().dedup().any(|x| x == elem);
if !res {
break;
}
};
}
boolean_builder.append_value(res);
}
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{any::Any, usize, vec};
use crate::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
calculate_join_output_ordering, get_final_indices_from_bit_map,
need_produce_result_in_final,
need_produce_result_in_final, JoinHashMap, JoinHashMapType,
};
use crate::DisplayAs;
use crate::{
Expand All @@ -35,7 +35,6 @@ use crate::{
expressions::Column,
expressions::PhysicalSortExpr,
hash_utils::create_hashes,
joins::hash_join_utils::{JoinHashMap, JoinHashMapType},
joins::utils::{
adjust_right_output_partitioning, build_join_schema, check_join_is_valid,
estimate_join_statistics, partitioned_join_output_partitioning,
Expand Down
Loading

0 comments on commit f3abde6

Please sign in to comment.