Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/doc_link
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 10, 2025
2 parents 7c9fc39 + 334d6ec commit d2bdc2b
Show file tree
Hide file tree
Showing 25 changed files with 482 additions and 193 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async-trait = "0.1.73"
bigdecimal = "0.4.7"
bytes = "1.4"
chrono = { version = "0.4.38", default-features = false }
ctor = "0.2.0"
ctor = "0.2.9"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "44.0.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "44.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ url = "2.5.4"

[dev-dependencies]
assert_cmd = "2.0"
ctor = "0.2.0"
ctor = "0.2.9"
predicates = "3.0"
rstest = "0.22"

Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ mod tests {
fn test_py_scalar() {
init_python();

// TODO: remove this attribute when bumping pyo3 to v0.23.0
// See: <https://github.com/PyO3/pyo3/blob/v0.23.0/guide/src/migration.md#gil-refs-feature-removed>
#[allow(unexpected_cfgs)]
Python::with_gil(|py| {
let scalar_float = ScalarValue::Float64(Some(12.34));
let py_float = scalar_float.into_py(py).call_method0(py, "as_py").unwrap();
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,8 +1148,8 @@ impl ListingTable {
/// This method first checks if the statistics for the given file are already cached.
/// If they are, it returns the cached statistics.
/// If they are not, it infers the statistics from the file and stores them in the cache.
async fn do_collect_statistics<'a>(
&'a self,
async fn do_collect_statistics(
&self,
ctx: &SessionState,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ impl SessionContext {
Ok(table)
}

async fn find_and_deregister<'a>(
async fn find_and_deregister(
&self,
table_ref: impl Into<TableReference>,
table_type: TableType,
Expand Down Expand Up @@ -1481,10 +1481,7 @@ impl SessionContext {
/// provided reference.
///
/// [`register_table`]: SessionContext::register_table
pub async fn table<'a>(
&self,
table_ref: impl Into<TableReference>,
) -> Result<DataFrame> {
pub async fn table(&self, table_ref: impl Into<TableReference>) -> Result<DataFrame> {
let table_ref: TableReference = table_ref.into();
let provider = self.table_provider(table_ref.clone()).await?;
let plan = LogicalPlanBuilder::scan(
Expand All @@ -1511,7 +1508,7 @@ impl SessionContext {
}

/// Return a [`TableProvider`] for the specified table.
pub async fn table_provider<'a>(
pub async fn table_provider(
&self,
table_ref: impl Into<TableReference>,
) -> Result<Arc<dyn TableProvider>> {
Expand Down

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ pub use datafusion_expr_common::columnar_value::ColumnarValue;
pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
pub use datafusion_expr_common::operator::Operator;
pub use datafusion_expr_common::signature::{
ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD,
ArrayFunctionSignature, Signature, TypeSignature, TypeSignatureClass, Volatility,
TIMEZONE_WILDCARD,
};
pub use datafusion_expr_common::type_coercion::binary;
pub use expr::{
Expand Down
33 changes: 25 additions & 8 deletions datafusion/expr/src/type_coercion/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,18 +438,11 @@ fn get_valid_types(
}

fn function_length_check(length: usize, expected_length: usize) -> Result<()> {
if length < 1 {
return plan_err!(
"The signature expected at least one argument but received {expected_length}"
);
}

if length != expected_length {
return plan_err!(
"The signature expected {length} arguments but received {expected_length}"
"The signature expected {expected_length} arguments but received {length}"
);
}

Ok(())
}

Expand Down Expand Up @@ -939,6 +932,7 @@ mod tests {

use super::*;
use arrow::datatypes::Field;
use datafusion_common::assert_contains;

#[test]
fn test_string_conversion() {
Expand Down Expand Up @@ -1027,6 +1021,29 @@ mod tests {
Ok(())
}

#[test]
fn test_get_valid_types_length_check() -> Result<()> {
    // A `Numeric(1)` signature is checked against argument lists that are
    // too short and too long; each must fail with a message that reports
    // the expected count first and the received count second.
    let signature = TypeSignature::Numeric(1);

    let no_args: &[DataType] = &[];
    let three_args = [DataType::Int32, DataType::Int32, DataType::Int32];
    let cases = [
        (no_args, "The signature expected 1 arguments but received 0"),
        (
            &three_args[..],
            "The signature expected 1 arguments but received 3",
        ),
    ];

    for (arg_types, expected_message) in cases {
        let err = get_valid_types(&signature, arg_types).unwrap_err();
        assert_contains!(err.to_string(), expected_message);
    }

    Ok(())
}

#[test]
fn test_fixed_list_wildcard_coerce() -> Result<()> {
let inner = Arc::new(Field::new_list_field(DataType::Int32, false));
Expand Down
5 changes: 3 additions & 2 deletions datafusion/functions-window/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,10 @@ impl PartitionEvaluator for NthValueEvaluator {
})
.unwrap_or_default();
if valid_indices.is_empty() {
return ScalarValue::try_from(arr.data_type());
None
} else {
Some(valid_indices)
}
Some(valid_indices)
} else {
None
};
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ harness = false
name = "strpos"
required-features = ["unicode_expressions"]

[[bench]]
harness = false
name = "reverse"
required-features = ["unicode_expressions"]

[[bench]]
harness = false
name = "trunc"
Expand Down
90 changes: 90 additions & 0 deletions datafusion/functions/benches/reverse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

extern crate criterion;

use arrow::array::OffsetSizeTrait;
use arrow::util::bench_util::{
create_string_array_with_len, create_string_view_array_with_len,
};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::ColumnarValue;
use datafusion_functions::unicode;
use std::sync::Arc;

/// Builds the single-element argument list used by the `reverse` benchmarks.
///
/// The one argument is an array column created with `size` and `str_len`:
/// a string-view array when `force_view_types` is true, otherwise a string
/// array with offset type `O`. Both generators receive `0.1` as their second
/// argument (presumably the null density; confirm against
/// `arrow::util::bench_util`).
fn create_args<O: OffsetSizeTrait>(
    size: usize,
    str_len: usize,
    force_view_types: bool,
) -> Vec<ColumnarValue> {
    let column = if force_view_types {
        ColumnarValue::Array(Arc::new(create_string_view_array_with_len(
            size, 0.1, str_len, false,
        )))
    } else {
        ColumnarValue::Array(Arc::new(create_string_array_with_len::<O>(
            size, 0.1, str_len,
        )))
    };

    vec![column]
}

/// Registers the `reverse` benchmarks with Criterion.
///
/// For every batch size, three cases are measured: string-view input with
/// `str_len` 8, string-view input with `str_len` 32, and `i32`-offset string
/// input with `str_len` 32. Benchmark names have the form
/// `<label> [size=<size>, str_len=<str_len>]`.
fn criterion_benchmark(c: &mut Criterion) {
    let reverse = unicode::reverse();
    for size in [1024, 4096] {
        // (benchmark label, str_len, force_view_types)
        let cases = [
            ("reverse_string_view", 8, true),
            ("reverse_string_view", 32, true),
            ("reverse_string", 32, false),
        ];

        for (label, str_len, force_view_types) in cases {
            let args = create_args::<i32>(size, str_len, force_view_types);
            c.bench_function(
                format!("{} [size={}, str_len={}]", label, size, str_len).as_str(),
                |b| {
                    b.iter(|| {
                        // TODO use invoke_with_args
                        // The second argument of `invoke_batch` is the number
                        // of rows in the batch, which is `size` here (the
                        // array length), not the per-string length.
                        black_box(reverse.invoke_batch(&args, size))
                    })
                },
            );
        }
    }
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
25 changes: 16 additions & 9 deletions datafusion/functions/src/unicode/reverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use std::sync::Arc;

use crate::utils::{make_scalar_function, utf8_to_str_type};
use arrow::array::{
Array, ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray,
OffsetSizeTrait,
Array, ArrayRef, AsArray, GenericStringBuilder, OffsetSizeTrait, StringArrayType,
};
use arrow::datatypes::DataType;
use datafusion_common::{exec_err, Result};
Expand Down Expand Up @@ -105,8 +104,7 @@ impl ScalarUDFImpl for ReverseFunc {
}
}

/// Reverses the order of the characters in the string.
/// reverse('abcde') = 'edcba'
/// Reverses the order of the characters in the string `reverse('abcde') = 'edcba'`.
/// The implementation uses UTF-8 code points as characters
pub fn reverse<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
if args[0].data_type() == &Utf8View {
Expand All @@ -116,14 +114,23 @@ pub fn reverse<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

fn reverse_impl<'a, T: OffsetSizeTrait, V: ArrayAccessor<Item = &'a str>>(
fn reverse_impl<'a, T: OffsetSizeTrait, V: StringArrayType<'a>>(
string_array: V,
) -> Result<ArrayRef> {
let result = ArrayIter::new(string_array)
.map(|string| string.map(|string: &str| string.chars().rev().collect::<String>()))
.collect::<GenericStringArray<T>>();
let mut builder = GenericStringBuilder::<T>::with_capacity(string_array.len(), 1024);

let mut reversed = String::new();
for string in string_array.iter() {
if let Some(s) = string {
reversed.extend(s.chars().rev());
builder.append_value(&reversed);
reversed.clear();
} else {
builder.append_null();
}
}

Ok(Arc::new(result) as ArrayRef)
Ok(Arc::new(builder.finish()) as ArrayRef)
}

#[cfg(test)]
Expand Down
4 changes: 1 addition & 3 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,7 @@ pub trait DynEq {

impl<T: Eq + Any> DynEq for T {
fn dyn_eq(&self, other: &dyn Any) -> bool {
other
.downcast_ref::<Self>()
.map_or(false, |other| other == self)
other.downcast_ref::<Self>() == Some(self)
}
}

Expand Down
37 changes: 34 additions & 3 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ impl PhysicalSortRequirement {
/// Returns whether this requirement is equal or more specific than `other`.
pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
self.expr.eq(&other.expr)
&& other.options.map_or(true, |other_opts| {
self.options.map_or(false, |opts| opts == other_opts)
})
&& other
.options
.map_or(true, |other_opts| self.options == Some(other_opts))
}

#[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
Expand Down Expand Up @@ -409,6 +409,22 @@ impl LexOrdering {
.map(PhysicalSortExpr::from)
.collect()
}

/// Removes entries with a repeated physical expression from this `LexOrdering`.
///
/// Entries are visited in order and an entry is kept only if no previously
/// kept entry has an equal expression. [`SortOptions`] are not compared, so
/// the first occurrence of each expression wins. For example:
///
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
pub fn collapse(self) -> Self {
    let mut deduped = LexOrdering::default();
    for candidate in self {
        let already_kept = deduped
            .iter()
            .any(|kept| kept.expr.eq(&candidate.expr));
        if already_kept {
            continue;
        }
        deduped.push(candidate);
    }
    deduped
}
}

impl From<Vec<PhysicalSortExpr>> for LexOrdering {
Expand Down Expand Up @@ -540,6 +556,21 @@ impl LexRequirement {
.collect(),
)
}

/// Constructs a duplicate-free `LexRequirement` by dropping every entry
/// whose physical expression equals that of an earlier entry.
///
/// Only the expressions are compared, so the first requirement seen for an
/// expression is the one that is kept. For example,
/// `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a Some(ASC)]`.
pub fn collapse(self) -> Self {
    let unique = self.into_iter().fold(
        Vec::<PhysicalSortRequirement>::new(),
        |mut kept, candidate| {
            let is_new = kept.iter().all(|seen| !seen.expr.eq(&candidate.expr));
            if is_new {
                kept.push(candidate);
            }
            kept
        },
    );
    LexRequirement::new(unique)
}
}

impl From<LexOrdering> for LexRequirement {
Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
use super::{add_offset_to_expr, ProjectionMapping};
use crate::{
expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr, PhysicalSortRequirement,
Expand Down Expand Up @@ -526,12 +526,13 @@ impl EquivalenceGroup {
&self,
sort_reqs: &LexRequirement,
) -> LexRequirement {
collapse_lex_req(LexRequirement::new(
LexRequirement::new(
sort_reqs
.iter()
.map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
.collect(),
))
)
.collapse()
}

/// Projects `expr` according to the given projection mapping.
Expand Down
Loading

0 comments on commit d2bdc2b

Please sign in to comment.