Merge remote-tracking branch 'apache/main' into feature/group_by_unnest
alamb committed Jul 16, 2024
2 parents 2309e8b + 81aff94 commit b37a1e3
Showing 44 changed files with 375 additions and 314 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -123,7 +123,7 @@ rand = "0.8"
regex = "1.8"
rstest = "0.21.0"
serde_json = "1"
sqlparser = { version = "0.47", features = ["visitor"] }
sqlparser = { version = "0.48", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

77 changes: 9 additions & 68 deletions datafusion/physical-plan/src/insert.rs
@@ -23,8 +23,8 @@ use std::fmt::Debug;
use std::sync::Arc;

use super::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream,
execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
};
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
@@ -33,7 +33,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::{ArrayRef, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, PhysicalSortRequirement,
@@ -120,46 +120,6 @@ impl DataSinkExec {
}
}

fn execute_input_stream(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input_stream = self.input.execute(partition, context)?;

debug_assert_eq!(
self.sink_schema.fields().len(),
self.input.schema().fields().len()
);

// Find input columns that may violate the not null constraint.
let risky_columns: Vec<_> = self
.sink_schema
.fields()
.iter()
.zip(self.input.schema().fields().iter())
.enumerate()
.filter_map(|(i, (sink_field, input_field))| {
if !sink_field.is_nullable() && input_field.is_nullable() {
Some(i)
} else {
None
}
})
.collect();

if risky_columns.is_empty() {
Ok(input_stream)
} else {
// Check not null constraint on the input stream
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.sink_schema),
input_stream
.map(move |batch| check_not_null_contraits(batch?, &risky_columns)),
)))
}
}

/// Input execution plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
@@ -269,7 +229,12 @@ impl ExecutionPlan for DataSinkExec {
if partition != 0 {
return internal_err!("DataSinkExec can only be called on partition 0!");
}
let data = self.execute_input_stream(0, Arc::clone(&context))?;
let data = execute_input_stream(
Arc::clone(&self.input),
Arc::clone(&self.sink_schema),
0,
Arc::clone(&context),
)?;

let count_schema = Arc::clone(&self.count_schema);
let sink = Arc::clone(&self.sink);
@@ -314,27 +279,3 @@ fn make_count_schema() -> SchemaRef {
false,
)]))
}

fn check_not_null_contraits(
batch: RecordBatch,
column_indices: &Vec<usize>,
) -> Result<RecordBatch> {
for &index in column_indices {
if batch.num_columns() <= index {
return exec_err!(
"Invalid batch column count {} expected > {}",
batch.num_columns(),
index
);
}

if batch.column(index).null_count() > 0 {
return exec_err!(
"Invalid batch column at '{}' has null but schema specifies non-nullable",
index
);
}
}

Ok(batch)
}
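
The net effect of this file's diff is that the not-null checking logic, previously a private `execute_input_stream` method on `DataSinkExec`, now lives as a public helper in `datafusion/physical-plan/src/lib.rs` (see the next file). A minimal sketch of how another sink-like operator could reuse it, assuming the crate paths shown in this diff; `open_checked_stream` is a hypothetical wrapper, not part of the commit:

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::{
    execute_input_stream, ExecutionPlan, SendableRecordBatchStream,
};

// Hypothetical helper for a custom sink operator: open the input stream for
// partition 0 with the sink schema's not-null constraints enforced, the same
// call that DataSinkExec::execute makes after this commit.
fn open_checked_stream(
    input: &Arc<dyn ExecutionPlan>,
    sink_schema: &SchemaRef,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    execute_input_stream(Arc::clone(input), Arc::clone(sink_schema), 0, context)
}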
97 changes: 94 additions & 3 deletions datafusion/physical-plan/src/lib.rs
@@ -36,13 +36,13 @@ use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{exec_datafusion_err, Result};
use datafusion_common::{exec_datafusion_err, exec_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
};

use futures::stream::TryStreamExt;
use futures::stream::{StreamExt, TryStreamExt};
use log::debug;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
@@ -97,7 +97,7 @@ pub use datafusion_physical_expr::{
// Backwards compatibility
use crate::common::IPCWriter;
pub use crate::stream::EmptyRecordBatchStream;
use crate::stream::RecordBatchReceiverStream;
use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -805,6 +805,97 @@ pub fn execute_stream_partitioned(
Ok(streams)
}

/// Executes an input stream and ensures that the resulting stream adheres to
/// the `not null` constraints specified in the `sink_schema`.
///
/// # Arguments
///
/// * `input` - An execution plan
/// * `sink_schema` - The schema to be applied to the output stream
/// * `partition` - The partition index to be executed
/// * `context` - The task context
///
/// # Returns
///
/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
///
/// This function first executes the given input plan for the specified partition
/// and context. It then checks if there are any columns in the input that might
/// violate the `not null` constraints specified in the `sink_schema`. If there are
/// such columns, it wraps the resulting stream to enforce the `not null` constraints
/// by invoking the `check_not_null_contraits` function on each batch of the stream.
pub fn execute_input_stream(
input: Arc<dyn ExecutionPlan>,
sink_schema: SchemaRef,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input_stream = input.execute(partition, context)?;

debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());

// Find input columns that may violate the not null constraint.
let risky_columns: Vec<_> = sink_schema
.fields()
.iter()
.zip(input.schema().fields().iter())
.enumerate()
.filter_map(|(idx, (sink_field, input_field))| {
(!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
})
.collect();

if risky_columns.is_empty() {
Ok(input_stream)
} else {
// Check not null constraint on the input stream
Ok(Box::pin(RecordBatchStreamAdapter::new(
sink_schema,
input_stream
.map(move |batch| check_not_null_contraits(batch?, &risky_columns)),
)))
}
}

/// Checks a `RecordBatch` for `not null` constraints on specified columns.
///
/// # Arguments
///
/// * `batch` - The `RecordBatch` to be checked
/// * `column_indices` - A vector of column indices that should be checked for
/// `not null` constraints.
///
/// # Returns
///
/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
///
/// This function iterates over the specified column indices and ensures that none
/// of the columns contain null values. If any column contains null values, an error
/// is returned.
pub fn check_not_null_contraits(
batch: RecordBatch,
column_indices: &Vec<usize>,
) -> Result<RecordBatch> {
for &index in column_indices {
if batch.num_columns() <= index {
return exec_err!(
"Invalid batch column count {} expected > {}",
batch.num_columns(),
index
);
}

if batch.column(index).null_count() > 0 {
return exec_err!(
"Invalid batch column at '{}' has null but schema specifies non-nullable",
index
);
}
}

Ok(batch)
}

/// Utility function yielding a string representation of the given [`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
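
The two functions added above are public, so they can be exercised directly. A minimal sketch, assuming the `arrow` and `datafusion-physical-plan` crates from this workspace and keeping the function name exactly as committed (including its `contraits` spelling); the batch contents and column index are made up for illustration:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_plan::check_not_null_contraits;

fn main() -> Result<()> {
    // The input column is declared nullable and actually contains a null.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let batch = RecordBatch::try_new(schema, vec![column])?;

    // Column 0 is "risky" (the sink schema says NOT NULL, the input schema is
    // nullable), so the check rejects the batch with an execution error.
    assert!(check_not_null_contraits(batch, &vec![0]).is_err());
    Ok(())
}

`execute_input_stream` performs the same check lazily: it wraps the stream in a `RecordBatchStreamAdapter` only when such risky columns exist, so inputs with compatible nullability pass through unchanged.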
2 changes: 1 addition & 1 deletion datafusion/proto-common/src/from_proto/mod.rs
@@ -448,7 +448,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
None,
&message.version(),
)?;
Ok(record_batch.column(0).clone())
Ok(Arc::clone(record_batch.column(0)))
}
_ => Err(Error::General("dictionary id not found in schema while deserializing ScalarValue::List".to_string())),
}?;
2 changes: 2 additions & 0 deletions datafusion/proto-common/src/lib.rs
@@ -14,6 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]

//! Serialize / Deserialize DataFusion Primitive Types to bytes
//!
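
The new `#![deny(clippy::clone_on_ref_ptr)]` lint, together with the `x.clone()` to `Arc::clone(&x)` rewrites throughout this commit, makes cheap reference-count bumps visually distinct from potentially expensive deep clones. A minimal sketch of what the lint catches, using a made-up `Arc<Vec<&str>>` rather than any DataFusion type:

#![deny(clippy::clone_on_ref_ptr)]

use std::sync::Arc;

fn main() {
    let schema = Arc::new(vec!["a", "b", "c"]);

    // Rejected under clippy::clone_on_ref_ptr: reads like a deep copy of the data.
    // let copy = schema.clone();

    // Preferred form used throughout this commit: the cheap pointer clone
    // (a reference-count increment) is explicit at the call site.
    let copy = Arc::clone(&schema);
    assert_eq!(copy.len(), 3);
}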
2 changes: 2 additions & 0 deletions datafusion/proto/src/lib.rs
@@ -14,6 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]

//! Serialize / Deserialize DataFusion Plans to bytes
//!
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
@@ -365,7 +365,7 @@ pub fn parse_physical_expr(
Some(buf) => codec.try_decode_udf(&e.name, buf)?,
None => registry.udf(e.name.as_str())?,
};
let scalar_fun_def = udf.clone();
let scalar_fun_def = Arc::clone(&udf);

let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?;

18 changes: 9 additions & 9 deletions datafusion/proto/src/physical_plan/mod.rs
@@ -1101,7 +1101,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
where
Self: Sized,
{
let plan_clone = plan.clone();
let plan_clone = Arc::clone(&plan);
let plan = plan.as_any();

if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
@@ -1128,7 +1128,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
let expr = exec
.expr()
.iter()
.map(|expr| serialize_physical_expr(expr.0.clone(), extension_codec))
.map(|expr| serialize_physical_expr(Arc::clone(&expr.0), extension_codec))
.collect::<Result<Vec<_>>>()?;
let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
return Ok(protobuf::PhysicalPlanNode {
@@ -1169,7 +1169,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
protobuf::FilterExecNode {
input: Some(Box::new(input)),
expr: Some(serialize_physical_expr(
exec.predicate().clone(),
Arc::clone(exec.predicate()),
extension_codec,
)?),
default_filter_selectivity: exec.default_selectivity() as u32,
@@ -1585,7 +1585,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
let predicate = exec
.predicate()
.map(|pred| serialize_physical_expr(pred.clone(), extension_codec))
.map(|pred| serialize_physical_expr(Arc::clone(pred), extension_codec))
.transpose()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
@@ -1810,13 +1810,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(e.clone(), extension_codec))
.map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

let partition_keys = exec
.partition_keys
.iter()
.map(|e| serialize_physical_expr(e.clone(), extension_codec))
.map(|e| serialize_physical_expr(Arc::clone(e), extension_codec))
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

return Ok(protobuf::PhysicalPlanNode {
@@ -1840,13 +1840,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(e.clone(), extension_codec))
.map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

let partition_keys = exec
.partition_keys
.iter()
.map(|e| serialize_physical_expr(e.clone(), extension_codec))
.map(|e| serialize_physical_expr(Arc::clone(e), extension_codec))
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

let input_order_mode = match &exec.input_order_mode {
@@ -1949,7 +1949,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
}

let mut buf: Vec<u8> = vec![];
match extension_codec.try_encode(plan_clone.clone(), &mut buf) {
match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
Ok(_) => {
let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
.children()