Commit 37c446a

Merge commit '4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273' into chunchun/update-df-apr-week-3

appletreeisyellow committed Apr 26, 2024
2 parents 94cd921 + 4ad4f90

Showing 49 changed files with 3,555 additions and 2,725 deletions.
21 changes: 21 additions & 0 deletions benchmarks/bench.sh
@@ -213,6 +213,12 @@ main() {
tpch_mem10)
run_tpch_mem "10"
;;
tpch_smj)
run_tpch_smj "1"
;;
tpch_smj10)
run_tpch_smj "10"
;;
parquet)
run_parquet
;;
@@ -320,6 +326,21 @@ run_tpch() {
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch benchmark with sort merge join
run_tpch_smj() {
SCALE_FACTOR=$1
if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi
TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch SMJ benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch in memory
run_tpch_mem() {
SCALE_FACTOR=$1
15 changes: 13 additions & 2 deletions benchmarks/src/tpch/run.rs
@@ -42,6 +42,9 @@ use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use log::info;
use structopt::StructOpt;

// hack to avoid `default_value is meaningless for bool` errors
type BoolDefaultTrue = bool;

/// Run the tpch benchmark.
///
/// This benchmarks is derived from the [TPC-H][1] version
@@ -81,6 +84,11 @@ pub struct RunOpt {
/// Whether to disable collection of statistics (and cost based optimizations) or not.
#[structopt(short = "S", long = "disable-statistics")]
disable_statistics: bool,

/// If true then hash join is used, if false then sort merge join.
/// True by default.
#[structopt(short = "j", long = "prefer_hash_join", default_value = "true")]
prefer_hash_join: BoolDefaultTrue,
}

const TPCH_QUERY_START_ID: usize = 1;
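
The `BoolDefaultTrue` alias above works around structopt's handling of `bool` fields: the derive macro keys off the spelled-out type, turns plain `bool` fields into value-less flags, and rejects `default_value` for them. A minimal self-contained sketch of the same pattern (not part of this commit; the option name simply mirrors the flag added here):

    use structopt::StructOpt;

    // Alias so the derive macro does not see a field typed literally as `bool`.
    type BoolDefaultTrue = bool;

    #[derive(Debug, StructOpt)]
    struct Opt {
        /// Hypothetical option mirroring `--prefer_hash_join`; parsed via
        /// bool's FromStr, so it accepts "true"/"false" and defaults to true.
        #[structopt(long = "prefer_hash_join", default_value = "true")]
        prefer_hash_join: BoolDefaultTrue,
    }

    fn main() {
        let opt = Opt::from_args();
        println!("prefer_hash_join = {}", opt.prefer_hash_join);
    }
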
@@ -107,10 +115,11 @@ impl RunOpt {
}

async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self
let mut config = self
.common
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
let ctx = SessionContext::new_with_config(config);

// register tables
@@ -304,7 +313,7 @@ mod tests {
use super::*;

use datafusion::common::exec_err;
use datafusion::error::{DataFusionError, Result};
use datafusion::error::Result;
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
physical_plan_to_bytes,
@@ -339,6 +348,7 @@ mod tests {
mem_table: false,
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
@@ -371,6 +381,7 @@ mod tests {
mem_table: false,
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
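
Taken together, the new `--prefer_hash_join` option flows from the CLI flag into the session's optimizer settings before any query runs. A minimal sketch of that plumbing outside the benchmark harness (the helper name is illustrative; the config calls are the ones used above):

    use datafusion::prelude::{SessionConfig, SessionContext};

    // Build a context that prefers sort merge join when `prefer_hash_join`
    // is false, mirroring RunOpt::benchmark_query above.
    fn make_ctx(prefer_hash_join: bool) -> SessionContext {
        let mut config = SessionConfig::new().with_collect_statistics(true);
        config.options_mut().optimizer.prefer_hash_join = prefer_hash_join;
        SessionContext::new_with_config(config)
    }

    fn main() {
        // Plans joins as sort merge join where applicable.
        let _smj_ctx = make_ctx(false);
    }
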
8 changes: 4 additions & 4 deletions datafusion-cli/README.md
@@ -23,20 +23,20 @@

[DataFusion](https://arrow.apache.org/datafusion/) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

The DataFusion CLI is a command line utility that runs SQL queries using the DataFusion engine.
DataFusion CLI (`datafusion-cli`) is a small command line utility that runs SQL queries using the DataFusion engine.

# Frequently Asked Questions

## Where can I find more information?

Answer: See the [`datafusion-cli` documentation](https://arrow.apache.org/datafusion/user-guide/cli.html) for further information.
See the [`datafusion-cli` documentation](https://arrow.apache.org/datafusion/user-guide/cli.html) for further information.

## How do I make my IDE work with `datafusion-cli`?

Answer: "open" the `datafusion/datafusion-cli` project as its own top level
"open" the `datafusion/datafusion-cli` project as its own top level
project in my IDE (rather than opening `datafusion`)

The reason `datafusion-cli` is not listed as part of the workspace in the main
The reason `datafusion-cli` is not part of the main workspace in
[`datafusion Cargo.toml`] file is that `datafusion-cli` is a binary and has a
checked in `Cargo.lock` file to ensure reproducible builds.

121 changes: 113 additions & 8 deletions datafusion-cli/src/object_storage.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use datafusion::common::config::{
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
};
use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
use datafusion::common::{config_err, exec_datafusion_err, exec_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
@@ -39,17 +39,26 @@ pub async fn get_s3_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
let AwsOptions {
access_key_id,
secret_access_key,
session_token,
region,
endpoint,
allow_http,
} = aws_options;

let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

if let (Some(access_key_id), Some(secret_access_key)) =
(&aws_options.access_key_id, &aws_options.secret_access_key)
(access_key_id, secret_access_key)
{
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);

if let Some(session_token) = &aws_options.session_token {
if let Some(session_token) = session_token {
builder = builder.with_token(session_token);
}
} else {
@@ -72,10 +81,30 @@ pub async fn get_s3_object_store_builder(
builder = builder.with_credentials(credentials);
}

if let Some(region) = &aws_options.region {
if let Some(region) = region {
builder = builder.with_region(region);
}

if let Some(endpoint) = endpoint {
// Make a nicer error if the user hasn't allowed http and the endpoint
// is http as the default message is "URL scheme is not allowed"
if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
return config_err!(
"Invalid endpoint: {endpoint}. \
HTTP is not allowed for S3 endpoints. \
To allow HTTP, set 'aws.allow_http' to true"
);
}
}

builder = builder.with_endpoint(endpoint);
}

if let Some(allow_http) = allow_http {
builder = builder.with_allow_http(*allow_http);
}

Ok(builder)
}
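
The endpoint handling above amounts to a small standalone rule: an `http://` endpoint is only accepted when `allow_http` is explicitly true, which gives a clearer message than the object_store default "URL scheme is not allowed". An illustrative, self-contained sketch of that check (the function name and String error type are mine, not the CLI's):

    use url::Url;

    fn validate_s3_endpoint(endpoint: &str, allow_http: Option<bool>) -> Result<(), String> {
        if let Ok(endpoint_url) = Url::parse(endpoint) {
            if endpoint_url.scheme() == "http" && !matches!(allow_http, Some(true)) {
                return Err(format!(
                    "Invalid endpoint: {endpoint}. HTTP is not allowed for S3 endpoints. \
                     To allow HTTP, set 'aws.allow_http' to true"
                ));
            }
        }
        Ok(())
    }

    fn main() {
        assert!(validate_s3_endpoint("http://localhost:9000", None).is_err());
        assert!(validate_s3_endpoint("http://localhost:9000", Some(true)).is_ok());
        assert!(validate_s3_endpoint("https://s3.amazonaws.com", None).is_ok());
    }
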

@@ -188,6 +217,8 @@ pub struct AwsOptions {
pub region: Option<String>,
/// OSS or COS Endpoint
pub endpoint: Option<String>,
/// Allow HTTP (otherwise will always use https)
pub allow_http: Option<bool>,
}

impl ExtensionOptions for AwsOptions {
@@ -219,11 +250,14 @@ impl ExtensionOptions for AwsOptions {
"region" => {
self.region.set(rem, value)?;
}
"oss" | "cos" => {
"oss" | "cos" | "endpoint" => {
self.endpoint.set(rem, value)?;
}
"allow_http" => {
self.allow_http.set(rem, value)?;
}
_ => {
return internal_err!("Config value \"{}\" not found on AwsOptions", rem);
return config_err!("Config value \"{}\" not found on AwsOptions", rem);
}
}
Ok(())
@@ -262,6 +296,7 @@ impl ExtensionOptions for AwsOptions {
self.session_token.visit(&mut v, "session_token", "");
self.region.visit(&mut v, "region", "");
self.endpoint.visit(&mut v, "endpoint", "");
self.allow_http.visit(&mut v, "allow_http", "");
v.0
}
}
@@ -307,7 +342,7 @@ impl ExtensionOptions for GcpOptions {
self.application_credentials_path.set(rem, value)?;
}
_ => {
return internal_err!("Config value \"{}\" not found on GcpOptions", rem);
return config_err!("Config value \"{}\" not found on GcpOptions", rem);
}
}
Ok(())
@@ -479,12 +514,21 @@ mod tests {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let region = "fake_us-east-2";
let endpoint = "endpoint33";
let session_token = "fake_session_token";
let location = "s3://bucket/path/file.parquet";

let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.region' '{region}', 'aws.session_token' {session_token}) LOCATION '{location}'");
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
('aws.access_key_id' '{access_key_id}', \
'aws.secret_access_key' '{secret_access_key}', \
'aws.region' '{region}', \
'aws.session_token' {session_token}, \
'aws.endpoint' '{endpoint}'\
) LOCATION '{location}'"
);

let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;
@@ -501,6 +545,7 @@
(AmazonS3ConfigKey::AccessKeyId, access_key_id),
(AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
(AmazonS3ConfigKey::Region, region),
(AmazonS3ConfigKey::Endpoint, endpoint),
(AmazonS3ConfigKey::Token, session_token),
];
for (key, value) in config {
@@ -513,6 +558,66 @@
Ok(())
}

#[tokio::test]
async fn s3_object_store_builder_allow_http_error() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let endpoint = "http://endpoint33";
let location = "s3://bucket/path/file.parquet";

let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
('aws.access_key_id' '{access_key_id}', \
'aws.secret_access_key' '{secret_access_key}', \
'aws.endpoint' '{endpoint}'\
) LOCATION '{location}'"
);

let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
register_options(&ctx, scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
.await
.unwrap_err();

assert_eq!(err.to_string(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

// Now add `allow_http` to the options and check if it works
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
('aws.access_key_id' '{access_key_id}', \
'aws.secret_access_key' '{secret_access_key}', \
'aws.endpoint' '{endpoint}',\
'aws.allow_http' 'true'\
) LOCATION '{location}'"
);

let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
register_options(&ctx, scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
// ensure this isn't an error
get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

Ok(())
}

#[tokio::test]
async fn oss_object_store_builder() -> Result<()> {
let access_key_id = "fake_access_key_id";
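
For an end user of datafusion-cli, the new keys are set in the OPTIONS clause of CREATE EXTERNAL TABLE, just as the tests above do. A hypothetical example against a local HTTP object store (endpoint, credentials, and bucket are placeholders):

    fn main() {
        // Placeholder values; datafusion-cli registers AwsOptions for s3://
        // locations, so 'aws.endpoint' and 'aws.allow_http' are recognized.
        let sql = "CREATE EXTERNAL TABLE trips STORED AS PARQUET OPTIONS(
            'aws.access_key_id' 'test_key',
            'aws.secret_access_key' 'test_secret',
            'aws.endpoint' 'http://localhost:9000',
            'aws.allow_http' 'true'
        ) LOCATION 's3://my-bucket/trips/'";
        println!("{sql}");
    }
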
5 changes: 4 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
@@ -58,7 +58,10 @@ use crate::{
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
variable::{VarProvider, VarType},
};
use crate::{functions, functions_aggregate, functions_array};

#[cfg(feature = "array_expressions")]
use crate::functions_array;
use crate::{functions, functions_aggregate};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
(Additional changed file; path not captured in this view.)
@@ -875,7 +875,8 @@ fn add_hash_on_top(
n_target: usize,
) -> Result<DistributionContext> {
// Early return if hash repartition is unnecessary
if n_target == 1 {
// `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
return Ok(input);
}

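
The tightened early return above can be summarized as: hashing into a single target partition is only redundant when the input already has exactly one partition; with several input partitions, the Hash([...], 1) repartition still combines them into one stream and satisfies the required hash distribution. A small illustrative sketch of just that predicate (the helper name is mine):

    // True only when adding `RepartitionExec: partitioning=Hash([...], 1)`
    // would be a no-op, i.e. the input is already a single partition.
    fn hash_repartition_is_redundant(n_target: usize, input_partitions: usize) -> bool {
        n_target == 1 && input_partitions == 1
    }

    fn main() {
        assert!(hash_repartition_is_redundant(1, 1));
        assert!(!hash_repartition_is_redundant(1, 8)); // 8 partitions still need combining
    }
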
(Remaining changed files in this commit are not shown.)