implement direct query for s3 and gcs (#9199)
Signed-off-by: Nikolay Ulmasov <[email protected]>
r3stl355 authored Feb 13, 2024
1 parent 2615d1b commit bd1c76c
Showing 4 changed files with 156 additions and 84 deletions.
124 changes: 91 additions & 33 deletions datafusion-cli/src/catalog.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

+use crate::object_storage::get_object_store;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
@@ -24,10 +25,9 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
-use object_store::http::HttpBuilder;
-use object_store::ObjectStore;
use parking_lot::RwLock;
use std::any::Any;
+use std::collections::HashMap;
use std::sync::{Arc, Weak};
use url::Url;

@@ -155,27 +155,24 @@ impl SchemaProvider for DynamicFileSchemaProvider {
// that name, try to treat it as a listing table
let state = self.state.upgrade()?.read().clone();
let table_url = ListingTableUrl::parse(name).ok()?;

-        // Assure the `http` store for this url is registered if this
-        // is an `http(s)` listing
-        // TODO: support for other types, e.g. `s3`, may need to be added
-        match table_url.scheme() {
-            "http" | "https" => {
-                let url: &Url = table_url.as_ref();
-                match state.runtime_env().object_store_registry.get_store(url) {
-                    Ok(_) => {}
-                    Err(_) => {
-                        let store = Arc::new(
-                            HttpBuilder::new()
-                                .with_url(url.origin().ascii_serialization())
-                                .build()
-                                .ok()?,
-                        ) as Arc<dyn ObjectStore>;
-                        state.runtime_env().register_object_store(url, store);
-                    }
-                }
-            }
-            _ => {}
-        }
+        let url: &Url = table_url.as_ref();
+
+        // If the store is already registered for this URL then `get_store`
+        // will return `Ok` which means we don't need to register it again. However,
+        // if `get_store` returns an `Err` then it means the corresponding store is
+        // not registered yet and we need to register it
+        match state.runtime_env().object_store_registry.get_store(url) {
+            Ok(_) => { /*Nothing to do here, store for this URL is already registered*/ }
+            Err(_) => {
+                // Register the store for this URL. Here we don't have access
+                // to any command options so the only choice is to use an empty collection
+                let mut options = HashMap::new();
+                let store =
+                    get_object_store(&state, &mut options, table_url.scheme(), url)
+                        .await
+                        .unwrap();
+                state.runtime_env().register_object_store(url, store);
+            }
+        }

let config = ListingTableConfig::new(table_url)
@@ -198,15 +195,10 @@ impl SchemaProvider for DynamicFileSchemaProvider {
#[cfg(test)]
mod tests {
use super::*;
+    use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;

-    #[tokio::test]
-    async fn query_http_location_test() -> Result<()> {
-        // Perhaps this could be changed to use an existing file but
-        // that will require a permanently availalble web resource
-        let domain = "example.com";
-        let location = format!("http://{domain}/file.parquet");
-
+    fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let mut ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
ctx.state().catalog_list(),
@@ -222,12 +214,23 @@ mod tests {
let schema = catalog
.schema(catalog.schema_names().first().unwrap())
.unwrap();
-        let none = schema.table(&location).await;
+        (ctx, schema)
+    }

-        // That's a non-existing location so expecting None here
-        assert!(none.is_none());
+    #[tokio::test]
+    async fn query_http_location_test() -> Result<()> {
+        // This is a unit test so not expecting a connection or a file to be
+        // available
+        let domain = "example.com";
+        let location = format!("http://{domain}/file.parquet");

-        // It should still create an object store for the location
+        let (ctx, schema) = setup_context();
+
+        // That's a non registered table so expecting None here
+        let table = schema.table(&location).await;
+        assert!(table.is_none());
+
+        // It should still create an object store for the location in the SessionState
let store = ctx
.runtime_env()
.object_store(ListingTableUrl::parse(location)?)?;
@@ -240,4 +243,59 @@ mod tests {

Ok(())
}

+    #[tokio::test]
+    async fn query_s3_location_test() -> Result<()> {
+        let bucket = "examples3bucket";
+        let location = format!("s3://{bucket}/file.parquet");
+
+        let (ctx, schema) = setup_context();
+
+        let table = schema.table(&location).await;
+        assert!(table.is_none());
+
+        let store = ctx
+            .runtime_env()
+            .object_store(ListingTableUrl::parse(location)?)?;
+        assert_eq!(format!("{store}"), format!("AmazonS3({bucket})"));
+
+        // The store must be configured for this domain
+        let expected_bucket = format!("bucket: \"{bucket}\"");
+        assert!(format!("{store:?}").contains(&expected_bucket));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_gs_location_test() -> Result<()> {
+        let bucket = "examplegsbucket";
+        let location = format!("gs://{bucket}/file.parquet");
+
+        let (ctx, schema) = setup_context();
+
+        let table = schema.table(&location).await;
+        assert!(table.is_none());
+
+        let store = ctx
+            .runtime_env()
+            .object_store(ListingTableUrl::parse(location)?)?;
+        assert_eq!(format!("{store}"), format!("GoogleCloudStorage({bucket})"));
+
+        // The store must be configured for this domain
+        let expected_bucket = format!("bucket_name_encoded: \"{bucket}\"");
+        assert!(format!("{store:?}").contains(&expected_bucket));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[should_panic]
+    async fn query_invalid_location_test() {
+        let location = "ts://file.parquet";
+        let (_ctx, schema) = setup_context();
+
+        // This will panic, we cannot prevent that because `schema.table`
+        // returns an Option
+        schema.table(location).await;
+    }
}
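
For reference, the on-demand registration that `DynamicFileSchemaProvider::table` now performs can be reproduced by hand with public DataFusion APIs. Below is a minimal sketch, not part of this commit: the bucket name is hypothetical, and real AWS credentials (and network access) are assumed to be available in the environment.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use object_store::aws::AmazonS3Builder;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Build an S3 store from AWS_* environment variables
    // (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION, ...).
    let store = Arc::new(
        AmazonS3Builder::from_env()
            .with_bucket_name("my-data-bucket") // hypothetical bucket
            .build()?,
    );

    // This is the step the schema provider above performs automatically
    // when it encounters an unregistered `s3://` table reference.
    let url = Url::parse("s3://my-data-bucket").unwrap();
    ctx.runtime_env().register_object_store(&url, store);

    // Once the store is registered, the remote file reads like any table.
    let df = ctx
        .read_parquet(
            "s3://my-data-bucket/file.parquet",
            ParquetReadOptions::default(),
        )
        .await?;
    df.show().await?;
    Ok(())
}
```

The commit's version differs only in that it resolves the store through `get_object_store`, which also handles the `oss`, `gs`/`gcs`, and `http(s)` schemes and falls back to the registry for anything else.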
60 changes: 11 additions & 49 deletions datafusion-cli/src/exec.rs
@@ -18,23 +18,20 @@
//! Execution functions
use std::collections::HashMap;
+use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;
-use std::{fs::File, sync::Arc};

use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
-    object_storage::{
-        get_gcs_object_store_builder, get_oss_object_store_builder,
-        get_s3_object_store_builder,
-    },
+    object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};

-use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
+use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::is_plan_streaming;
use datafusion::error::{DataFusionError, Result};
@@ -45,8 +42,6 @@ use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};

use datafusion::logical_expr::dml::CopyTo;
use datafusion::sql::parser::Statement;
-use object_store::http::HttpBuilder;
-use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
@@ -280,8 +275,13 @@ async fn register_object_store(
copy_to: &mut CopyTo,
) -> Result<(), DataFusionError> {
let url = ListingTableUrl::parse(copy_to.output_url.as_str())?;
-    let store =
-        get_object_store(ctx, &mut HashMap::new(), url.scheme(), url.as_ref()).await?;
+    let store = get_object_store(
+        &ctx.state(),
+        &mut HashMap::new(),
+        url.scheme(),
+        url.as_ref(),
+    )
+    .await?;
ctx.runtime_env().register_object_store(url.as_ref(), store);
Ok(())
}
@@ -295,50 +295,12 @@ async fn create_external_table(
let url: &Url = table_path.as_ref();

// registering the cloud object store dynamically using cmd.options
-    let store = get_object_store(ctx, &mut cmd.options, scheme, url).await?;
-
+    let store = get_object_store(&ctx.state(), &mut cmd.options, scheme, url).await?;
ctx.runtime_env().register_object_store(url, store);

Ok(())
}

-async fn get_object_store(
-    ctx: &SessionContext,
-    options: &mut HashMap<String, String>,
-    scheme: &str,
-    url: &Url,
-) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
-    let store = match scheme {
-        "s3" => {
-            let builder = get_s3_object_store_builder(url, options).await?;
-            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
-        }
-        "oss" => {
-            let builder = get_oss_object_store_builder(url, options)?;
-            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
-        }
-        "gs" | "gcs" => {
-            let builder = get_gcs_object_store_builder(url, options)?;
-            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
-        }
-        "http" | "https" => Arc::new(
-            HttpBuilder::new()
-                .with_url(url.origin().ascii_serialization())
-                .build()?,
-        ) as Arc<dyn ObjectStore>,
-        _ => {
-            // for other types, try to get from the object_store_registry
-            ctx.runtime_env()
-                .object_store_registry
-                .get_store(url)
-                .map_err(|_| {
-                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
-                })?
-        }
-    };
-    Ok(store)
-}

#[cfg(test)]
mod tests {
use std::str::FromStr;
42 changes: 42 additions & 0 deletions datafusion-cli/src/object_storage.rs
@@ -17,8 +17,12 @@

use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
+use datafusion::common::exec_datafusion_err;
use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::SessionState;
use object_store::aws::AwsCredential;
+use object_store::http::HttpBuilder;
+use object_store::ObjectStore;
use object_store::{
aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, CredentialProvider,
};
@@ -156,6 +160,44 @@ fn get_bucket_name(url: &Url) -> Result<&str> {
})
}

+pub(crate) async fn get_object_store(
+    state: &SessionState,
+    options: &mut HashMap<String, String>,
+    scheme: &str,
+    url: &Url,
+) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
+    let store = match scheme {
+        "s3" => {
+            let builder = get_s3_object_store_builder(url, options).await?;
+            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+        }
+        "oss" => {
+            let builder = get_oss_object_store_builder(url, options)?;
+            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+        }
+        "gs" | "gcs" => {
+            let builder = get_gcs_object_store_builder(url, options)?;
+            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+        }
+        "http" | "https" => Arc::new(
+            HttpBuilder::new()
+                .with_url(url.origin().ascii_serialization())
+                .build()?,
+        ) as Arc<dyn ObjectStore>,
+        _ => {
+            // for other types, try to get from the object_store_registry
+            state
+                .runtime_env()
+                .object_store_registry
+                .get_store(url)
+                .map_err(|_| {
+                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
+                })?
+        }
+    };
+    Ok(store)
+}

#[cfg(test)]
mod tests {
use super::*;
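
The `s3` and `gs`/`gcs` arms of `get_object_store` delegate to builders that, absent explicit options, read their configuration from the environment. Here is a standalone sketch of that construction using the `object_store` crate directly; the bucket names echo the tests above, and the asserted `Display` strings are the same ones `query_s3_location_test` and `query_gs_location_test` check. This assumes the `object_store` version pinned by datafusion-cli at this commit and any relevant credentials in the environment.

```rust
use object_store::aws::AmazonS3Builder;
use object_store::gcp::GoogleCloudStorageBuilder;

fn main() -> Result<(), object_store::Error> {
    // from_env() picks up AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
    // AWS_REGION, and related variables if they are set.
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("examples3bucket")
        .build()?;
    assert_eq!(format!("{s3}"), "AmazonS3(examples3bucket)");

    // from_env() picks up e.g. GOOGLE_SERVICE_ACCOUNT for GCS.
    let gcs = GoogleCloudStorageBuilder::from_env()
        .with_bucket_name("examplegsbucket")
        .build()?;
    assert_eq!(format!("{gcs}"), "GoogleCloudStorage(examplegsbucket)");

    Ok(())
}
```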
14 changes: 12 additions & 2 deletions docs/source/user-guide/cli.md
@@ -194,8 +194,9 @@ DataFusion CLI v16.0.0
2 rows in set. Query took 0.007 seconds.
```
-You can also query directly from the remote location via HTTP(S) without
-registering the location as a table
+You can also query directly from any remote location supported by DataFusion without
+registering the location as a table.
+For example, to read from a remote parquet file via HTTP(S) you can use the following:
```sql
select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'
@@ -207,6 +208,15 @@ select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_par
1 row in set. Query took 0.595 seconds.
```
+To read from AWS S3 or GCS, use `s3` or `gs` as the protocol prefix. For example, this will read a file
+in an S3 bucket named `my-data-bucket`. Note that this is not a real file location, so the query
+will fail; replace it with your own S3 file location. You also need to set the relevant access credentials
+as environment variables (e.g. for AWS S3 you need at least `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`).
+
+```sql
+select count(*) from 's3://my-data-bucket/athena_partitioned/hits.parquet'
+```
## Creating External Tables
It is also possible to create a table backed by files by explicitly

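As an alternative to the environment variables described in the documentation hunk above, credentials can be passed to the store builder explicitly when using DataFusion as a library. A sketch against public DataFusion and `object_store` APIs; the bucket name and credential strings are placeholders:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use object_store::aws::AmazonS3Builder;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    // Explicit credentials instead of AWS_ACCESS_KEY_ID /
    // AWS_SECRET_ACCESS_KEY in the environment.
    let store = Arc::new(
        AmazonS3Builder::new()
            .with_bucket_name("my-data-bucket") // placeholder bucket
            .with_region("us-east-1")
            .with_access_key_id("<access key id>") // placeholder
            .with_secret_access_key("<secret access key>") // placeholder
            .build()?,
    );

    let ctx = SessionContext::new();
    let url = Url::parse("s3://my-data-bucket").unwrap();
    ctx.runtime_env().register_object_store(&url, store);

    let df = ctx
        .read_parquet(
            "s3://my-data-bucket/athena_partitioned/hits.parquet",
            ParquetReadOptions::default(),
        )
        .await?;
    println!("{} rows", df.count().await?);
    Ok(())
}
```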