Skip to content

Commit

Permalink
Don't canonicalize ListingTableUrl (apache#7994)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 31, 2023
1 parent 0d4dc36 commit 72778b1
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 95 deletions.
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,14 +855,17 @@ impl TableProvider for ListingTable {
let input_partitions = input.output_partitioning().partition_count();
let writer_mode = match self.options.insert_mode {
ListingTableInsertMode::AppendToFile => {
if input_partitions > file_groups.len() {
if file_groups.is_empty() && self.options.single_file {
// This is a hack, longer term append should be split out (#7994)
crate::datasource::file_format::write::FileWriterMode::PutMultipart
} else if input_partitions > file_groups.len() {
return plan_err!(
"Cannot append {input_partitions} partitions to {} files!",
file_groups.len()
);
} else {
crate::datasource::file_format::write::FileWriterMode::Append
}

crate::datasource::file_format::write::FileWriterMode::Append
}
ListingTableInsertMode::AppendNewFiles => {
crate::datasource::file_format::write::FileWriterMode::PutMultipart
Expand Down
135 changes: 89 additions & 46 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fs;
use std::path::{Component, PathBuf};

use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
Expand Down Expand Up @@ -46,37 +46,49 @@ pub struct ListingTableUrl {
impl ListingTableUrl {
/// Parse a provided string as a `ListingTableUrl`
///
/// A URL can either refer to a single object, or a collection of objects with a
/// common prefix, with the presence of a trailing `/` indicating a collection.
///
/// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
/// `file:///foo/` refers to all the files under the directory `/foo` and its
/// subdirectories.
///
/// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
/// wherease `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
/// S3 bucket `BUCKET`
///
/// # Paths without a Scheme
///
/// If no scheme is provided, or the string is an absolute filesystem path
/// as determined [`std::path::Path::is_absolute`], the string will be
/// as determined by [`std::path::Path::is_absolute`], the string will be
/// interpreted as a path on the local filesystem using the operating
/// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
///
/// If the path contains any of `'?', '*', '['`, it will be considered
/// a glob expression and resolved as described in the section below.
///
/// Otherwise, the path will be resolved to an absolute path, returning
/// an error if it does not exist, and converted to a [file URI]
/// Otherwise, the path will be resolved to an absolute path based on the current
/// working directory, and converted to a [file URI].
///
/// If you wish to specify a path that does not exist on the local
/// machine you must provide it as a fully-qualified [file URI]
/// e.g. `file:///myfile.txt`
/// If the path already exists in the local filesystem this will be used to determine if this
/// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
/// of a trailing path delimiter will be used to indicate a directory. For the avoidance
/// of ambiguity it is recommended users always include trailing `/` when intending to
/// refer to a directory.
///
/// ## Glob File Paths
///
/// If no scheme is provided, and the path contains a glob expression, it will
/// be resolved as follows.
///
/// The string up to the first path segment containing a glob expression will be extracted,
/// and resolved in the same manner as a normal scheme-less path. That is, resolved to
/// an absolute path on the local filesystem, returning an error if it does not exist,
/// and converted to a [file URI]
/// and resolved in the same manner as a normal scheme-less path above.
///
/// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
/// filter when listing files from object storage
///
/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
/// [URL]: https://url.spec.whatwg.org/
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
let s = s.as_ref();

Expand All @@ -92,32 +104,6 @@ impl ListingTableUrl {
}
}

/// Get object store for specified input_url
/// if input_url is actually not a url, we assume it is a local file path
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
) -> Result<Self> {
let s = s.as_ref();
let is_valid_url = Url::parse(s).is_ok();

match is_valid_url {
true => ListingTableUrl::parse(s),
false => {
let path = std::path::PathBuf::from(s);
if !path.exists() {
if is_directory {
fs::create_dir_all(path)?;
} else {
fs::File::create(path)?;
}
}
ListingTableUrl::parse(s)
}
}
}

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
Expand All @@ -129,15 +115,9 @@ impl ListingTableUrl {
None => (s, None),
};

let path = std::path::Path::new(prefix).canonicalize()?;
let url = if path.is_dir() {
Url::from_directory_path(path)
} else {
Url::from_file_path(path)
}
.map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?;
// TODO: Currently we do not have an IO-related error variant that accepts ()
// or a string. Once we have such a variant, change the error type above.
let url = url_from_path(prefix).ok_or_else(|| {
DataFusionError::Internal(format!("Can not open path: {s}"))
})?;
Ok(Self::new(url, glob))
}

Expand Down Expand Up @@ -214,7 +194,12 @@ impl ListingTableUrl {
}
}
},
false => futures::stream::once(store.head(&self.prefix)).boxed(),
false => futures::stream::once(store.head(&self.prefix))
.filter(|r| {
let p = !matches!(r, Err(object_store::Error::NotFound { .. }));
futures::future::ready(p)
})
.boxed(),
};
Ok(list
.try_filter(move |meta| {
Expand Down Expand Up @@ -257,6 +242,43 @@ impl std::fmt::Display for ListingTableUrl {
}
}

fn url_from_path(s: &str) -> Option<Url> {
let path = std::path::Path::new(s);
let is_dir = match path.exists() {
true => path.is_dir(),
// Fallback to inferring from trailing separator
false => std::path::is_separator(s.chars().last()?),
};
let url = |p| match is_dir {
true => Url::from_directory_path(p).ok(),
false => Url::from_file_path(p).ok(),
};

if path.is_absolute() {
return url(path);
}

let base = std::env::current_dir().ok()?;
let absolute = resolve_path(base, path)?;
url(absolute.as_path())
}

fn resolve_path(mut base: PathBuf, path: &std::path::Path) -> Option<PathBuf> {
for component in path.components() {
match component {
Component::Prefix(_) | Component::RootDir => return None,
Component::Normal(p) => base.push(p),
Component::CurDir => {} // Do nothing
Component::ParentDir => {
if !base.pop() {
return None;
}
}
}
}
Some(base)
}

const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the first path segment containing a glob expression, returning
Expand Down Expand Up @@ -368,4 +390,25 @@ mod tests {
Some(("/a/b/c//", "alltypes_plain*.parquet")),
);
}

#[test]
fn test_resolve_path() {
let r = resolve_path("/foo/bar".into(), "../baz.txt".as_ref()).unwrap();
assert_eq!(r.to_str().unwrap(), "/foo/baz.txt");

let r = resolve_path("/foo/bar".into(), "./baz.txt".as_ref()).unwrap();
assert_eq!(r.to_str().unwrap(), "/foo/bar/baz.txt");

let r = resolve_path("/foo/bar".into(), "../../../baz.txt".as_ref());
assert_eq!(r, None);
}

#[test]
fn test_url_from_path() {
let url = url_from_path("foo/bar").unwrap();
assert!(url.path().ends_with("foo/bar"));

let url = url_from_path("foo/bar/").unwrap();
assert!(url.path().ends_with("foo/bar/"));
}
}
11 changes: 1 addition & 10 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ impl TableProviderFactory for ListingTableFactory {
.unwrap_or(false)
};

let create_local_path = statement_options
.take_bool_option("create_local_path")?
.unwrap_or(false);
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);
Expand Down Expand Up @@ -205,13 +202,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::AVRO => file_type_writer_options,
};

let table_path = match create_local_path {
true => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
false => ListingTableUrl::parse(&cmd.location),
}?;
let table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,7 @@ impl DefaultPhysicalPlanner {
copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;

// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?;
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();

let schema: Schema = (**input.schema()).clone().into();
Expand Down
34 changes: 10 additions & 24 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
b varchar,
)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned'
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);

Expand Down Expand Up @@ -88,7 +87,6 @@ STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
WITH ORDER (a ASC, B DESC)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);

Expand Down Expand Up @@ -130,9 +128,8 @@ CREATE EXTERNAL TABLE
partitioned_insert_test(a string, b string, c bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/'
PARTITIONED BY (a, b)
PARTITIONED BY (a, b)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);

Expand Down Expand Up @@ -172,9 +169,8 @@ CREATE EXTERNAL TABLE
partitioned_insert_test_json(a string, b string)
STORED AS json
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_json/'
PARTITIONED BY (a)
PARTITIONED BY (a)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);

Expand Down Expand Up @@ -207,9 +203,8 @@ CREATE EXTERNAL TABLE
partitioned_insert_test_pq(a string, b bigint)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_pq/'
PARTITIONED BY (a)
PARTITIONED BY (a)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);

Expand Down Expand Up @@ -250,7 +245,6 @@ single_file_test(a bigint, b bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/single_csv_table.csv'
OPTIONS(
create_local_path 'true',
single_file 'true',
);

Expand All @@ -276,10 +270,7 @@ statement ok
CREATE EXTERNAL TABLE
directory_test(a bigint, b bigint)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0'
OPTIONS(
create_local_path 'true',
);
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0/';

query II
INSERT INTO directory_test values (1, 2), (3, 4);
Expand All @@ -296,8 +287,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1'
OPTIONS (create_local_path 'true');
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1/';

query TT
EXPLAIN
Expand Down Expand Up @@ -362,8 +352,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2'
OPTIONS (create_local_path 'true');
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2/';

query TT
EXPLAIN
Expand Down Expand Up @@ -407,8 +396,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(c1 varchar NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3'
OPTIONS (create_local_path 'true');
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3/';

# verify that the sort order of the insert query is maintained into the
# insert (there should be a SortExec in the following plan)
Expand Down Expand Up @@ -447,8 +435,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(id BIGINT, name varchar)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4'
OPTIONS (create_local_path 'true');
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4/';

query IT
insert into table_without_values(id, name) values(1, 'foo');
Expand Down Expand Up @@ -486,8 +473,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NOT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5'
OPTIONS (create_local_path 'true');
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5/';

query II
insert into table_without_values values(1, 100);
Expand Down
Loading

0 comments on commit 72778b1

Please sign in to comment.