Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into feature/generate_Grou…
Browse files Browse the repository at this point in the history
…pByHash_output_in_multiple_rb
  • Loading branch information
alamb committed Aug 8, 2024
2 parents c6d640b + 2521043 commit 296b450
Show file tree
Hide file tree
Showing 61 changed files with 1,161 additions and 670 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.10.1", default-features = false }
object_store = { version = "0.10.2", default-features = false }
parking_lot = "0.12"
parquet = { version = "52.2.0", default-features = false, features = [
"arrow",
Expand Down
6 changes: 2 additions & 4 deletions datafusion-cli/examples/cli-session-context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl CliSessionContext for MyUnionerContext {
#[tokio::main]
/// Runs the example.
pub async fn main() {
let mut my_ctx = MyUnionerContext::default();
let my_ctx = MyUnionerContext::default();

let mut print_options = PrintOptions {
format: datafusion_cli::print_format::PrintFormat::Automatic,
Expand All @@ -91,7 +91,5 @@ pub async fn main() {
color: true,
};

exec_from_repl(&mut my_ctx, &mut print_options)
.await
.unwrap();
exec_from_repl(&my_ctx, &mut print_options).await.unwrap();
}
2 changes: 1 addition & 1 deletion datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ mod tests {
use datafusion::prelude::SessionContext;

fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let mut ctx = SessionContext::new();
let ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub enum OutputFormat {
impl Command {
pub async fn execute(
&self,
ctx: &mut dyn CliSessionContext,
ctx: &dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
match self {
Expand Down
16 changes: 8 additions & 8 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use tokio::signal;

/// run and execute SQL statements and commands, against a context with the given print options
pub async fn exec_from_commands(
ctx: &mut dyn CliSessionContext,
ctx: &dyn CliSessionContext,
commands: Vec<String>,
print_options: &PrintOptions,
) -> Result<()> {
Expand All @@ -62,7 +62,7 @@ pub async fn exec_from_commands(

/// run and execute SQL statements and commands from a file, against a context with the given print options
pub async fn exec_from_lines(
ctx: &mut dyn CliSessionContext,
ctx: &dyn CliSessionContext,
reader: &mut BufReader<File>,
print_options: &PrintOptions,
) -> Result<()> {
Expand Down Expand Up @@ -102,7 +102,7 @@ pub async fn exec_from_lines(
}

pub async fn exec_from_files(
ctx: &mut dyn CliSessionContext,
ctx: &dyn CliSessionContext,
files: Vec<String>,
print_options: &PrintOptions,
) -> Result<()> {
Expand All @@ -121,7 +121,7 @@ pub async fn exec_from_files(

/// run and execute SQL statements and commands against a context with the given print options
pub async fn exec_from_repl(
ctx: &mut dyn CliSessionContext,
ctx: &dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> rustyline::Result<()> {
let mut rl = Editor::new()?;
Expand Down Expand Up @@ -204,7 +204,7 @@ pub async fn exec_from_repl(
}

pub(super) async fn exec_and_print(
ctx: &mut dyn CliSessionContext,
ctx: &dyn CliSessionContext,
print_options: &PrintOptions,
sql: String,
) -> Result<()> {
Expand Down Expand Up @@ -300,7 +300,7 @@ fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
}

async fn create_plan(
ctx: &mut dyn CliSessionContext,
ctx: &dyn CliSessionContext,
statement: Statement,
) -> Result<LogicalPlan, DataFusionError> {
let mut plan = ctx.session_state().statement_to_plan(statement).await?;
Expand Down Expand Up @@ -473,7 +473,7 @@ mod tests {
"cos://bucket/path/file.parquet",
"gcs://bucket/path/file.parquet",
];
let mut ctx = SessionContext::new();
let ctx = SessionContext::new();
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
Expand All @@ -488,7 +488,7 @@ mod tests {
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
//Should not fail
let mut plan = create_plan(&mut ctx, statement).await?;
let mut plan = create_plan(&ctx, statement).await?;
if let LogicalPlan::Copy(copy_to) = &mut plan {
assert_eq!(copy_to.output_url, location);
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
Expand Down
10 changes: 5 additions & 5 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async fn main_inner() -> Result<()> {

let runtime_env = create_runtime_env(rt_config.clone())?;

let mut ctx =
let ctx =
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that knows how to open files
Expand Down Expand Up @@ -212,20 +212,20 @@ async fn main_inner() -> Result<()> {

if commands.is_empty() && files.is_empty() {
if !rc.is_empty() {
exec::exec_from_files(&mut ctx, rc, &print_options).await?;
exec::exec_from_files(&ctx, rc, &print_options).await?;
}
// TODO maybe we can have thiserror for cli but for now let's keep it simple
return exec::exec_from_repl(&mut ctx, &mut print_options)
return exec::exec_from_repl(&ctx, &mut print_options)
.await
.map_err(|e| DataFusionError::External(Box::new(e)));
}

if !files.is_empty() {
exec::exec_from_files(&mut ctx, files, &print_options).await?;
exec::exec_from_files(&ctx, files, &print_options).await?;
}

if !commands.is_empty() {
exec::exec_from_commands(&mut ctx, commands, &print_options).await?;
exec::exec_from_commands(&ctx, commands, &print_options).await?;
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<()> {
let dir_a = prepare_example_data()?;
let dir_b = prepare_example_data()?;

let mut ctx = SessionContext::new();
let ctx = SessionContext::new();
let state = ctx.state();
let catlist = Arc::new(CustomCatalogProviderList::new());

Expand Down
16 changes: 16 additions & 0 deletions datafusion-examples/examples/planner_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::physical_planner::DefaultPhysicalPlanner;
use datafusion::prelude::*;
use datafusion_expr::{LogicalPlan, PlanType};

Expand Down Expand Up @@ -123,5 +124,20 @@ async fn to_physical_plan_step_by_step_demo(
.plan
);

// Call the physical optimizer with an existing physical plan (in this
// case the plan is already optimized, but an unoptimized plan would
// typically be used in this context)
// Note that this is not part of the trait but a public method
// on DefaultPhysicalPlanner. Not all planners will provide this feature.
let planner = DefaultPhysicalPlanner::default();
let physical_plan =
planner.optimize_physical_plan(physical_plan, &ctx.state(), |_, _| {})?;
println!(
"Optimized physical plan:\n\n{}\n\n",
displayable(physical_plan.as_ref())
.to_stringified(false, PlanType::InitialPhysicalPlan)
.plan
);

Ok(())
}
14 changes: 7 additions & 7 deletions datafusion/core/benches/filter_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::executor::block_on;
use std::sync::Arc;
use tokio::runtime::Runtime;

async fn query(ctx: &mut SessionContext, sql: &str) {
async fn query(ctx: &SessionContext, sql: &str) {
let rt = Runtime::new().unwrap();

// execute the query
Expand Down Expand Up @@ -70,25 +70,25 @@ fn criterion_benchmark(c: &mut Criterion) {
let batch_size = 4096; // 2^12

c.bench_function("filter_array", |b| {
let mut ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| block_on(query(&mut ctx, "select f32, f64 from t where f32 >= f64")))
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >= f64")))
});

c.bench_function("filter_scalar", |b| {
let mut ctx = create_context(array_len, batch_size).unwrap();
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| {
block_on(query(
&mut ctx,
&ctx,
"select f32, f64 from t where f32 >= 250 and f64 > 250",
))
})
});

c.bench_function("filter_scalar in list", |b| {
let mut ctx = create_context(array_len, batch_size).unwrap();
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| {
block_on(query(
&mut ctx,
&ctx,
"select f32, f64 from t where f32 in (10, 20, 30, 40)",
))
})
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1550,7 +1550,7 @@ impl DataFrame {
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// # use datafusion_common::ScalarValue;
/// let mut ctx = SessionContext::new();
/// let ctx = SessionContext::new();
/// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
/// let results = ctx
/// .sql("SELECT a FROM example WHERE b = $1")
Expand Down Expand Up @@ -2649,8 +2649,8 @@ mod tests {

#[tokio::test]
async fn registry() -> Result<()> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
let ctx = SessionContext::new();
register_aggregate_csv(&ctx, "aggregate_test_100").await?;

// declare the udf
let my_fn: ScalarFunctionImplementation =
Expand Down Expand Up @@ -2783,8 +2783,8 @@ mod tests {

/// Create a logical plan from a SQL query
async fn create_plan(sql: &str) -> Result<LogicalPlan> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
let ctx = SessionContext::new();
register_aggregate_csv(&ctx, "aggregate_test_100").await?;
Ok(ctx.sql(sql).await?.into_unoptimized_plan())
}

Expand Down Expand Up @@ -3147,9 +3147,9 @@ mod tests {
"datafusion.sql_parser.enable_ident_normalization".to_owned(),
"false".to_owned(),
)]))?;
let mut ctx = SessionContext::new_with_config(config);
let ctx = SessionContext::new_with_config(config);
let name = "aggregate_test_100";
register_aggregate_csv(&mut ctx, name).await?;
register_aggregate_csv(&ctx, name).await?;
let df = ctx.table(name);

let df = df
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,14 @@ mod tests {
async fn write_parquet_with_small_rg_size() -> Result<()> {
// This test verifies writing a parquet file with small rg size
// relative to datafusion.execution.batch_size does not panic
let mut ctx = SessionContext::new_with_config(
SessionConfig::from_string_hash_map(HashMap::from_iter(
let ctx = SessionContext::new_with_config(SessionConfig::from_string_hash_map(
HashMap::from_iter(
[("datafusion.execution.batch_size", "10")]
.iter()
.map(|(s1, s2)| (s1.to_string(), s2.to_string())),
))?,
);
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
),
)?);
register_aggregate_csv(&ctx, "aggregate_test_100").await?;
let test_df = ctx.table("aggregate_test_100").await?;

let output_path = "file://local/test.parquet";
Expand Down
21 changes: 17 additions & 4 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Helper functions for the table implementation
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

use super::PartitionedFile;
Expand Down Expand Up @@ -138,10 +139,22 @@ pub fn split_files(

// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
partitioned_files
.chunks(chunk_size)
.map(|c| c.to_vec())
.collect()
let mut chunks = Vec::with_capacity(n);
let mut current_chunk = Vec::with_capacity(chunk_size);
for file in partitioned_files.drain(..) {
current_chunk.push(file);
if current_chunk.len() == chunk_size {
let full_chunk =
mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
chunks.push(full_chunk);
}
}

if !current_chunk.is_empty() {
chunks.push(current_chunk)
}

chunks
}

struct Partition {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct PartitionedFile {
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: impl Into<String>, size: u64) -> Self {
Expand Down
26 changes: 14 additions & 12 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,15 +973,16 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
let mut part_file = part_file?;
let part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
Ok((part_file, statistics))
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
as Result<(PartitionedFile, Statistics)>
Ok((
part_file,
Arc::new(Statistics::new_unknown(&self.file_schema)),
))
}
})
.boxed()
Expand Down Expand Up @@ -1011,12 +1012,12 @@ impl ListingTable {
ctx: &SessionState,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> Result<Statistics> {
let statistics_cache = self.collected_statistics.clone();
return match statistics_cache
) -> Result<Arc<Statistics>> {
match self
.collected_statistics
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
Some(statistics) => Ok(statistics.as_ref().clone()),
Some(statistics) => Ok(statistics.clone()),
None => {
let statistics = self
.options
Expand All @@ -1028,14 +1029,15 @@ impl ListingTable {
&part_file.object_meta,
)
.await?;
statistics_cache.put_with_extra(
let statistics = Arc::new(statistics);
self.collected_statistics.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
statistics.clone(),
&part_file.object_meta,
);
Ok(statistics)
}
};
}
}
}

Expand Down
Loading

0 comments on commit 296b450

Please sign in to comment.