From 538e0d158b231339cf0757e56d938ad3067abfc0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 30 Oct 2024 16:22:51 -0400 Subject: [PATCH] FFI initial implementation (#12920) * Initial commit of FFI table provider code * Add table type * Make struct pub * Implementing supports_filters_pushdown * Move plan properties over to its own file * Adding release function * Adding release functions to additional structs * Resolve memory leaks * Rename ForeignExecutionPlan for consistency * Resolving memory leak issues * Remove debug statements. Create runtime for block_on operations * Switching over to stable abi and async-ffi * Make consistent the use of Foreign and FFI on struct names * Apply prettier * Format for linter * Add doc-comment * Add option to specify table provider does not support pushdown filters to avoid extra work for some providers * Remove setting default features in cargo file * Tokio only needed for unit tests * Provide log errors rather than failing silently on schema requests * Set default features for datafusion to false in ffi crate * Using TryFrom or From instead of implementing new when there is only one parameter * Move arrow wrappers into their own file * Add documentation * Small adjustment to documentation * Add license text * Fix unnecessary qualification * taplo format --- Cargo.toml | 2 + datafusion/ffi/Cargo.toml | 51 +++ datafusion/ffi/README.md | 81 ++++ datafusion/ffi/src/arrow_wrappers.rs | 70 ++++ datafusion/ffi/src/execution_plan.rs | 361 ++++++++++++++++++ datafusion/ffi/src/lib.rs | 29 ++ datafusion/ffi/src/plan_properties.rs | 297 +++++++++++++++ datafusion/ffi/src/record_batch_stream.rs | 176 +++++++++ datafusion/ffi/src/session_config.rs | 187 +++++++++ datafusion/ffi/src/table_provider.rs | 443 ++++++++++++++++++++++ datafusion/ffi/src/table_source.rs | 87 +++++ 11 files changed, 1784 insertions(+) create mode 100644 datafusion/ffi/Cargo.toml create mode 100644 datafusion/ffi/README.md create mode 100644 datafusion/ffi/src/arrow_wrappers.rs create mode 100644 datafusion/ffi/src/execution_plan.rs create mode 100644 datafusion/ffi/src/lib.rs create mode 100644 datafusion/ffi/src/plan_properties.rs create mode 100644 datafusion/ffi/src/record_batch_stream.rs create mode 100644 datafusion/ffi/src/session_config.rs create mode 100644 datafusion/ffi/src/table_provider.rs create mode 100644 datafusion/ffi/src/table_source.rs diff --git a/Cargo.toml b/Cargo.toml index 0a7184ad2d99..2f8896f7d90c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "datafusion/expr", "datafusion/expr-common", "datafusion/execution", + "datafusion/ffi", "datafusion/functions", "datafusion/functions-aggregate", "datafusion/functions-aggregate-common", @@ -99,6 +100,7 @@ datafusion-common-runtime = { path = "datafusion/common-runtime", version = "42. datafusion-execution = { path = "datafusion/execution", version = "42.1.0" } datafusion-expr = { path = "datafusion/expr", version = "42.1.0" } datafusion-expr-common = { path = "datafusion/expr-common", version = "42.1.0" } +datafusion-ffi = { path = "datafusion/ffi", version = "42.1.0" } datafusion-functions = { path = "datafusion/functions", version = "42.1.0" } datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "42.1.0" } datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.1.0" } diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml new file mode 100644 index 000000000000..119747342515 --- /dev/null +++ b/datafusion/ffi/Cargo.toml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-ffi" +description = "Foreign Function Interface implementation for DataFusion" +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +# Specify MSRV here as `cargo msrv` doesn't support workspace version +rust-version = "1.76" + +[lints] +workspace = true + +[lib] +name = "datafusion_ffi" +path = "src/lib.rs" + +[dependencies] +abi_stable = "0.11.3" +arrow = { workspace = true, features = ["ffi"] } +async-ffi = { version = "0.5.0", features = ["abi_stable"] } +async-trait = { workspace = true } +datafusion = { workspace = true, default-features = false } +datafusion-proto = { workspace = true } +doc-comment = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +prost = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/datafusion/ffi/README.md b/datafusion/ffi/README.md new file mode 100644 index 000000000000..ba4bb8b961a1 --- /dev/null +++ b/datafusion/ffi/README.md @@ -0,0 +1,81 @@ + + +# `datafusion-ffi`: Apache DataFusion Foreign Function Interface + +This crate contains code to allow interoperability of Apache [DataFusion] +with functions from other languages using a stable interface. + +See [API Docs] for details and examples. + +We expect this crate may be used by both sides of the FFI. This allows users +to create modules that can interoperate with the necessity of using the same +version of DataFusion. The driving use case has been the `datafusion-python` +repository, but many other use cases may exist. We envision at least two +use cases. + +1. `datafusion-python` which will use the FFI to provide external services such + as a `TableProvider` without needing to re-export the entire `datafusion-python` + code base. With `datafusion-ffi` these packages do not need `datafusion-python` + as a dependency at all. +2. Users may want to create a modular interface that allows runtime loading of + libraries. + +## Struct Layout + +In this crate we have a variety of structs which closely mimic the behavior of +their internal counterparts. In the following example, we will refer to the +`TableProvider`, but the same pattern exists for other structs. + +Each of the exposted structs in this crate is provided with a variant prefixed +with `Foreign`. This variant is designed to be used by the consumer of the +foreign code. The `Foreign` structs should _never_ access the `private_data` +fields. Instead they should only access the data returned through the function +calls defined on the `FFI_` structs. The second purpose of the `Foreign` +structs is to contain additional data that may be needed by the traits that +are implemented on them. Some of these traits require borrowing data which +can be far more convienent to be locally stored. + +For example, we have a struct `FFI_TableProvider` to give access to the +`TableProvider` functions like `table_type()` and `scan()`. If we write a +library that wishes to expose it's `TableProvider`, then we can access the +private data that contains the Arc reference to the `TableProvider` via +`FFI_TableProvider`. This data is local to the library. + +If we have a program that accesses a `TableProvider` via FFI, then it +will use `ForeignTableProvider`. When using `ForeignTableProvider` we **must** +not attempt to access the `private_data` field in `FFI_TableProvider`. If a +user is testing locally, you may be able to successfully access this field, but +it will only work if you are building against the exact same version of +`DataFusion` for both libraries **and** the same compiler. It will not work +in general. + +It is worth noting that which library is the `local` and which is `foreign` +depends on which interface we are considering. For example, suppose we have a +Python library called `my_provider` that exposes a `TableProvider` called +`MyProvider` via `FFI_TableProvider`. Within the library `my_provider` we can +access the `private_data` via `FFI_TableProvider`. We connect this to +`datafusion-python`, where we access it as a `ForeignTableProvider`. Now when +we call `scan()` on this interface, we have to pass it a `FFI_SessionConfig`. +The `SessionConfig` is local to `datafusion-python` and **not** `my_provider`. +It is important to be careful when expanding these functions to be certain which +side of the interface each object refers to. + +[datafusion]: https://datafusion.apache.org +[api docs]: http://docs.rs/datafusion-ffi/latest diff --git a/datafusion/ffi/src/arrow_wrappers.rs b/datafusion/ffi/src/arrow_wrappers.rs new file mode 100644 index 000000000000..c5add8782c51 --- /dev/null +++ b/datafusion/ffi/src/arrow_wrappers.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use abi_stable::StableAbi; +use arrow::{ + datatypes::{Schema, SchemaRef}, + ffi::{FFI_ArrowArray, FFI_ArrowSchema}, +}; +use log::error; + +/// This is a wrapper struct around FFI_ArrowSchema simply to indicate +/// to the StableAbi macros that the underlying struct is FFI safe. +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct WrappedSchema(#[sabi(unsafe_opaque_field)] pub FFI_ArrowSchema); + +impl From for WrappedSchema { + fn from(value: SchemaRef) -> Self { + let ffi_schema = match FFI_ArrowSchema::try_from(value.as_ref()) { + Ok(s) => s, + Err(e) => { + error!("Unable to convert DataFusion Schema to FFI_ArrowSchema in FFI_PlanProperties. {}", e); + FFI_ArrowSchema::empty() + } + }; + + WrappedSchema(ffi_schema) + } +} + +impl From for SchemaRef { + fn from(value: WrappedSchema) -> Self { + let schema = match Schema::try_from(&value.0) { + Ok(s) => s, + Err(e) => { + error!("Unable to convert from FFI_ArrowSchema to DataFusion Schema in FFI_PlanProperties. {}", e); + Schema::empty() + } + }; + Arc::new(schema) + } +} + +/// This is a wrapper struct for FFI_ArrowArray to indicate to StableAbi +/// that the struct is FFI Safe. For convenience, we also include the +/// schema needed to create a record batch from the array. +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct WrappedArray { + #[sabi(unsafe_opaque_field)] + pub array: FFI_ArrowArray, + + pub schema: WrappedSchema, +} diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs new file mode 100644 index 000000000000..d10eda8990b8 --- /dev/null +++ b/datafusion/ffi/src/execution_plan.rs @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ffi::c_void, pin::Pin, sync::Arc}; + +use abi_stable::{ + std_types::{RResult, RString, RVec}, + StableAbi, +}; +use datafusion::error::Result; +use datafusion::{ + error::DataFusionError, + execution::{SendableRecordBatchStream, TaskContext}, + physical_plan::{DisplayAs, ExecutionPlan, PlanProperties}, +}; + +use crate::{ + plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream, +}; + +/// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_ExecutionPlan { + /// Return the plan properties + pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties, + + /// Return a vector of children plans + pub children: unsafe extern "C" fn(plan: &Self) -> RVec, + + /// Return the plan name. + pub name: unsafe extern "C" fn(plan: &Self) -> RString, + + /// Execute the plan and return a record batch stream. Errors + /// will be returned as a string. + pub execute: unsafe extern "C" fn( + plan: &Self, + partition: usize, + ) -> RResult, + + /// Used to create a clone on the provider of the execution plan. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the plan. + /// A [`ForeignExecutionPlan`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_ExecutionPlan {} +unsafe impl Sync for FFI_ExecutionPlan {} + +pub struct ExecutionPlanPrivateData { + pub plan: Arc, + pub context: Arc, +} + +unsafe extern "C" fn properties_fn_wrapper( + plan: &FFI_ExecutionPlan, +) -> FFI_PlanProperties { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; + + plan.properties().into() +} + +unsafe extern "C" fn children_fn_wrapper( + plan: &FFI_ExecutionPlan, +) -> RVec { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; + let ctx = &(*private_data).context; + + let children: Vec<_> = plan + .children() + .into_iter() + .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx))) + .collect(); + + children.into() +} + +unsafe extern "C" fn execute_fn_wrapper( + plan: &FFI_ExecutionPlan, + partition: usize, +) -> RResult { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; + let ctx = &(*private_data).context; + + match plan.execute(partition, Arc::clone(ctx)) { + Ok(rbs) => RResult::ROk(rbs.into()), + Err(e) => RResult::RErr( + format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(), + ), + } +} +unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; + + plan.name().into() +} + +unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { + let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan_data = &(*private_data); + + FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), Arc::clone(&plan_data.context)) +} + +impl Clone for FFI_ExecutionPlan { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl FFI_ExecutionPlan { + /// This function is called on the provider's side. + pub fn new(plan: Arc, context: Arc) -> Self { + let private_data = Box::new(ExecutionPlanPrivateData { plan, context }); + + Self { + properties: properties_fn_wrapper, + children: children_fn_wrapper, + name: name_fn_wrapper, + execute: execute_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + } + } +} + +impl Drop for FFI_ExecutionPlan { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +/// This struct is used to access an execution plan provided by a foreign +/// library across a FFI boundary. +/// +/// The ForeignExecutionPlan is to be used by the caller of the plan, so it has +/// no knowledge or access to the private data. All interaction with the plan +/// must occur through the functions defined in FFI_ExecutionPlan. +#[derive(Debug)] +pub struct ForeignExecutionPlan { + name: String, + plan: FFI_ExecutionPlan, + properties: PlanProperties, + children: Vec>, +} + +unsafe impl Send for ForeignExecutionPlan {} +unsafe impl Sync for ForeignExecutionPlan {} + +impl DisplayAs for ForeignExecutionPlan { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!( + f, + "FFI_ExecutionPlan(number_of_children={})", + self.children.len(), + ) + } +} + +impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan { + type Error = DataFusionError; + + fn try_from(plan: &FFI_ExecutionPlan) -> Result { + unsafe { + let name = (plan.name)(plan).into(); + + let properties: PlanProperties = (plan.properties)(plan).try_into()?; + + let children_rvec = (plan.children)(plan); + let children: Result> = children_rvec + .iter() + .map(ForeignExecutionPlan::try_from) + .map(|child| child.map(|c| Arc::new(c) as Arc)) + .collect(); + + Ok(Self { + name, + plan: plan.clone(), + properties, + children: children?, + }) + } + } +} + +impl ExecutionPlan for ForeignExecutionPlan { + fn name(&self) -> &str { + &self.name + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + self.children + .iter() + .map(|p| p as &Arc) + .collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(ForeignExecutionPlan { + plan: self.plan.clone(), + name: self.name.clone(), + children, + properties: self.properties.clone(), + })) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + unsafe { + match (self.plan.execute)(&self.plan, partition) { + RResult::ROk(stream) => { + let stream = Pin::new(Box::new(stream)) as SendableRecordBatchStream; + Ok(stream) + } + RResult::RErr(e) => Err(DataFusionError::Execution(format!( + "Error occurred during FFI call to FFI_ExecutionPlan execute. {}", + e + ))), + } + } + } +} + +#[cfg(test)] +mod tests { + use datafusion::{physical_plan::Partitioning, prelude::SessionContext}; + + use super::*; + + #[derive(Debug)] + pub struct EmptyExec { + props: PlanProperties, + } + + impl EmptyExec { + pub fn new(schema: arrow::datatypes::SchemaRef) -> Self { + Self { + props: PlanProperties::new( + datafusion::physical_expr::EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(3), + datafusion::physical_plan::ExecutionMode::Unbounded, + ), + } + } + } + + impl DisplayAs for EmptyExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + _f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + unimplemented!() + } + } + + impl ExecutionPlan for EmptyExec { + fn name(&self) -> &'static str { + "empty-exec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.props + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + unimplemented!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + + fn statistics(&self) -> Result { + unimplemented!() + } + } + + #[test] + fn test_round_trip_ffi_execution_plan() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let ctx = SessionContext::new(); + + let original_plan = Arc::new(EmptyExec::new(schema)); + let original_name = original_plan.name().to_string(); + + let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx()); + + let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?; + + assert!(original_name == foreign_plan.name()); + + Ok(()) + } +} diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs new file mode 100644 index 000000000000..4a74e65dc671 --- /dev/null +++ b/datafusion/ffi/src/lib.rs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143 +#![deny(clippy::clone_on_ref_ptr)] + +pub mod arrow_wrappers; +pub mod execution_plan; +pub mod plan_properties; +pub mod record_batch_stream; +pub mod session_config; +pub mod table_provider; +pub mod table_source; + +#[cfg(doctest)] +doc_comment::doctest!("../README.md", readme_example_test); diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs new file mode 100644 index 000000000000..722681ae4a1d --- /dev/null +++ b/datafusion/ffi/src/plan_properties.rs @@ -0,0 +1,297 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ffi::c_void, sync::Arc}; + +use abi_stable::{ + std_types::{ + RResult::{self, RErr, ROk}, + RStr, RVec, + }, + StableAbi, +}; +use arrow::datatypes::SchemaRef; +use datafusion::{ + error::{DataFusionError, Result}, + physical_expr::EquivalenceProperties, + physical_plan::{ExecutionMode, PlanProperties}, + prelude::SessionContext, +}; +use datafusion_proto::{ + physical_plan::{ + from_proto::{parse_physical_sort_exprs, parse_protobuf_partitioning}, + to_proto::{serialize_partitioning, serialize_physical_sort_exprs}, + DefaultPhysicalExtensionCodec, + }, + protobuf::{Partitioning, PhysicalSortExprNodeCollection}, +}; +use prost::Message; + +use crate::arrow_wrappers::WrappedSchema; + +/// A stable struct for sharing [`PlanProperties`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_PlanProperties { + /// The output partitioning is a [`Partitioning`] protobuf message serialized + /// into bytes to pass across the FFI boundary. + pub output_partitioning: + unsafe extern "C" fn(plan: &Self) -> RResult, RStr<'static>>, + + /// Return the execution mode of the plan. + pub execution_mode: unsafe extern "C" fn(plan: &Self) -> FFI_ExecutionMode, + + /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message + /// serialized into bytes to pass across the FFI boundary. + pub output_ordering: + unsafe extern "C" fn(plan: &Self) -> RResult, RStr<'static>>, + + /// Return the schema of the plan. + pub schema: unsafe extern "C" fn(plan: &Self) -> WrappedSchema, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the plan. + /// The foreign library should never attempt to access this data. + pub private_data: *mut c_void, +} + +struct PlanPropertiesPrivateData { + props: PlanProperties, +} + +unsafe extern "C" fn output_partitioning_fn_wrapper( + properties: &FFI_PlanProperties, +) -> RResult, RStr<'static>> { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + + let codec = DefaultPhysicalExtensionCodec {}; + let partitioning_data = + match serialize_partitioning(props.output_partitioning(), &codec) { + Ok(p) => p, + Err(_) => { + return RErr( + "unable to serialize output_partitioning in FFI_PlanProperties" + .into(), + ) + } + }; + let output_partitioning = partitioning_data.encode_to_vec(); + + ROk(output_partitioning.into()) +} + +unsafe extern "C" fn execution_mode_fn_wrapper( + properties: &FFI_PlanProperties, +) -> FFI_ExecutionMode { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + props.execution_mode().into() +} + +unsafe extern "C" fn output_ordering_fn_wrapper( + properties: &FFI_PlanProperties, +) -> RResult, RStr<'static>> { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + + let codec = DefaultPhysicalExtensionCodec {}; + let output_ordering = + match props.output_ordering() { + Some(ordering) => { + let physical_sort_expr_nodes = + match serialize_physical_sort_exprs(ordering.to_owned(), &codec) { + Ok(v) => v, + Err(_) => return RErr( + "unable to serialize output_ordering in FFI_PlanProperties" + .into(), + ), + }; + + let ordering_data = PhysicalSortExprNodeCollection { + physical_sort_expr_nodes, + }; + + ordering_data.encode_to_vec() + } + None => Vec::default(), + }; + ROk(output_ordering.into()) +} + +unsafe extern "C" fn schema_fn_wrapper(properties: &FFI_PlanProperties) -> WrappedSchema { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + + let schema: SchemaRef = Arc::clone(props.eq_properties.schema()); + schema.into() +} + +unsafe extern "C" fn release_fn_wrapper(props: &mut FFI_PlanProperties) { + let private_data = + Box::from_raw(props.private_data as *mut PlanPropertiesPrivateData); + drop(private_data); +} + +impl Drop for FFI_PlanProperties { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl From<&PlanProperties> for FFI_PlanProperties { + fn from(props: &PlanProperties) -> Self { + let private_data = Box::new(PlanPropertiesPrivateData { + props: props.clone(), + }); + + FFI_PlanProperties { + output_partitioning: output_partitioning_fn_wrapper, + execution_mode: execution_mode_fn_wrapper, + output_ordering: output_ordering_fn_wrapper, + schema: schema_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + } + } +} + +impl TryFrom for PlanProperties { + type Error = DataFusionError; + + fn try_from(ffi_props: FFI_PlanProperties) -> Result { + let ffi_schema = unsafe { (ffi_props.schema)(&ffi_props) }; + let schema = (&ffi_schema.0).try_into()?; + + // TODO Extend FFI to get the registry and codex + let default_ctx = SessionContext::new(); + let codex = DefaultPhysicalExtensionCodec {}; + + let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) }; + let orderings = match ffi_orderings { + ROk(ordering_vec) => { + let proto_output_ordering = + PhysicalSortExprNodeCollection::decode(ordering_vec.as_ref()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + Some(parse_physical_sort_exprs( + &proto_output_ordering.physical_sort_expr_nodes, + &default_ctx, + &schema, + &codex, + )?) + } + RErr(e) => return Err(DataFusionError::Plan(e.to_string())), + }; + + let ffi_partitioning = unsafe { (ffi_props.output_partitioning)(&ffi_props) }; + let partitioning = match ffi_partitioning { + ROk(partitioning_vec) => { + let proto_output_partitioning = + Partitioning::decode(partitioning_vec.as_ref()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + parse_protobuf_partitioning( + Some(&proto_output_partitioning), + &default_ctx, + &schema, + &codex, + )? + .ok_or(DataFusionError::Plan( + "Unable to deserialize partitioning protobuf in FFI_PlanProperties" + .to_string(), + )) + } + RErr(e) => Err(DataFusionError::Plan(e.to_string())), + }?; + + let execution_mode: ExecutionMode = + unsafe { (ffi_props.execution_mode)(&ffi_props).into() }; + + let eq_properties = match orderings { + Some(ordering) => { + EquivalenceProperties::new_with_orderings(Arc::new(schema), &[ordering]) + } + None => EquivalenceProperties::new(Arc::new(schema)), + }; + + Ok(PlanProperties::new( + eq_properties, + partitioning, + execution_mode, + )) + } +} + +/// FFI safe version of [`ExecutionMode`]. +#[repr(C)] +#[allow(non_camel_case_types)] +#[derive(Clone, StableAbi)] +pub enum FFI_ExecutionMode { + Bounded, + Unbounded, + PipelineBreaking, +} + +impl From for FFI_ExecutionMode { + fn from(value: ExecutionMode) -> Self { + match value { + ExecutionMode::Bounded => FFI_ExecutionMode::Bounded, + ExecutionMode::Unbounded => FFI_ExecutionMode::Unbounded, + ExecutionMode::PipelineBreaking => FFI_ExecutionMode::PipelineBreaking, + } + } +} + +impl From for ExecutionMode { + fn from(value: FFI_ExecutionMode) -> Self { + match value { + FFI_ExecutionMode::Bounded => ExecutionMode::Bounded, + FFI_ExecutionMode::Unbounded => ExecutionMode::Unbounded, + FFI_ExecutionMode::PipelineBreaking => ExecutionMode::PipelineBreaking, + } + } +} + +#[cfg(test)] +mod tests { + use datafusion::physical_plan::Partitioning; + + use super::*; + + #[test] + fn test_round_trip_ffi_plan_properties() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + + let original_props = PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(3), + ExecutionMode::Unbounded, + ); + + let local_props_ptr = FFI_PlanProperties::from(&original_props); + + let foreign_props: PlanProperties = local_props_ptr.try_into()?; + + assert!(format!("{:?}", foreign_props) == format!("{:?}", original_props)); + + Ok(()) + } +} diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs new file mode 100644 index 000000000000..c944e56c5cde --- /dev/null +++ b/datafusion/ffi/src/record_batch_stream.rs @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ffi::c_void, task::Poll}; + +use abi_stable::{ + std_types::{ROption, RResult, RString}, + StableAbi, +}; +use arrow::array::{Array, RecordBatch}; +use arrow::{ + array::{make_array, StructArray}, + ffi::{from_ffi, to_ffi}, +}; +use async_ffi::{ContextExt, FfiContext, FfiPoll}; +use datafusion::error::Result; +use datafusion::{ + error::DataFusionError, + execution::{RecordBatchStream, SendableRecordBatchStream}, +}; +use futures::{Stream, TryStreamExt}; + +use crate::arrow_wrappers::{WrappedArray, WrappedSchema}; + +/// A stable struct for sharing [`RecordBatchStream`] across FFI boundaries. +/// We use the async-ffi crate for handling async calls across libraries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_RecordBatchStream { + /// This mirrors the `poll_next` of [`RecordBatchStream`] but does so + /// in a FFI safe manner. + pub poll_next: + unsafe extern "C" fn( + stream: &Self, + cx: &mut FfiContext, + ) -> FfiPoll>>, + + /// Return the schema of the record batch + pub schema: unsafe extern "C" fn(stream: &Self) -> WrappedSchema, + + /// Internal data. This is only to be accessed by the provider of the plan. + /// The foreign library should never attempt to access this data. + pub private_data: *mut c_void, +} + +impl From for FFI_RecordBatchStream { + fn from(stream: SendableRecordBatchStream) -> Self { + FFI_RecordBatchStream { + poll_next: poll_next_fn_wrapper, + schema: schema_fn_wrapper, + private_data: Box::into_raw(Box::new(stream)) as *mut c_void, + } + } +} + +unsafe impl Send for FFI_RecordBatchStream {} + +unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> WrappedSchema { + let stream = stream.private_data as *const SendableRecordBatchStream; + + (*stream).schema().into() +} + +fn record_batch_to_wrapped_array( + record_batch: RecordBatch, +) -> RResult { + let struct_array = StructArray::from(record_batch); + match to_ffi(&struct_array.to_data()) { + Ok((array, schema)) => RResult::ROk(WrappedArray { + array, + schema: WrappedSchema(schema), + }), + Err(e) => RResult::RErr(e.to_string().into()), + } +} + +// probably want to use pub unsafe fn from_ffi(array: FFI_ArrowArray, schema: &FFI_ArrowSchema) -> Result { +fn maybe_record_batch_to_wrapped_stream( + record_batch: Option>, +) -> ROption> { + match record_batch { + Some(Ok(record_batch)) => { + ROption::RSome(record_batch_to_wrapped_array(record_batch)) + } + Some(Err(e)) => ROption::RSome(RResult::RErr(e.to_string().into())), + None => ROption::RNone, + } +} + +unsafe extern "C" fn poll_next_fn_wrapper( + stream: &FFI_RecordBatchStream, + cx: &mut FfiContext, +) -> FfiPoll>> { + let stream = stream.private_data as *mut SendableRecordBatchStream; + + let poll_result = cx.with_context(|std_cx| { + (*stream) + .try_poll_next_unpin(std_cx) + .map(maybe_record_batch_to_wrapped_stream) + }); + + poll_result.into() +} + +impl RecordBatchStream for FFI_RecordBatchStream { + fn schema(&self) -> arrow::datatypes::SchemaRef { + let wrapped_schema = unsafe { (self.schema)(self) }; + wrapped_schema.into() + } +} + +fn wrapped_array_to_record_batch(array: WrappedArray) -> Result { + let array_data = + unsafe { from_ffi(array.array, &array.schema.0).map_err(DataFusionError::from)? }; + let array = make_array(array_data); + let struct_array = array + .as_any() + .downcast_ref::() + .ok_or(DataFusionError::Execution( + "Unexpected array type during record batch collection in FFI_RecordBatchStream" + .to_string(), + ))?; + + Ok(struct_array.into()) +} + +fn maybe_wrapped_array_to_record_batch( + array: ROption>, +) -> Option> { + match array { + ROption::RSome(RResult::ROk(wrapped_array)) => { + Some(wrapped_array_to_record_batch(wrapped_array)) + } + ROption::RSome(RResult::RErr(e)) => { + Some(Err(DataFusionError::Execution(e.to_string()))) + } + ROption::RNone => None, + } +} + +impl Stream for FFI_RecordBatchStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let poll_result = + unsafe { cx.with_ffi_context(|ffi_cx| (self.poll_next)(&self, ffi_cx)) }; + + match poll_result { + FfiPoll::Ready(array) => { + Poll::Ready(maybe_wrapped_array_to_record_batch(array)) + } + FfiPoll::Pending => Poll::Pending, + FfiPoll::Panicked => Poll::Ready(Some(Err(DataFusionError::Execution( + "Error occurred during poll_next on FFI_RecordBatchStream".to_string(), + )))), + } + } +} diff --git a/datafusion/ffi/src/session_config.rs b/datafusion/ffi/src/session_config.rs new file mode 100644 index 000000000000..aea03cf94e0a --- /dev/null +++ b/datafusion/ffi/src/session_config.rs @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + collections::HashMap, + ffi::{c_char, c_void, CString}, +}; + +use abi_stable::{ + std_types::{RHashMap, RString}, + StableAbi, +}; +use datafusion::{config::ConfigOptions, error::Result}; +use datafusion::{error::DataFusionError, prelude::SessionConfig}; + +/// A stable struct for sharing [`SessionConfig`] across FFI boundaries. +/// Instead of attempting to expose the entire SessionConfig interface, we +/// convert the config options into a map from a string to string and pass +/// those values across the FFI boundary. On the receiver side, we +/// reconstruct a SessionConfig from those values. +/// +/// It is possible that using different versions of DataFusion across the +/// FFI boundary could have differing expectations of the config options. +/// This is a limitation of this approach, but exposing the entire +/// SessionConfig via a FFI interface would be extensive and provide limited +/// value over this version. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_SessionConfig { + /// Return a hash map from key to value of the config options represented + /// by string values. + pub config_options: unsafe extern "C" fn(config: &Self) -> RHashMap, + + /// Used to create a clone on the provider of the execution plan. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the plan. + /// A [`ForeignSessionConfig`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_SessionConfig {} +unsafe impl Sync for FFI_SessionConfig {} + +unsafe extern "C" fn config_options_fn_wrapper( + config: &FFI_SessionConfig, +) -> RHashMap { + let private_data = config.private_data as *mut SessionConfigPrivateData; + let config_options = &(*private_data).config; + + let mut options = RHashMap::default(); + for config_entry in config_options.entries() { + if let Some(value) = config_entry.value { + options.insert(config_entry.key.into(), value.into()); + } + } + + options +} + +unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) { + let private_data = + Box::from_raw(config.private_data as *mut SessionConfigPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_SessionConfig { + let old_private_data = config.private_data as *mut SessionConfigPrivateData; + let old_config = &(*old_private_data).config; + + let private_data = Box::new(SessionConfigPrivateData { + config: old_config.clone(), + }); + + FFI_SessionConfig { + config_options: config_options_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + } +} + +struct SessionConfigPrivateData { + pub config: ConfigOptions, +} + +impl From<&SessionConfig> for FFI_SessionConfig { + fn from(session: &SessionConfig) -> Self { + let mut config_keys = Vec::new(); + let mut config_values = Vec::new(); + for config_entry in session.options().entries() { + if let Some(value) = config_entry.value { + let key_cstr = CString::new(config_entry.key).unwrap_or_default(); + let key_ptr = key_cstr.into_raw() as *const c_char; + config_keys.push(key_ptr); + + config_values + .push(CString::new(value).unwrap_or_default().into_raw() + as *const c_char); + } + } + + let private_data = Box::new(SessionConfigPrivateData { + config: session.options().clone(), + }); + + Self { + config_options: config_options_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + } + } +} + +impl Clone for FFI_SessionConfig { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl Drop for FFI_SessionConfig { + fn drop(&mut self) { + unsafe { (self.release)(self) }; + } +} + +/// A wrapper struct for accessing [`SessionConfig`] across a FFI boundary. +/// The [`SessionConfig`] will be generated from a hash map of the config +/// options in the provider and will be reconstructed on this side of the +/// interface.s +pub struct ForeignSessionConfig(pub SessionConfig); + +impl TryFrom<&FFI_SessionConfig> for ForeignSessionConfig { + type Error = DataFusionError; + + fn try_from(config: &FFI_SessionConfig) -> Result { + let config_options = unsafe { (config.config_options)(config) }; + + let mut options_map = HashMap::new(); + config_options.iter().for_each(|kv_pair| { + options_map.insert(kv_pair.0.to_string(), kv_pair.1.to_string()); + }); + + Ok(Self(SessionConfig::from_string_hash_map(&options_map)?)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_round_trip_ffi_session_config() -> Result<()> { + let session_config = SessionConfig::new(); + let original_options = session_config.options().entries(); + + let ffi_config: FFI_SessionConfig = (&session_config).into(); + + let foreign_config: ForeignSessionConfig = (&ffi_config).try_into()?; + + let returned_options = foreign_config.0.options().entries(); + + assert!(original_options.len() == returned_options.len()); + + Ok(()) + } +} diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs new file mode 100644 index 000000000000..011ad96e423d --- /dev/null +++ b/datafusion/ffi/src/table_provider.rs @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, ffi::c_void, sync::Arc}; + +use abi_stable::{ + std_types::{ROption, RResult, RString, RVec}, + StableAbi, +}; +use arrow::datatypes::SchemaRef; +use async_ffi::{FfiFuture, FutureExt}; +use async_trait::async_trait; +use datafusion::{ + catalog::{Session, TableProvider}, + datasource::TableType, + error::DataFusionError, + execution::session_state::SessionStateBuilder, + logical_expr::TableProviderFilterPushDown, + physical_plan::ExecutionPlan, + prelude::{Expr, SessionContext}, +}; +use datafusion_proto::{ + logical_plan::{ + from_proto::parse_exprs, to_proto::serialize_exprs, DefaultLogicalExtensionCodec, + }, + protobuf::LogicalExprList, +}; +use prost::Message; + +use crate::{ + arrow_wrappers::WrappedSchema, + session_config::ForeignSessionConfig, + table_source::{FFI_TableProviderFilterPushDown, FFI_TableType}, +}; + +use super::{ + execution_plan::{FFI_ExecutionPlan, ForeignExecutionPlan}, + session_config::FFI_SessionConfig, +}; +use datafusion::error::Result; + +/// A stable struct for sharing [`TableProvider`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_TableProvider { + /// Return the table schema + pub schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema, + + /// Perform a scan on the table. See [`TableProvider`] for detailed usage information. + /// + /// # Arguments + /// + /// * `provider` - the table provider + /// * `session_config` - session configuration + /// * `projections` - if specified, only a subset of the columns are returned + /// * `filters_serialized` - filters to apply to the scan, which are a + /// [`LogicalExprList`] protobuf message serialized into bytes to pass + /// across the FFI boundary. + /// * `limit` - if specified, limit the number of rows returned + pub scan: unsafe extern "C" fn( + provider: &Self, + session_config: &FFI_SessionConfig, + projections: RVec, + filters_serialized: RVec, + limit: ROption, + ) -> FfiFuture>, + + /// Return the type of table. See [`TableType`] for options. + pub table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType, + + /// Based upon the input filters, identify which are supported. The filters + /// are a [`LogicalExprList`] protobuf message serialized into bytes to pass + /// across the FFI boundary. + pub supports_filters_pushdown: Option< + unsafe extern "C" fn( + provider: &FFI_TableProvider, + filters_serialized: RVec, + ) + -> RResult, RString>, + >, + + /// Used to create a clone on the provider of the execution plan. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the plan. + /// A [`ForeignExecutionPlan`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_TableProvider {} +unsafe impl Sync for FFI_TableProvider {} + +struct ProviderPrivateData { + provider: Arc, +} + +unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema { + let private_data = provider.private_data as *const ProviderPrivateData; + let provider = &(*private_data).provider; + + provider.schema().into() +} + +unsafe extern "C" fn table_type_fn_wrapper( + provider: &FFI_TableProvider, +) -> FFI_TableType { + let private_data = provider.private_data as *const ProviderPrivateData; + let provider = &(*private_data).provider; + + provider.table_type().into() +} + +fn supports_filters_pushdown_internal( + provider: &Arc, + filters_serialized: &[u8], +) -> Result> { + let default_ctx = SessionContext::new(); + let codec = DefaultLogicalExtensionCodec {}; + + let filters = match filters_serialized.is_empty() { + true => vec![], + false => { + let proto_filters = LogicalExprList::decode(filters_serialized) + .map_err(|e| DataFusionError::Plan(e.to_string()))?; + + parse_exprs(proto_filters.expr.iter(), &default_ctx, &codec)? + } + }; + let filters_borrowed: Vec<&Expr> = filters.iter().collect(); + + let results: RVec<_> = provider + .supports_filters_pushdown(&filters_borrowed)? + .iter() + .map(|v| v.into()) + .collect(); + + Ok(results) +} + +unsafe extern "C" fn supports_filters_pushdown_fn_wrapper( + provider: &FFI_TableProvider, + filters_serialized: RVec, +) -> RResult, RString> { + let private_data = provider.private_data as *const ProviderPrivateData; + let provider = &(*private_data).provider; + + supports_filters_pushdown_internal(provider, &filters_serialized) + .map_err(|e| e.to_string().into()) + .into() +} + +unsafe extern "C" fn scan_fn_wrapper( + provider: &FFI_TableProvider, + session_config: &FFI_SessionConfig, + projections: RVec, + filters_serialized: RVec, + limit: ROption, +) -> FfiFuture> { + let private_data = provider.private_data as *mut ProviderPrivateData; + let internal_provider = &(*private_data).provider; + let session_config = session_config.clone(); + + async move { + let config = match ForeignSessionConfig::try_from(&session_config) { + Ok(c) => c, + Err(e) => return RResult::RErr(e.to_string().into()), + }; + let session = SessionStateBuilder::new() + .with_default_features() + .with_config(config.0) + .build(); + let ctx = SessionContext::new_with_state(session); + + let filters = match filters_serialized.is_empty() { + true => vec![], + false => { + let default_ctx = SessionContext::new(); + let codec = DefaultLogicalExtensionCodec {}; + + let proto_filters = + match LogicalExprList::decode(filters_serialized.as_ref()) { + Ok(f) => f, + Err(e) => return RResult::RErr(e.to_string().into()), + }; + + match parse_exprs(proto_filters.expr.iter(), &default_ctx, &codec) { + Ok(f) => f, + Err(e) => return RResult::RErr(e.to_string().into()), + } + } + }; + + let projections: Vec<_> = projections.into_iter().collect(); + let maybe_projections = match projections.is_empty() { + true => None, + false => Some(&projections), + }; + + let plan = match internal_provider + .scan(&ctx.state(), maybe_projections, &filters, limit.into()) + .await + { + Ok(p) => p, + Err(e) => return RResult::RErr(e.to_string().into()), + }; + + RResult::ROk(FFI_ExecutionPlan::new(plan, ctx.task_ctx())) + } + .into_ffi() +} + +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) { + let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider { + let old_private_data = provider.private_data as *const ProviderPrivateData; + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + })) as *mut c_void; + + FFI_TableProvider { + schema: schema_fn_wrapper, + scan: scan_fn_wrapper, + table_type: table_type_fn_wrapper, + supports_filters_pushdown: provider.supports_filters_pushdown, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + private_data, + } +} + +impl Drop for FFI_TableProvider { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl FFI_TableProvider { + /// Creates a new [`FFI_TableProvider`]. + pub fn new( + provider: Arc, + can_support_pushdown_filters: bool, + ) -> Self { + let private_data = Box::new(ProviderPrivateData { provider }); + + Self { + schema: schema_fn_wrapper, + scan: scan_fn_wrapper, + table_type: table_type_fn_wrapper, + supports_filters_pushdown: match can_support_pushdown_filters { + true => Some(supports_filters_pushdown_fn_wrapper), + false => None, + }, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + } + } +} + +/// This wrapper struct exists on the reciever side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_TableProvider to interact with the foreign table provider. +#[derive(Debug)] +pub struct ForeignTableProvider(FFI_TableProvider); + +unsafe impl Send for ForeignTableProvider {} +unsafe impl Sync for ForeignTableProvider {} + +impl From<&FFI_TableProvider> for ForeignTableProvider { + fn from(provider: &FFI_TableProvider) -> Self { + Self(provider.clone()) + } +} + +impl Clone for FFI_TableProvider { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +#[async_trait] +impl TableProvider for ForeignTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + let wrapped_schema = unsafe { (self.0.schema)(&self.0) }; + wrapped_schema.into() + } + + fn table_type(&self) -> TableType { + unsafe { (self.0.table_type)(&self.0).into() } + } + + async fn scan( + &self, + session: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let session_config: FFI_SessionConfig = session.config().into(); + + let projections: Option> = + projection.map(|p| p.iter().map(|v| v.to_owned()).collect()); + + let codec = DefaultLogicalExtensionCodec {}; + let filter_list = LogicalExprList { + expr: serialize_exprs(filters, &codec)?, + }; + let filters_serialized = filter_list.encode_to_vec().into(); + + let plan = unsafe { + let maybe_plan = (self.0.scan)( + &self.0, + &session_config, + projections.unwrap_or_default(), + filters_serialized, + limit.into(), + ) + .await; + + match maybe_plan { + RResult::ROk(p) => ForeignExecutionPlan::try_from(&p)?, + RResult::RErr(_) => { + return Err(DataFusionError::Internal( + "Unable to perform scan via FFI".to_string(), + )) + } + } + }; + + Ok(Arc::new(plan)) + } + + /// Tests whether the table provider can make use of a filter expression + /// to optimise data retrieval. + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + unsafe { + let pushdown_fn = match self.0.supports_filters_pushdown { + Some(func) => func, + None => { + return Ok(vec![ + TableProviderFilterPushDown::Unsupported; + filters.len() + ]) + } + }; + + let codec = DefaultLogicalExtensionCodec {}; + + let expr_list = LogicalExprList { + expr: serialize_exprs(filters.iter().map(|f| f.to_owned()), &codec)?, + }; + let serialized_filters = expr_list.encode_to_vec(); + + let pushdowns = pushdown_fn(&self.0, serialized_filters.into()); + + match pushdowns { + RResult::ROk(p) => Ok(p.iter().map(|v| v.into()).collect()), + RResult::RErr(e) => Err(DataFusionError::Plan(e.to_string())), + } + } + } +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::Schema; + use datafusion::prelude::{col, lit}; + + use super::*; + + #[tokio::test] + async fn test_round_trip_ffi_table_provider() -> Result<()> { + use arrow::datatypes::Field; + use datafusion::arrow::{ + array::Float32Array, datatypes::DataType, record_batch::RecordBatch, + }; + use datafusion::datasource::MemTable; + + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + + // define data in two partitions + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))], + )?; + let batch2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Float32Array::from(vec![64.0]))], + )?; + + let ctx = SessionContext::new(); + + let provider = + Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?); + + let ffi_provider = FFI_TableProvider::new(provider, true); + + let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); + + ctx.register_table("t", Arc::new(foreign_table_provider))?; + + let df = ctx.table("t").await?; + + df.select(vec![col("a")])? + .filter(col("a").gt(lit(3.0)))? + .show() + .await?; + + Ok(()) + } +} diff --git a/datafusion/ffi/src/table_source.rs b/datafusion/ffi/src/table_source.rs new file mode 100644 index 000000000000..a59836622ee6 --- /dev/null +++ b/datafusion/ffi/src/table_source.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use abi_stable::StableAbi; +use datafusion::{datasource::TableType, logical_expr::TableProviderFilterPushDown}; + +/// FFI safe version of [`TableProviderFilterPushDown`]. +#[repr(C)] +#[derive(StableAbi)] +#[allow(non_camel_case_types)] +pub enum FFI_TableProviderFilterPushDown { + Unsupported, + Inexact, + Exact, +} + +impl From<&FFI_TableProviderFilterPushDown> for TableProviderFilterPushDown { + fn from(value: &FFI_TableProviderFilterPushDown) -> Self { + match value { + FFI_TableProviderFilterPushDown::Unsupported => { + TableProviderFilterPushDown::Unsupported + } + FFI_TableProviderFilterPushDown::Inexact => { + TableProviderFilterPushDown::Inexact + } + FFI_TableProviderFilterPushDown::Exact => TableProviderFilterPushDown::Exact, + } + } +} + +impl From<&TableProviderFilterPushDown> for FFI_TableProviderFilterPushDown { + fn from(value: &TableProviderFilterPushDown) -> Self { + match value { + TableProviderFilterPushDown::Unsupported => { + FFI_TableProviderFilterPushDown::Unsupported + } + TableProviderFilterPushDown::Inexact => { + FFI_TableProviderFilterPushDown::Inexact + } + TableProviderFilterPushDown::Exact => FFI_TableProviderFilterPushDown::Exact, + } + } +} + +/// FFI safe version of [`TableType`]. +#[repr(C)] +#[allow(non_camel_case_types)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, StableAbi)] +pub enum FFI_TableType { + Base, + View, + Temporary, +} + +impl From for TableType { + fn from(value: FFI_TableType) -> Self { + match value { + FFI_TableType::Base => TableType::Base, + FFI_TableType::View => TableType::View, + FFI_TableType::Temporary => TableType::Temporary, + } + } +} + +impl From for FFI_TableType { + fn from(value: TableType) -> Self { + match value { + TableType::Base => FFI_TableType::Base, + TableType::View => FFI_TableType::View, + TableType::Temporary => FFI_TableType::Temporary, + } + } +}