Skip to content

Commit

Permalink
Merge commit '4109f581ce9bca956e01f13ff16b30d59720e96b' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-june-week-3
  • Loading branch information
appletreeisyellow committed Jun 25, 2024
2 parents 3372aca + 4109f58 commit bb55b3a
Show file tree
Hide file tree
Showing 77 changed files with 3,352 additions and 2,113 deletions.
97 changes: 97 additions & 0 deletions datafusion-cli/examples/cli-session-context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Shows an example of a custom session context that unions the input plan with itself.
//! To run this example, use `cargo run --example cli-session-context` from within the `datafusion-cli` directory.

use std::sync::Arc;

use datafusion::{
dataframe::DataFrame,
error::DataFusionError,
execution::{context::SessionState, TaskContext},
logical_expr::{LogicalPlan, LogicalPlanBuilder},
prelude::SessionContext,
};
use datafusion_cli::{
cli_context::CliSessionContext, exec::exec_from_repl, print_options::PrintOptions,
};
use object_store::ObjectStore;

/// This is a toy example of a custom session context that unions the input plan with itself.
struct MyUnionerContext {
ctx: SessionContext,
}

impl Default for MyUnionerContext {
fn default() -> Self {
Self {
ctx: SessionContext::new(),
}
}
}

#[async_trait::async_trait]
impl CliSessionContext for MyUnionerContext {
fn task_ctx(&self) -> Arc<TaskContext> {
self.ctx.task_ctx()
}

fn session_state(&self) -> SessionState {
self.ctx.state()
}

fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>> {
self.ctx.register_object_store(url, object_store)
}

fn register_table_options_extension_from_scheme(&self, _scheme: &str) {
unimplemented!()
}

async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError> {
let new_plan = LogicalPlanBuilder::from(plan.clone())
.union(plan.clone())?
.build()?;

self.ctx.execute_logical_plan(new_plan).await
}
}

#[tokio::main]
/// Runs the example.
pub async fn main() {
let mut my_ctx = MyUnionerContext::default();

let mut print_options = PrintOptions {
format: datafusion_cli::print_format::PrintFormat::Automatic,
quiet: false,
maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
color: true,
};

exec_from_repl(&mut my_ctx, &mut print_options)
.await
.unwrap();
}
98 changes: 98 additions & 0 deletions datafusion-cli/src/cli_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::{
dataframe::DataFrame,
error::DataFusionError,
execution::{context::SessionState, TaskContext},
logical_expr::LogicalPlan,
prelude::SessionContext,
};
use object_store::ObjectStore;

use crate::object_storage::{AwsOptions, GcpOptions};

#[async_trait::async_trait]
/// The CLI session context trait provides a way to have a session context that can be used with datafusion's CLI code.
pub trait CliSessionContext {
/// Get an atomic reference counted task context.
fn task_ctx(&self) -> Arc<TaskContext>;

/// Get the session state.
fn session_state(&self) -> SessionState;

/// Register an object store with the session context.
fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>>;

/// Register table options extension from scheme.
fn register_table_options_extension_from_scheme(&self, scheme: &str);

/// Execute a logical plan and return a DataFrame.
async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError>;
}

#[async_trait::async_trait]
impl CliSessionContext for SessionContext {
fn task_ctx(&self) -> Arc<TaskContext> {
self.task_ctx()
}

fn session_state(&self) -> SessionState {
self.state()
}

fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>> {
self.register_object_store(url, object_store)
}

fn register_table_options_extension_from_scheme(&self, scheme: &str) {
match scheme {
// For Amazon S3 or Alibaba Cloud OSS
"s3" | "oss" | "cos" => {
// Register AWS specific table options in the session context:
self.register_table_options_extension(AwsOptions::default())
}
// For Google Cloud Storage
"gs" | "gcs" => {
// Register GCP specific table options in the session context:
self.register_table_options_extension(GcpOptions::default())
}
// For unsupported schemes, do nothing:
_ => {}
}
}

async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError> {
self.execute_logical_plan(plan).await
}
}
4 changes: 2 additions & 2 deletions datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Command within CLI

use crate::cli_context::CliSessionContext;
use crate::exec::{exec_and_print, exec_from_lines};
use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
Expand All @@ -28,7 +29,6 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::exec_err;
use datafusion::common::instant::Instant;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
Expand All @@ -55,7 +55,7 @@ pub enum OutputFormat {
impl Command {
pub async fn execute(
&self,
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
match self {
Expand Down
28 changes: 15 additions & 13 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use std::io::prelude::*;
use std::io::BufReader;
use std::str::FromStr;

use crate::cli_context::CliSessionContext;
use crate::helper::split_from_semicolon;
use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
object_storage::{get_object_store, register_options},
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};

Expand All @@ -38,7 +39,6 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::prelude::SessionContext;
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

Expand All @@ -50,7 +50,7 @@ use tokio::signal;

/// run and execute SQL statements and commands, against a context with the given print options
pub async fn exec_from_commands(
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
commands: Vec<String>,
print_options: &PrintOptions,
) -> Result<()> {
Expand All @@ -63,7 +63,7 @@ pub async fn exec_from_commands(

/// run and execute SQL statements and commands from a file, against a context with the given print options
pub async fn exec_from_lines(
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
reader: &mut BufReader<File>,
print_options: &PrintOptions,
) -> Result<()> {
Expand Down Expand Up @@ -103,7 +103,7 @@ pub async fn exec_from_lines(
}

pub async fn exec_from_files(
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
files: Vec<String>,
print_options: &PrintOptions,
) -> Result<()> {
Expand All @@ -122,7 +122,7 @@ pub async fn exec_from_files(

/// run and execute SQL statements and commands against a context with the given print options
pub async fn exec_from_repl(
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> rustyline::Result<()> {
let mut rl = Editor::new()?;
Expand Down Expand Up @@ -205,7 +205,7 @@ pub async fn exec_from_repl(
}

pub(super) async fn exec_and_print(
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
print_options: &PrintOptions,
sql: String,
) -> Result<()> {
Expand Down Expand Up @@ -292,10 +292,10 @@ impl AdjustedPrintOptions {
}

async fn create_plan(
ctx: &mut SessionContext,
ctx: &mut dyn CliSessionContext,
statement: Statement,
) -> Result<LogicalPlan, DataFusionError> {
let mut plan = ctx.state().statement_to_plan(statement).await?;
let mut plan = ctx.session_state().statement_to_plan(statement).await?;

// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
Expand Down Expand Up @@ -354,7 +354,7 @@ async fn create_plan(
/// alteration fails, or if the object store cannot be retrieved and registered
/// successfully.
pub(crate) async fn register_object_store_and_config_extensions(
ctx: &SessionContext,
ctx: &dyn CliSessionContext,
location: &String,
options: &HashMap<String, String>,
format: Option<FileType>,
Expand All @@ -369,17 +369,18 @@ pub(crate) async fn register_object_store_and_config_extensions(
let url = table_path.as_ref();

// Register the options based on the scheme extracted from the location
register_options(ctx, scheme);
ctx.register_table_options_extension_from_scheme(scheme);

// Clone and modify the default table options based on the provided options
let mut table_options = ctx.state().default_table_options().clone();
let mut table_options = ctx.session_state().default_table_options().clone();
if let Some(format) = format {
table_options.set_file_format(format);
}
table_options.alter_with_string_hash_map(options)?;

// Retrieve the appropriate object store based on the scheme, URL, and modified table options
let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;
let store =
get_object_store(&ctx.session_state(), scheme, url, &table_options).await?;

// Register the retrieved object store in the session context's runtime environment
ctx.register_object_store(url, store);
Expand All @@ -394,6 +395,7 @@ mod tests {
use datafusion::common::config::FormatOptions;
use datafusion::common::plan_err;

use datafusion::prelude::SessionContext;
use url::Url;

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions datafusion-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");

pub mod catalog;
pub mod cli_context;
pub mod command;
pub mod exec;
pub mod functions;
pub mod helper;
pub mod highlighter;
pub mod object_storage;
pub mod pool_type;
pub mod print_format;
pub mod print_options;
Loading

0 comments on commit bb55b3a

Please sign in to comment.