diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index ba10d51c2e53..c46badd64fa8 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -50,6 +50,7 @@ use datafusion::{ use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; +use datafusion::datasource::listing::ListingTableUrl; use serde::Serialize; use structopt::StructOpt; @@ -425,7 +426,8 @@ fn get_table( table_partition_cols: vec![], }; - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path) + let table_path = ListingTableUrl::parse(path)?; + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .with_listing_options(options) .with_schema(schema); diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 703cb702546f..a3d7c0f56644 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow_flight::SchemaAsIpc; use datafusion::datafusion_data_access::object_store::local::LocalFileSystem; use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::listing::{ListingOptions, ListingTableUrl}; use futures::Stream; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; @@ -68,9 +68,11 @@ impl FlightService for FlightServiceImpl { let request = request.into_inner(); let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); + let table_path = + ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?; let schema = listing_options - .infer_schema(Arc::new(LocalFileSystem {}), &request.path[0]) + .infer_schema(Arc::new(LocalFileSystem {}), &table_path) .await .unwrap(); diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index f8170443e213..08da53f9c558 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" } datafusion-row = { path = "../row", version = "8.0.0" } datafusion-sql = { path = "../sql", version = "8.0.0" } futures = "0.3" +glob = "0.3.0" hashbrown = { version = "0.12", features = ["raw"] } +itertools = "0.10" lazy_static = { version = "^1.4.0" } log = "^0.4" num-traits = { version = "0.2", optional = true } @@ -85,6 +87,7 @@ sqlparser = "0.17" tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } tokio-stream = "0.1" +url = "2.2" uuid = { version = "1.0", features = ["v4"] } [dev-dependencies] diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs index faeff6bc9c69..d1f253a982a5 100644 --- a/datafusion/core/benches/sort_limit_query_sql.rs +++ b/datafusion/core/benches/sort_limit_query_sql.rs @@ -20,7 +20,9 @@ extern crate criterion; use criterion::Criterion; use datafusion::datafusion_data_access::object_store::local::LocalFileSystem; use datafusion::datasource::file_format::csv::CsvFormat; -use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, +}; use parking_lot::Mutex; use std::sync::Arc; @@ -64,11 +66,12 @@ fn create_context() -> Arc> { let testdata = 
datafusion::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); + let table_path = ListingTableUrl::parse(path).unwrap(); // create CSV data source let listing_options = ListingOptions::new(Arc::new(CsvFormat::default())); - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path) + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .with_listing_options(listing_options) .with_schema(schema); diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index 9ef5c3d67895..748cad52ba7f 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -23,8 +23,8 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; -use crate::datasource::listing::{ListingTable, ListingTableConfig}; -use crate::datasource::object_store_registry::ObjectStoreRegistry; +use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; +use crate::datasource::object_store::ObjectStoreRegistry; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use datafusion_data_access::object_store::ObjectStore; @@ -156,31 +156,33 @@ impl ObjectStoreSchemaProvider { .register_store(scheme.into(), object_store) } - /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store<'a>( + /// Retrieves a `ObjectStore` instance for a given Url + pub fn object_store( &self, - uri: &'a str, - ) -> Result<(Arc, &'a str)> { + url: impl AsRef, + ) -> Result> { self.object_store_registry .lock() - .get_by_uri(uri) + .get_by_url(url) .map_err(DataFusionError::from) } /// If supported by the implementation, adds a new table to this schema by creating a - /// `ListingTable` from the provided `uri` and a previously registered `ObjectStore`. + /// `ListingTable` from the provided `url` and a previously registered `ObjectStore`. /// If a table of the same name existed before, it returns "Table already exists" error. pub async fn register_listing_table( &self, name: &str, - uri: &str, + table_path: ListingTableUrl, config: Option, ) -> Result<()> { let config = match config { Some(cfg) => cfg, None => { - let (object_store, _path) = self.object_store(uri)?; - ListingTableConfig::new(object_store, uri).infer().await? + let object_store = self.object_store(&table_path)?; + ListingTableConfig::new(object_store, table_path) + .infer() + .await? 
} }; @@ -255,6 +257,7 @@ mod tests { use crate::datasource::empty::EmptyTable; use crate::execution::context::SessionContext; + use crate::datasource::listing::ListingTableUrl; use futures::StreamExt; #[tokio::test] @@ -280,12 +283,13 @@ mod tests { async fn test_schema_register_listing_table() { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let table_path = ListingTableUrl::parse(filename).unwrap(); let schema = ObjectStoreSchemaProvider::new(); let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); schema - .register_listing_table("alltypes_plain", &filename, None) + .register_listing_table("alltypes_plain", table_path, None) .await .unwrap(); @@ -338,8 +342,9 @@ mod tests { || file == OsStr::new("alltypes_plain.parquet") { let name = path.file_stem().unwrap().to_str().unwrap(); + let path = ListingTableUrl::parse(&sized_file.path).unwrap(); schema - .register_listing_table(name, &sized_file.path, None) + .register_listing_table(name, path, None) .await .unwrap(); } @@ -360,17 +365,18 @@ mod tests { async fn test_schema_register_same_listing_table() { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let table_path = ListingTableUrl::parse(filename).unwrap(); let schema = ObjectStoreSchemaProvider::new(); let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); schema - .register_listing_table("alltypes_plain", &filename, None) + .register_listing_table("alltypes_plain", table_path.clone(), None) .await .unwrap(); schema - .register_listing_table("alltypes_plain", &filename, None) + .register_listing_table("alltypes_plain", table_path, None) .await .unwrap(); } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 669ed0efdde2..eae86fa9cca7 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -85,6 +85,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { pub(crate) mod test_util { use super::*; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use datafusion_data_access::object_store::local::{ local_unpartitioned_file, LocalFileSystem, }; @@ -115,6 +116,7 @@ pub(crate) mod test_util { .create_physical_plan( FileScanConfig { object_store: store, + object_store_url: ObjectStoreUrl::local_filesystem(), file_schema, file_groups, statistics, diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 11a91f2eeaa8..a26eafabb50c 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -17,7 +17,6 @@ //! 
Helper functions for the table implementation -use std::path::{Component, Path}; use std::sync::Arc; use arrow::{ @@ -29,11 +28,7 @@ use arrow::{ record_batch::RecordBatch, }; use chrono::{TimeZone, Utc}; -use datafusion_common::DataFusionError; -use futures::{ - stream::{self}, - StreamExt, TryStreamExt, -}; +use futures::{stream::BoxStream, TryStreamExt}; use log::debug; use crate::{ @@ -44,7 +39,8 @@ use crate::{ scalar::ScalarValue, }; -use super::{PartitionedFile, PartitionedFileStream}; +use super::PartitionedFile; +use crate::datasource::listing::ListingTableUrl; use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile}; use datafusion_expr::Volatility; @@ -161,94 +157,53 @@ pub fn split_files( /// TODO for tables with many files (10k+), it will usually more efficient /// to first list the folders relative to the first partition dimension, /// prune those, then list only the contain of the remaining folders. -pub async fn pruned_partition_list( - store: &dyn ObjectStore, - table_path: &str, +pub async fn pruned_partition_list<'a>( + store: &'a dyn ObjectStore, + table_path: &'a ListingTableUrl, filters: &[Expr], - file_extension: &str, - table_partition_cols: &[String], -) -> Result { + file_extension: &'a str, + table_partition_cols: &'a [String], +) -> Result>> { + let list = table_path.list_all_files(store, file_extension); + // if no partition col => simply list all the files if table_partition_cols.is_empty() { - return Ok(Box::pin( - store - .glob_file_with_suffix(table_path, file_extension) - .await? - .map(|f| { - Ok(PartitionedFile { - partition_values: vec![], - file_meta: f?, - range: None, - }) - }), - )); + return Ok(Box::pin(list.map_ok(|object_meta| object_meta.into()))); } let applicable_filters: Vec<_> = filters .iter() .filter(|f| expr_applicable_for_cols(table_partition_cols, f)) .collect(); - let stream_path = table_path.to_owned(); + if applicable_filters.is_empty() { // Parse the partition values while listing all the files // Note: We might avoid parsing the partition values if they are not used in any projection, // but the cost of parsing will likely be far dominated by the time to fetch the listing from // the object store. - let table_partition_cols_stream = table_partition_cols.to_vec(); - Ok(Box::pin( - store - .glob_file_with_suffix(table_path, file_extension) - .await? 
- .filter_map(move |f| { - let stream_path = stream_path.clone(); - let table_partition_cols_stream = table_partition_cols_stream.clone(); - async move { - let file_meta = match f { - Ok(fm) => fm, - Err(err) => return Some(Err(err)), - }; - let parsed_path = parse_partitions_for_path( - &stream_path, - file_meta.path(), - &table_partition_cols_stream, - ) - .map(|p| { - p.iter() - .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned()))) - .collect() - }); - - parsed_path.map(|partition_values| { - Ok(PartitionedFile { - partition_values, - file_meta, - range: None, - }) - }) - } - }), - )) + Ok(Box::pin(list.try_filter_map(move |file_meta| async move { + let parsed_path = parse_partitions_for_path( + table_path, + file_meta.path(), + table_partition_cols, + ) + .map(|p| { + p.iter() + .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned()))) + .collect() + }); + + Ok(parsed_path.map(|partition_values| PartitionedFile { + partition_values, + file_meta, + range: None, + })) + }))) } else { // parse the partition values and serde them as a RecordBatch to filter them - // TODO avoid collecting but have a streaming memory table instead - let batches: Vec = store - .glob_file_with_suffix(table_path, file_extension) - .await? - // TODO we set an arbitrary high batch size here, it does not matter as we list - // all the files anyway. This number will need to be adjusted according to the object - // store if we switch to a streaming-stlye pruning of the files. For instance S3 lists - // 1000 items at a time so batches of 1000 would be ideal with S3 as store. - .chunks(1024) - .map(|v| { - v.into_iter() - .collect::>>() - }) - .map_err(DataFusionError::IoError) - .map(move |metas| paths_to_batch(table_partition_cols, &stream_path, &metas?)) - .try_collect() - .await?; - - let mem_table = MemTable::try_new(batches[0].schema(), vec![batches])?; + let metas: Vec<_> = list.try_collect().await?; + let batch = paths_to_batch(table_partition_cols, table_path, &metas)?; + let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; // Filter the partitions using a local datafusion context // TODO having the external context would allow us to resolve `Volatility::Stable` @@ -260,7 +215,7 @@ pub async fn pruned_partition_list( } let filtered_batches = df.collect().await?; - Ok(Box::pin(stream::iter( + Ok(Box::pin(futures::stream::iter( batches_to_paths(&filtered_batches).into_iter().map(Ok), ))) } @@ -275,7 +230,7 @@ pub async fn pruned_partition_list( /// Note: For the last modified date, this looses precisions higher than millisecond. 
fn paths_to_batch( table_partition_cols: &[String], - table_path: &str, + table_path: &ListingTableUrl, metas: &[FileMeta], ) -> Result { let mut key_builder = StringBuilder::new(metas.len()); @@ -373,21 +328,15 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec { /// Extract the partition values for the given `file_path` (in the given `table_path`) /// associated to the partitions defined by `table_partition_cols` fn parse_partitions_for_path<'a>( - table_path: &str, + table_path: &ListingTableUrl, file_path: &'a str, table_partition_cols: &[String], ) -> Option> { - let subpath = file_path.strip_prefix(table_path)?; - - // split subpath into components ignoring leading separator if exists - let subpath = Path::new(subpath) - .components() - .filter(|c| !matches!(c, Component::RootDir)) - .filter_map(|c| c.as_os_str().to_str()); + let subpath = table_path.strip_prefix(file_path)?; let mut part_values = vec![]; - for (path, pn) in subpath.zip(table_partition_cols) { - match path.split_once('=') { + for (part, pn) in subpath.zip(table_partition_cols) { + match part.split_once('=') { Some((name, val)) if name == pn => part_values.push(val), _ => return None, } @@ -401,6 +350,7 @@ mod tests { logical_plan::{case, col, lit}, test::object_store::TestObjectStore, }; + use futures::StreamExt; use super::*; @@ -453,7 +403,7 @@ mod tests { let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( store.as_ref(), - "tablepath/", + &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], ".parquet", &[String::from("mypartition")], @@ -476,33 +426,34 @@ mod tests { let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( store.as_ref(), - "tablepath/", + &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], ".parquet", &[String::from("mypartition")], ) .await .expect("partition pruning failed") - .collect::>() - .await; + .try_collect::>() + .await + .unwrap(); assert_eq!(pruned.len(), 2); - let f1 = pruned[0].as_ref().expect("first item not an error"); + let f1 = &pruned[0]; assert_eq!( - &f1.file_meta.sized_file.path, + f1.file_meta.path(), "tablepath/mypartition=val1/file.parquet" ); assert_eq!( &f1.partition_values, &[ScalarValue::Utf8(Some(String::from("val1"))),] ); - let f2 = pruned[1].as_ref().expect("second item not an error"); + let f2 = &pruned[1]; assert_eq!( - &f2.file_meta.sized_file.path, + f2.file_meta.path(), "tablepath/mypartition=val1/other=val3/file.parquet" ); assert_eq!( - &f2.partition_values, + f2.partition_values, &[ScalarValue::Utf8(Some(String::from("val1"))),] ); } @@ -522,20 +473,21 @@ mod tests { let filter3 = Expr::eq(col("part2"), col("other")); let pruned = pruned_partition_list( store.as_ref(), - "tablepath/", + &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter1, filter2, filter3], ".parquet", &[String::from("part1"), String::from("part2")], ) .await .expect("partition pruning failed") - .collect::>() - .await; + .try_collect::>() + .await + .unwrap(); assert_eq!(pruned.len(), 2); - let f1 = pruned[0].as_ref().expect("first item not an error"); + let f1 = &pruned[0]; assert_eq!( - &f1.file_meta.sized_file.path, + f1.file_meta.path(), "tablepath/part1=p1v2/part2=p2v1/file1.parquet" ); assert_eq!( @@ -545,9 +497,9 @@ mod tests { ScalarValue::Utf8(Some(String::from("p2v1"))) ] ); - let f2 = pruned[1].as_ref().expect("second item not an error"); + let f2 = &pruned[1]; assert_eq!( - &f2.file_meta.sized_file.path, + f2.file_meta.path(), 
"tablepath/part1=p1v2/part2=p2v1/file2.parquet" ); assert_eq!( @@ -563,12 +515,16 @@ mod tests { fn test_parse_partitions_for_path() { assert_eq!( Some(vec![]), - parse_partitions_for_path("bucket/mytable", "bucket/mytable/file.csv", &[]) + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + "bucket/mytable/file.csv", + &[] + ) ); assert_eq!( None, parse_partitions_for_path( - "bucket/othertable", + &ListingTableUrl::parse("file:///bucket/othertable").unwrap(), "bucket/mytable/file.csv", &[] ) @@ -576,7 +532,7 @@ mod tests { assert_eq!( None, parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/file.csv", &[String::from("mypartition")] ) @@ -584,7 +540,7 @@ mod tests { assert_eq!( Some(vec!["v1"]), parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/mypartition=v1/file.csv", &[String::from("mypartition")] ) @@ -592,7 +548,7 @@ mod tests { assert_eq!( Some(vec!["v1"]), parse_partitions_for_path( - "bucket/mytable/", + &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(), "bucket/mytable/mypartition=v1/file.csv", &[String::from("mypartition")] ) @@ -601,7 +557,7 @@ mod tests { assert_eq!( None, parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/v1/file.csv", &[String::from("mypartition")] ) @@ -609,7 +565,7 @@ mod tests { assert_eq!( Some(vec!["v1", "v2"]), parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv", &[String::from("mypartition"), String::from("otherpartition")] ) @@ -617,7 +573,7 @@ mod tests { assert_eq!( Some(vec!["v1"]), parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv", &[String::from("mypartition")] ) @@ -630,7 +586,7 @@ mod tests { assert_eq!( Some(vec!["v1"]), parse_partitions_for_path( - "bucket\\mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket\\mytable\\mypartition=v1\\file.csv", &[String::from("mypartition")] ) @@ -638,7 +594,7 @@ mod tests { assert_eq!( Some(vec!["v1", "v2"]), parse_partitions_for_path( - "bucket\\mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv", &[String::from("mypartition"), String::from("otherpartition")] ) @@ -664,7 +620,8 @@ mod tests { }, ]; - let batches = paths_to_batch(&[], "mybucket/tablepath", &files) + let table_path = ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(); + let batches = paths_to_batch(&[], &table_path, &files) .expect("Serialization of file list to batch failed"); let parsed_files = batches_to_paths(&[batches]); @@ -698,9 +655,12 @@ mod tests { }, ]; - let batches = - paths_to_batch(&[String::from("part1")], "mybucket/tablepath", &files) - .expect("Serialization of file list to batch failed"); + let batches = paths_to_batch( + &[String::from("part1")], + &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(), + &files, + ) + .expect("Serialization of file list to batch failed"); let parsed_files = batches_to_paths(&[batches]); assert_eq!(parsed_files.len(), 2); diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 0f0a7d20ee48..c11de5f8021a 
100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -20,12 +20,14 @@ mod helpers; mod table; +mod url; use datafusion_common::ScalarValue; use datafusion_data_access::{FileMeta, Result, SizedFile}; use futures::Stream; use std::pin::Pin; +pub use self::url::ListingTableUrl; pub use table::{ListingOptions, ListingTable, ListingTableConfig}; /// Stream of files get listed from object store diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 1dceb8b35e19..34e44971d662 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -28,7 +28,9 @@ use crate::datasource::{ avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, FileFormat, }, - get_statistics_with_limit, TableProvider, TableType, + get_statistics_with_limit, + listing::ListingTableUrl, + TableProvider, TableType, }; use crate::logical_expr::TableProviderFilterPushDown; use crate::{ @@ -51,7 +53,7 @@ pub struct ListingTableConfig { /// `ObjectStore` that contains the files for the `ListingTable`. pub object_store: Arc, /// Path on the `ObjectStore` for creating `ListingTable`. - pub table_path: String, + pub table_path: ListingTableUrl, /// Optional `SchemaRef` for the to be created `ListingTable`. pub file_schema: Option, /// Optional `ListingOptions` for the to be created `ListingTable`. @@ -60,13 +62,10 @@ pub struct ListingTableConfig { impl ListingTableConfig { /// Creates new `ListingTableConfig`. The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`. - pub fn new( - object_store: Arc, - table_path: impl Into, - ) -> Self { + pub fn new(object_store: Arc, table_path: ListingTableUrl) -> Self { Self { object_store, - table_path: table_path.into(), + table_path, file_schema: None, options: None, } @@ -106,18 +105,18 @@ impl ListingTableConfig { /// Infer `ListingOptions` based on `table_path` suffix. 
pub async fn infer_options(self) -> Result { - let mut files = self.object_store.list_file(&self.table_path).await?; - let file = files + let file = self + .table_path + .list_all_files(self.object_store.as_ref(), "") .next() .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let tokens: Vec<&str> = file.path().split('.').collect(); - let file_type = tokens.last().ok_or_else(|| { + let file_type = file.path().rsplit('.').next().ok_or_else(|| { DataFusionError::Internal("Unable to infer file suffix".into()) })?; - let format = ListingTableConfig::infer_format(*file_type)?; + let format = ListingTableConfig::infer_format(file_type)?; let listing_options = ListingOptions { format, @@ -140,7 +139,7 @@ impl ListingTableConfig { match self.options { Some(options) => { let schema = options - .infer_schema(self.object_store.clone(), self.table_path.as_str()) + .infer_schema(self.object_store.clone(), &self.table_path) .await?; Ok(Self { @@ -213,10 +212,9 @@ impl ListingOptions { pub async fn infer_schema<'a>( &'a self, store: Arc, - path: &'a str, + table_path: &'a ListingTableUrl, ) -> Result { - let extension = &self.file_extension; - let list_stream = store.glob_file_with_suffix(path, extension).await?; + let list_stream = table_path.list_all_files(store.as_ref(), &self.file_extension); let files: Vec<_> = list_stream.try_collect().await?; self.format.infer_schema(&store, &files).await } @@ -226,7 +224,7 @@ impl ListingOptions { /// or file system listing capability to get the list of files. pub struct ListingTable { object_store: Arc, - table_path: String, + table_path: ListingTableUrl, /// File fields only file_schema: SchemaRef, /// File fields + partition columns @@ -276,10 +274,12 @@ impl ListingTable { pub fn object_store(&self) -> &Arc { &self.object_store } + /// Get path ref - pub fn table_path(&self) -> &str { + pub fn table_path(&self) -> &ListingTableUrl { &self.table_path } + /// Get options ref pub fn options(&self) -> &ListingOptions { &self.options @@ -322,6 +322,7 @@ impl TableProvider for ListingTable { .create_physical_plan( FileScanConfig { object_store: Arc::clone(&self.object_store), + object_store_url: self.table_path.object_store(), file_schema: Arc::clone(&self.file_schema), file_groups: partitioned_file_lists, statistics, @@ -369,6 +370,7 @@ impl ListingTable { .await?; // collect the statistics if required by the config + // TODO: Collect statistics and schema in single-pass let object_store = Arc::clone(&self.object_store); let files = file_list.then(move |part_file| { let object_store = object_store.clone(); @@ -436,11 +438,12 @@ mod tests { async fn load_table_stats_by_default() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let table_path = ListingTableUrl::parse(filename).unwrap(); let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); let schema = opt - .infer_schema(Arc::new(LocalFileSystem {}), &filename) + .infer_schema(Arc::new(LocalFileSystem {}), &table_path) .await?; - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename) + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .with_listing_options(opt) .with_schema(schema); let table = ListingTable::try_new(config)?; @@ -464,9 +467,10 @@ mod tests { collect_stat: true, }; + let table_path = ListingTableUrl::parse("file:///table/").unwrap(); let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, 
false)])); - let config = ListingTableConfig::new(store, "table/") + let config = ListingTableConfig::new(store, table_path) .with_listing_options(opt) .with_schema(file_schema); let table = ListingTable::try_new(config)?; @@ -504,7 +508,7 @@ mod tests { "bucket/key-prefix/file3", "bucket/key-prefix/file4", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 12, 5, ) @@ -518,7 +522,7 @@ mod tests { "bucket/key-prefix/file2", "bucket/key-prefix/file3", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 4, 4, ) @@ -533,14 +537,15 @@ mod tests { "bucket/key-prefix/file3", "bucket/key-prefix/file4", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 2, 2, ) .await?; // no files => no groups - assert_list_files_for_scan_grouping(&[], "bucket/key-prefix/", 2, 0).await?; + assert_list_files_for_scan_grouping(&[], "file:///bucket/key-prefix/", 2, 0) + .await?; // files that don't match the prefix assert_list_files_for_scan_grouping( @@ -549,7 +554,7 @@ mod tests { "bucket/key-prefix/file1", "bucket/other-prefix/roguefile", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 10, 2, ) @@ -560,7 +565,8 @@ mod tests { async fn load_table(name: &str) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename) + let table_path = ListingTableUrl::parse(filename).unwrap(); + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .infer() .await?; let table = ListingTable::try_new(config)?; @@ -590,7 +596,8 @@ mod tests { let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); - let config = ListingTableConfig::new(mock_store, table_prefix.to_owned()) + let table_path = ListingTableUrl::parse(table_prefix).unwrap(); + let config = ListingTableConfig::new(mock_store, table_path) .with_listing_options(opt) .with_schema(Arc::new(schema)); diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs new file mode 100644 index 000000000000..041a6ab7f045 --- /dev/null +++ b/datafusion/core/src/datasource/listing/url.rs @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
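// A minimal usage sketch of the `ListingTableUrl` type introduced in this new
// file; the sketch is not part of the patch itself, and the `/tmp/data` paths
// are hypothetical (per the docs on `parse` below, scheme-less paths must exist
// locally so they can be canonicalized).
fn listing_url_examples() -> datafusion::error::Result<()> {
    use datafusion::datasource::listing::ListingTableUrl;

    // A fully qualified file URI, which need not exist on the local machine
    let _uri = ListingTableUrl::parse("file:///tmp/data/part-0.parquet")?;
    // A remote object store location, identified by scheme and authority
    let _remote = ListingTableUrl::parse("s3://bucket/prefix/")?;
    // A glob expression: the prefix is resolved locally and the remainder
    // becomes a `glob::Pattern` used to filter the listed files
    let _glob = ListingTableUrl::parse("/tmp/data/*.parquet")?;
    Ok(())
}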
+ +use crate::datasource::object_store::ObjectStoreUrl; +use datafusion_common::{DataFusionError, Result}; +use datafusion_data_access::object_store::ObjectStore; +use datafusion_data_access::FileMeta; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; +use glob::Pattern; +use itertools::Itertools; +use std::path::is_separator; +use url::Url; + +/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`] +/// for more information on the supported expressions +#[derive(Debug, Clone)] +pub struct ListingTableUrl { + /// A URL that identifies a file or directory to list files from + url: Url, + /// An optional glob expression used to filter files + glob: Option, +} + +impl ListingTableUrl { + /// Parse a provided string as a `ListingTableUrl` + /// + /// # Paths without a Scheme + /// + /// If no scheme is provided, or the string is an absolute filesystem path + /// as determined [`std::path::Path::is_absolute`], the string will be + /// interpreted as a path on the local filesystem using the operating + /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix. + /// + /// If the path contains any of `'?', '*', '['`, it will be considered + /// a glob expression and resolved as described in the section below. + /// + /// Otherwise, the path will be resolved to an absolute path, returning + /// an error if it does not exist, and converted to a [file URI] + /// + /// If you wish to specify a path that does not exist on the local + /// machine you must provide it as a fully-qualified [file URI] + /// e.g. `file:///myfile.txt` + /// + /// ## Glob File Paths + /// + /// If no scheme is provided, and the path contains a glob expression, it will + /// be resolved as follows. + /// + /// The string up to the first path segment containing a glob expression will be extracted, + /// and resolved in the same manner as a normal scheme-less path. 
That is, resolved to + /// an absolute path on the local filesystem, returning an error if it does not exist, + /// and converted to a [file URI] + /// + /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a + /// filter when listing files from object storage + /// + /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme + pub fn parse(s: impl AsRef) -> Result { + let s = s.as_ref(); + + // This is necessary to handle the case of a path starting with a drive letter + if std::path::Path::new(s).is_absolute() { + return Self::parse_path(s); + } + + match Url::parse(s) { + Ok(url) => Ok(Self { url, glob: None }), + Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s), + Err(e) => Err(DataFusionError::External(Box::new(e))), + } + } + + /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path + fn parse_path(s: &str) -> Result { + let (prefix, glob) = match split_glob_expression(s) { + Some((prefix, glob)) => { + let glob = Pattern::new(glob) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + (prefix, Some(glob)) + } + None => (s, None), + }; + + let path = std::path::Path::new(prefix).canonicalize()?; + let url = match path.is_file() { + true => Url::from_file_path(path).unwrap(), + false => Url::from_directory_path(path).unwrap(), + }; + + Ok(Self { url, glob }) + } + + /// Returns the URL scheme + pub fn scheme(&self) -> &str { + self.url.scheme() + } + + /// Returns the path as expected by [`ObjectStore`] + /// + /// In particular for file scheme URLs, this is an absolute + /// on the local filesystem in the OS-specific path representation + /// + /// For other URLs, this is a the host and path of the URL, + /// delimited by `/`, and with no leading `/` + /// + /// TODO: Handle paths consistently (#2489) + fn prefix(&self) -> &str { + match self.scheme() { + "file" => match cfg!(target_family = "windows") { + true => self.url.path().strip_prefix('/').unwrap(), + false => self.url.path(), + }, + _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath], + } + } + + /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning + /// an iterator of the remaining path segments + /// + /// TODO: Handle paths consistently (#2489) + pub(crate) fn strip_prefix<'a, 'b: 'a>( + &'a self, + path: &'b str, + ) -> Option + 'a> { + let prefix = self.prefix(); + // Ignore empty path segments + let diff = itertools::diff_with( + path.split(is_separator).filter(|s| !s.is_empty()), + prefix.split(is_separator).filter(|s| !s.is_empty()), + |a, b| a == b, + ); + + match diff { + // Match with remaining + Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath), + _ => None, + } + } + + /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension` + pub(crate) fn list_all_files<'a>( + &'a self, + store: &'a dyn ObjectStore, + file_extension: &'a str, + ) -> BoxStream<'a, Result> { + futures::stream::once(async move { + let prefix = self.prefix(); + store.list_file(prefix.as_ref()).await + }) + .try_flatten() + .map_err(DataFusionError::IoError) + .try_filter(move |meta| { + let path = meta.path(); + + let extension_match = path.ends_with(file_extension); + let glob_match = match &self.glob { + Some(glob) => match self.strip_prefix(path) { + Some(mut segments) => { + let stripped = segments.join("/"); + glob.matches(&stripped) + } + None => false, + }, + None => true, + }; + + futures::future::ready(extension_match && glob_match) + }) + .boxed() + } + + /// Returns this 
[`ListingTableUrl`] as a string + pub fn as_str(&self) -> &str { + self.as_ref() + } + + /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`] + pub fn object_store(&self) -> ObjectStoreUrl { + let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath]; + ObjectStoreUrl::parse(url).unwrap() + } +} + +impl AsRef for ListingTableUrl { + fn as_ref(&self) -> &str { + self.url.as_ref() + } +} + +impl AsRef for ListingTableUrl { + fn as_ref(&self) -> &Url { + &self.url + } +} + +impl std::fmt::Display for ListingTableUrl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.as_str().fmt(f) + } +} + +const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; + +/// Splits `path` at the first path segment containing a glob expression, returning +/// `None` if no glob expression found. +/// +/// Path delimiters are determined using [`std::path::is_separator`] which +/// permits `/` as a path delimiter even on Windows platforms. +/// +fn split_glob_expression(path: &str) -> Option<(&str, &str)> { + let mut last_separator = 0; + + for (byte_idx, char) in path.char_indices() { + if GLOB_START_CHARS.contains(&char) { + if last_separator == 0 { + return Some((".", path)); + } + return Some(path.split_at(last_separator)); + } + + if std::path::is_separator(char) { + last_separator = byte_idx + char.len_utf8(); + } + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_prefix_path() { + let root = std::env::current_dir().unwrap(); + let root = root.to_string_lossy(); + + let url = ListingTableUrl::parse(&root).unwrap(); + let child = format!("{}/partition/file", root); + + let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect(); + assert_eq!(prefix, vec!["partition", "file"]); + } + + #[test] + fn test_prefix_s3() { + let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap(); + assert_eq!(url.prefix(), "bucket/foo/bar"); + + let path = "bucket/foo/bar/partition/foo.parquet"; + let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect(); + assert_eq!(prefix, vec!["partition", "foo.parquet"]); + + let path = "other-bucket/foo/bar/partition/foo.parquet"; + assert!(url.strip_prefix(path).is_none()); + } + + #[test] + fn test_split_glob() { + fn test(input: &str, expected: Option<(&str, &str)>) { + assert_eq!( + split_glob_expression(input), + expected, + "testing split_glob_expression with {}", + input + ); + } + + // no glob patterns + test("/", None); + test("/a.txt", None); + test("/a", None); + test("/a/", None); + test("/a/b", None); + test("/a/b/", None); + test("/a/b.txt", None); + test("/a/b/c.txt", None); + // glob patterns, thus we build the longest path (os-specific) + test("*.txt", Some((".", "*.txt"))); + test("/*.txt", Some(("/", "*.txt"))); + test("/a/*b.txt", Some(("/a/", "*b.txt"))); + test("/a/*/b.txt", Some(("/a/", "*/b.txt"))); + test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt"))); + test("/a/b*.txt", Some(("/a/", "b*.txt"))); + test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt"))); + + // https://github.com/apache/arrow-datafusion/issues/2465 + test( + "/a/b/c//alltypes_plain*.parquet", + Some(("/a/b/c//", "alltypes_plain*.parquet")), + ); + } +} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index f3cc0a04eecd..65fc2adcbaa0 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -23,7 +23,7 @@ pub mod empty; pub mod file_format; pub mod listing; pub mod memory; -pub mod object_store_registry; +pub mod 
object_store; pub mod view; use futures::Stream; diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs new file mode 100644 index 000000000000..ac7e1a847075 --- /dev/null +++ b/datafusion/core/src/datasource/object_store.rs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. +//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS +//! and query data inside these systems. + +use datafusion_common::{DataFusionError, Result}; +use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME}; +use datafusion_data_access::object_store::ObjectStore; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; +use url::Url; + +/// A parsed URL identifying a particular [`ObjectStore`] +#[derive(Debug, Clone)] +pub struct ObjectStoreUrl { + url: Url, +} + +impl ObjectStoreUrl { + /// Parse an [`ObjectStoreUrl`] from a string + pub fn parse(s: impl AsRef) -> Result { + let mut parsed = + Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?; + + let remaining = &parsed[url::Position::BeforePath..]; + if !remaining.is_empty() && remaining != "/" { + return Err(DataFusionError::Execution(format!( + "ObjectStoreUrl must only contain scheme and authority, got: {}", + remaining + ))); + } + + // Always set path for consistency + parsed.set_path("/"); + Ok(Self { url: parsed }) + } + + /// An [`ObjectStoreUrl`] for the local filesystem + pub fn local_filesystem() -> Self { + Self::parse("file://").unwrap() + } + + /// Returns this [`ObjectStoreUrl`] as a string + pub fn as_str(&self) -> &str { + self.as_ref() + } +} + +impl AsRef for ObjectStoreUrl { + fn as_ref(&self) -> &str { + self.url.as_ref() + } +} + +impl AsRef for ObjectStoreUrl { + fn as_ref(&self) -> &Url { + &self.url + } +} + +impl std::fmt::Display for ObjectStoreUrl { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.as_str().fmt(f) + } +} + +/// Object store registry +pub struct ObjectStoreRegistry { + /// A map from scheme to object store that serve list / read operations for the store + pub object_stores: RwLock>>, +} + +impl std::fmt::Debug for ObjectStoreRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("ObjectStoreRegistry") + .field( + "schemes", + &self.object_stores.read().keys().collect::>(), + ) + .finish() + } +} + +impl Default for ObjectStoreRegistry { + fn default() -> Self { + Self::new() + } +} + +impl ObjectStoreRegistry { + /// Create the registry that object stores can registered into. 
+ /// ['LocalFileSystem'] store is registered in by default to support read local files natively. + pub fn new() -> Self { + let mut map: HashMap> = HashMap::new(); + map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + + Self { + object_stores: RwLock::new(map), + } + } + + /// Adds a new store to this registry. + /// If a store of the same prefix existed before, it is replaced in the registry and returned. + pub fn register_store( + &self, + scheme: String, + store: Arc, + ) -> Option> { + let mut stores = self.object_stores.write(); + stores.insert(scheme, store) + } + + /// Get the store registered for scheme + pub fn get(&self, scheme: &str) -> Option> { + let stores = self.object_stores.read(); + stores.get(scheme).cloned() + } + + /// Get a suitable store for the provided URL. For example: + /// + /// - URL with scheme `file://` or no schema will return the default LocalFS store + /// - URL with scheme `s3://` will return the S3 store if it's registered + /// + pub fn get_by_url(&self, url: impl AsRef) -> Result> { + let url = url.as_ref(); + let store = self.get(url.scheme()).ok_or_else(|| { + DataFusionError::Internal(format!( + "No suitable object store found for {}", + url + )) + })?; + + Ok(store) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::datasource::listing::ListingTableUrl; + use datafusion_data_access::object_store::local::LocalFileSystem; + use std::sync::Arc; + + #[test] + fn test_object_store_url() { + let listing = ListingTableUrl::parse("file:///").unwrap(); + let store = listing.object_store(); + assert_eq!(store.as_str(), "file:///"); + + let file = ObjectStoreUrl::parse("file://").unwrap(); + assert_eq!(file.as_str(), "file:///"); + + let listing = ListingTableUrl::parse("s3://bucket/").unwrap(); + let store = listing.object_store(); + assert_eq!(store.as_str(), "s3://bucket/"); + + let url = ObjectStoreUrl::parse("s3://bucket").unwrap(); + assert_eq!(url.as_str(), "s3://bucket/"); + + let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap(); + assert_eq!(url.as_str(), "s3://username:password@host:123/"); + + let err = ObjectStoreUrl::parse("s3://bucket:invalid").unwrap_err(); + assert_eq!(err.to_string(), "External error: invalid port number"); + + let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err(); + assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?"); + + let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err(); + assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar"); + + let err = ObjectStoreUrl::parse("s3://host:123/foo").unwrap_err(); + assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); + + let err = + ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err(); + assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); + } + + #[test] + fn test_get_by_url_s3() { + let sut = ObjectStoreRegistry::default(); + sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {})); + let url = ListingTableUrl::parse("s3://bucket/key").unwrap(); + sut.get_by_url(&url).unwrap(); + } + + #[test] + fn test_get_by_url_file() { + let sut = ObjectStoreRegistry::default(); + let url = ListingTableUrl::parse("file:///bucket/key").unwrap(); + sut.get_by_url(&url).unwrap(); + } + + #[test] + fn test_get_by_url_local() { + let sut = 
ObjectStoreRegistry::default(); + let url = ListingTableUrl::parse("../").unwrap(); + sut.get_by_url(&url).unwrap(); + } +} diff --git a/datafusion/core/src/datasource/object_store_registry.rs b/datafusion/core/src/datasource/object_store_registry.rs deleted file mode 100644 index 70336af026c7..000000000000 --- a/datafusion/core/src/datasource/object_store_registry.rs +++ /dev/null @@ -1,138 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. -//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS -//! and query data inside these systems. - -use datafusion_common::{DataFusionError, Result}; -use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME}; -use datafusion_data_access::object_store::ObjectStore; -use parking_lot::RwLock; -use std::collections::HashMap; -use std::fmt; -use std::sync::Arc; - -/// Object store registry -pub struct ObjectStoreRegistry { - /// A map from scheme to object store that serve list / read operations for the store - pub object_stores: RwLock>>, -} - -impl fmt::Debug for ObjectStoreRegistry { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ObjectStoreRegistry") - .field( - "schemes", - &self.object_stores.read().keys().collect::>(), - ) - .finish() - } -} - -impl Default for ObjectStoreRegistry { - fn default() -> Self { - Self::new() - } -} - -impl ObjectStoreRegistry { - /// Create the registry that object stores can registered into. - /// ['LocalFileSystem'] store is registered in by default to support read local files natively. - pub fn new() -> Self { - let mut map: HashMap> = HashMap::new(); - map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); - - Self { - object_stores: RwLock::new(map), - } - } - - /// Adds a new store to this registry. - /// If a store of the same prefix existed before, it is replaced in the registry and returned. - pub fn register_store( - &self, - scheme: String, - store: Arc, - ) -> Option> { - let mut stores = self.object_stores.write(); - stores.insert(scheme, store) - } - - /// Get the store registered for scheme - pub fn get(&self, scheme: &str) -> Option> { - let stores = self.object_stores.read(); - stores.get(scheme).cloned() - } - - /// Get a suitable store for the URI based on it's scheme. 
For example: - /// - URI with scheme `file://` or no schema will return the default LocalFS store - /// - URI with scheme `s3://` will return the S3 store if it's registered - /// Returns a tuple with the store and the self-described uri of the file in that store - pub fn get_by_uri<'a>( - &self, - uri: &'a str, - ) -> Result<(Arc, &'a str)> { - if let Some((scheme, path)) = uri.split_once("://") { - let stores = self.object_stores.read(); - let store = stores - .get(&*scheme.to_lowercase()) - .map(Clone::clone) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "No suitable object store found for {}", - scheme - )) - })?; - Ok((store, path)) - } else { - Ok((Arc::new(LocalFileSystem), uri)) - } - } -} - -#[cfg(test)] -mod tests { - use super::ObjectStoreRegistry; - use datafusion_data_access::object_store::local::LocalFileSystem; - use std::sync::Arc; - - #[test] - fn test_get_by_uri_s3() { - let sut = ObjectStoreRegistry::default(); - sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {})); - let uri = "s3://bucket/key"; - let (_, path) = sut.get_by_uri(uri).unwrap(); - assert_eq!(path, "bucket/key"); - } - - #[test] - fn test_get_by_uri_file() { - let sut = ObjectStoreRegistry::default(); - let uri = "file:///bucket/key"; - let (_, path) = sut.get_by_uri(uri).unwrap(); - assert_eq!(path, "/bucket/key"); - } - - #[test] - fn test_get_by_uri_local() { - let sut = ObjectStoreRegistry::default(); - let uri = "/bucket/key"; - let (_, path) = sut.get_by_uri(uri).unwrap(); - assert_eq!(path, "/bucket/key"); - } -} diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 652834d73ae3..4d579776e666 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -57,7 +57,7 @@ use crate::catalog::{ schema::{MemorySchemaProvider, SchemaProvider}, }; use crate::dataframe::DataFrame; -use crate::datasource::listing::ListingTableConfig; +use crate::datasource::listing::{ListingTableConfig, ListingTableUrl}; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{ @@ -519,26 +519,25 @@ impl SessionContext { /// Creates a DataFrame for reading an Avro data source. pub async fn read_avro( &self, - uri: impl Into, + table_path: impl AsRef, options: AvroReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let table_path = ListingTableUrl::parse(table_path)?; + let object_store = self.runtime_env().object_store(&table_path)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); - let path: String = path.into(); - let resolved_schema = match options.schema { Some(s) => s, None => { listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &table_path) .await? } }; - let config = ListingTableConfig::new(object_store, path.clone()) + + let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(listing_options) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; @@ -548,26 +547,24 @@ impl SessionContext { /// Creates a DataFrame for reading an Json data source. 
pub async fn read_json( &mut self, - uri: impl Into, + table_path: impl AsRef, options: NdJsonReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let table_path = ListingTableUrl::parse(table_path)?; + let object_store = self.runtime_env().object_store(&table_path)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); - let path: String = path.into(); - let resolved_schema = match options.schema { Some(s) => s, None => { listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &table_path) .await? } }; - let config = ListingTableConfig::new(object_store, path) + let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(listing_options) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; @@ -586,52 +583,47 @@ impl SessionContext { /// Creates a DataFrame for reading a CSV data source. pub async fn read_csv( &self, - uri: impl Into, + table_path: impl AsRef, options: CsvReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let table_path = ListingTableUrl::parse(table_path)?; + let object_store = self.runtime_env().object_store(&table_path)?; let target_partitions = self.copied_config().target_partitions; - let path = path.to_string(); let listing_options = options.to_listing_options(target_partitions); let resolved_schema = match options.schema { Some(s) => Arc::new(s.to_owned()), None => { listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &table_path) .await? } }; - let config = ListingTableConfig::new(object_store, path.clone()) + let config = ListingTableConfig::new(object_store, table_path.clone()) .with_listing_options(listing_options) .with_schema(resolved_schema); - let provider = ListingTable::try_new(config)?; - let plan = - LogicalPlanBuilder::scan(path, provider_as_source(Arc::new(provider)), None)? - .build()?; - Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) + let provider = ListingTable::try_new(config)?; + self.read_table(Arc::new(provider)) } /// Creates a DataFrame for reading a Parquet data source. pub async fn read_parquet( &self, - uri: impl Into, + table_path: impl AsRef, options: ParquetReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let table_path = ListingTableUrl::parse(table_path)?; + let object_store = self.runtime_env().object_store(&table_path)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); - let path: String = path.into(); // with parquet we resolve the schema in all cases let resolved_schema = listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &table_path) .await?; - let config = ListingTableConfig::new(object_store, path) + let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(listing_options) .with_schema(resolved_schema); @@ -651,23 +643,24 @@ impl SessionContext { /// Registers a table that uses the listing feature of the object store to /// find the files to be processed /// This is async because it might need to resolve the schema. 
- pub async fn register_listing_table<'a>( - &'a self, - name: &'a str, - uri: &'a str, + pub async fn register_listing_table( + &self, + name: &str, + table_path: impl AsRef, options: ListingOptions, provided_schema: Option, ) -> Result<()> { - let (object_store, path) = self.runtime_env().object_store(uri)?; + let table_path = ListingTableUrl::parse(table_path)?; + let object_store = self.runtime_env().object_store(&table_path)?; let resolved_schema = match provided_schema { None => { options - .infer_schema(Arc::clone(&object_store), path) + .infer_schema(Arc::clone(&object_store), &table_path) .await? } Some(s) => s, }; - let config = ListingTableConfig::new(object_store, path) + let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(options) .with_schema(resolved_schema); let table = ListingTable::try_new(config)?; @@ -680,7 +673,7 @@ impl SessionContext { pub async fn register_csv( &self, name: &str, - uri: &str, + table_path: &str, options: CsvReadOptions<'_>, ) -> Result<()> { let listing_options = @@ -688,7 +681,7 @@ impl SessionContext { self.register_listing_table( name, - uri, + table_path, listing_options, options.schema.map(|s| Arc::new(s.to_owned())), ) @@ -702,13 +695,13 @@ impl SessionContext { pub async fn register_json( &self, name: &str, - uri: &str, + table_path: &str, options: NdJsonReadOptions<'_>, ) -> Result<()> { let listing_options = options.to_listing_options(self.copied_config().target_partitions); - self.register_listing_table(name, uri, listing_options, options.schema) + self.register_listing_table(name, table_path, listing_options, options.schema) .await?; Ok(()) } @@ -718,7 +711,7 @@ impl SessionContext { pub async fn register_parquet( &self, name: &str, - uri: &str, + table_path: &str, options: ParquetReadOptions<'_>, ) -> Result<()> { let (target_partitions, parquet_pruning) = { @@ -729,7 +722,7 @@ impl SessionContext { .parquet_pruning(parquet_pruning) .to_listing_options(target_partitions); - self.register_listing_table(name, uri, listing_options, None) + self.register_listing_table(name, table_path, listing_options, None) .await?; Ok(()) } @@ -739,13 +732,13 @@ impl SessionContext { pub async fn register_avro( &self, name: &str, - uri: &str, + table_path: &str, options: AvroReadOptions<'_>, ) -> Result<()> { let listing_options = options.to_listing_options(self.copied_config().target_partitions); - self.register_listing_table(name, uri, listing_options, options.schema) + self.register_listing_table(name, table_path, listing_options, options.schema) .await?; Ok(()) } diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs index 73bbc836ec92..26d1471a1df5 100644 --- a/datafusion/core/src/execution/runtime_env.rs +++ b/datafusion/core/src/execution/runtime_env.rs @@ -26,12 +26,13 @@ use crate::{ }, }; -use crate::datasource::object_store_registry::ObjectStoreRegistry; +use crate::datasource::object_store::ObjectStoreRegistry; use datafusion_common::DataFusionError; use datafusion_data_access::object_store::ObjectStore; use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::sync::Arc; +use url::Url; #[derive(Clone)] /// Execution runtime environment. 
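The hunk that follows replaces the old `(store, path)` tuple lookup on `RuntimeEnv` with a URL-keyed `get_by_url`. A minimal sketch of the new resolution flow, assuming a registered `s3` scheme backed by `LocalFileSystem` as a stand-in, mirroring this patch's tests:

use std::sync::Arc;

use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::error::Result;

fn resolve_store() -> Result<()> {
    // Stores are registered per scheme; a real S3 implementation would go here.
    let registry = ObjectStoreRegistry::new();
    registry.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));

    // Lookup is keyed by the parsed URL's scheme rather than by splitting the
    // raw string on "://" and returning a (store, path) tuple.
    let url = ListingTableUrl::parse("s3://bucket/key")?;
    let _store = registry.get_by_url(&url)?;
    Ok(())
}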
@@ -100,12 +101,9 @@ impl RuntimeEnv { } /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store<'a>( - &self, - uri: &'a str, - ) -> Result<(Arc, &'a str)> { + pub fn object_store(&self, url: impl AsRef) -> Result> { self.object_store_registry - .get_by_uri(uri) + .get_by_url(url) .map_err(DataFusionError::from) } } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 600e24fb8f18..e8852fdc142a 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -52,11 +52,11 @@ //! .to_string(); //! //! let expected = vec![ -//! "+---+--------------------------+", -//! "| a | MIN(tests/example.csv.b) |", -//! "+---+--------------------------+", -//! "| 1 | 2 |", -//! "+---+--------------------------+" +//! "+---+----------------+", +//! "| a | MIN(?table?.b) |", +//! "+---+----------------+", +//! "| 1 | 2 |", +//! "+---+----------------+" //! ]; //! //! assert_eq!(pretty_results.trim().lines().collect::>(), expected); diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 3b2c4515eafd..f24e1e4e6f81 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -241,6 +241,7 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; @@ -261,6 +262,7 @@ mod tests { Arc::new(ParquetExec::new( FileScanConfig { object_store: TestObjectStore::new_arc(&[("x", 100)]), + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), file_schema: schema(), file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], statistics: Statistics::default(), diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index fc56ce1d87fe..8f4d30be0e17 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -167,6 +167,7 @@ impl ExecutionPlan for AvroExec { mod tests { use crate::datasource::file_format::{avro::AvroFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::prelude::SessionContext; use crate::scalar::ScalarValue; use arrow::datatypes::{DataType, Field, Schema}; @@ -188,6 +189,7 @@ mod tests { let avro_exec = AvroExec::new(FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![vec![meta.into()]], file_schema, statistics: Statistics::default(), @@ -241,9 +243,12 @@ mod tests { async fn avro_exec_missing_column() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); - let store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); - let actual_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?; + let actual_schema = AvroFormat {} + .infer_schema(&object_store, &[meta.clone()]) + .await?; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); @@ -253,7 
+258,8 @@ mod tests { let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]); let avro_exec = AvroExec::new(FileScanConfig { - object_store: store, + object_store, + object_store_url, file_groups: vec![vec![meta.into()]], file_schema, statistics: Statistics::default(), @@ -307,9 +313,12 @@ mod tests { async fn avro_exec_with_partition() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); - let store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); - let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?; + let file_schema = AvroFormat {} + .infer_schema(&object_store, &[meta.clone()]) + .await?; let mut partitioned_file = PartitionedFile::from(meta); partitioned_file.partition_values = @@ -319,7 +328,8 @@ mod tests { // select specific columns of the files as well as the partitioning // column which is supposed to be the last column in the table schema. projection: Some(vec![0, 1, file_schema.fields().len(), 2]), - object_store: store, + object_store, + object_store_url, file_groups: vec![vec![partitioned_file]], file_schema, statistics: Statistics::default(), diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 5470f6d57fd1..3a179a7a29fb 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -195,6 +195,7 @@ mod tests { use crate::datafusion_data_access::object_store::local::LocalFileSystem; use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::prelude::NdJsonReadOptions; use crate::prelude::*; use datafusion_data_access::object_store::local::local_unpartitioned_file; @@ -205,9 +206,14 @@ mod tests { const TEST_DATA_BASE: &str = "tests/jsons"; - async fn prepare_store( - ) -> (Arc, Vec>, SchemaRef) { + async fn prepare_store() -> ( + Arc, + ObjectStoreUrl, + Vec>, + SchemaRef, + ) { let store = Arc::new(LocalFileSystem {}) as _; + let store_url = ObjectStoreUrl::local_filesystem(); let path = format!("{}/1.json", TEST_DATA_BASE); let meta = local_unpartitioned_file(path); let schema = JsonFormat::default() @@ -215,7 +221,7 @@ mod tests { .await .unwrap(); - (store, vec![vec![meta.into()]], schema) + (store, store_url, vec![vec![meta.into()]], schema) } #[tokio::test] @@ -223,10 +229,13 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; - let (object_store, file_groups, file_schema) = prepare_store().await; + + let (object_store, object_store_url, file_groups, file_schema) = + prepare_store().await; let exec = NdJsonExec::new(FileScanConfig { object_store, + object_store_url, file_groups, file_schema, statistics: Statistics::default(), @@ -280,7 +289,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; - let (object_store, file_groups, actual_schema) = prepare_store().await; + let (object_store, object_store_url, file_groups, actual_schema) = + prepare_store().await; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); @@ -290,6 +300,7 @@ mod 
tests { let exec = NdJsonExec::new(FileScanConfig { object_store, + object_store_url, file_groups, file_schema, statistics: Statistics::default(), @@ -319,10 +330,12 @@ mod tests { async fn nd_json_exec_file_projection() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let (object_store, file_groups, file_schema) = prepare_store().await; + let (object_store, object_store_url, file_groups, file_schema) = + prepare_store().await; let exec = NdJsonExec::new(FileScanConfig { object_store, + object_store_url, file_groups, file_schema, statistics: Statistics::default(), diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 566b4c8d47ab..0fed497d3c61 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -38,7 +38,7 @@ pub use csv::CsvExec; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; -use crate::datasource::listing::PartitionedFile; +use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, @@ -68,6 +68,8 @@ lazy_static! { pub struct FileScanConfig { /// Store from which the `files` should be fetched pub object_store: Arc, + /// Object store URL + pub object_store_url: ObjectStoreUrl, /// Schema before projection. It contains the columns that are expected /// to be in the files without the table partition columns. pub file_schema: SchemaRef, @@ -658,6 +660,7 @@ mod tests { file_groups: vec![vec![]], limit: None, object_store: TestObjectStore::new_arc(&[]), + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), projection, statistics, table_partition_cols, diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 0931484ef770..9eda036a4226 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -640,6 +640,7 @@ mod tests { use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::datasource::file_format::test_util::scan_format; use crate::datasource::listing::FileRange; + use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::options::CsvReadOptions; use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::Float32Array; @@ -681,6 +682,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![file_groups], file_schema, statistics: Statistics::default(), @@ -1067,6 +1069,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups, file_schema, statistics: Statistics::default(), @@ -1155,6 +1158,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: store, + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![vec![partitioned_file]], file_schema: schema, statistics: Statistics::default(), @@ -1214,6 +1218,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![vec![partitioned_file]], file_schema: Arc::new(Schema::empty()), statistics: 
Statistics::default(), diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index feb0bf322619..2a89ea0df92d 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -288,6 +288,7 @@ pub fn with_new_children_if_necessary( /// ``` /// use datafusion::prelude::*; /// use datafusion::physical_plan::displayable; +/// use std::path::is_separator; /// /// #[tokio::main] /// async fn main() { @@ -310,11 +311,15 @@ pub fn with_new_children_if_necessary( /// let displayable_plan = displayable(physical_plan.as_ref()); /// let plan_string = format!("{}", displayable_plan.indent()); /// +/// let working_directory = std::env::current_dir().unwrap(); +/// let normalized = working_directory.to_string_lossy().replace(is_separator, "/"); +/// let plan_string = plan_string.replace(&normalized, "WORKING_DIR"); +/// /// assert_eq!("ProjectionExec: expr=[a@0 as a]\ /// \n CoalesceBatchesExec: target_batch_size=4096\ /// \n FilterExec: a@0 < 5\ /// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ -/// \n CsvExec: files=[tests/example.csv], has_header=true, limit=None, projection=[a]", +/// \n CsvExec: files=[WORKING_DIR/tests/example.csv], has_header=true, limit=None, projection=[a]", /// plan_string.trim()); /// /// let one_line = format!("{}", displayable_plan.one_line()); diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 815379de44db..19a9db9a4ed2 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -18,6 +18,7 @@ //! Common unit test utility methods use crate::arrow::array::UInt32Array; +use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; use crate::from_slice::FromSlice; @@ -120,6 +121,7 @@ pub fn partitioned_csv_config( Ok(FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_schema: schema, file_groups, statistics: Default::default(), diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index cc9c02305a76..95a6f16ed3c6 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -48,7 +48,7 @@ impl TestObjectStore { #[async_trait] impl ObjectStore for TestObjectStore { async fn list_file(&self, prefix: &str) -> Result { - let prefix = prefix.to_owned(); + let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string(); Ok(Box::pin( stream::iter( self.files diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 4297b12b6047..35eb2ca3e77e 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -20,6 +20,7 @@ use std::{fs, io, sync::Arc}; use async_trait::async_trait; +use datafusion::datasource::listing::ListingTableUrl; use datafusion::{ assert_batches_sorted_eq, datafusion_data_access::{ @@ -182,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mytable", + "file:///mytable", ); let result = ctx @@ -218,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mytable", + "file:///mytable", ); let result = ctx @@ -255,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mytable", + "file:///mytable", ); let result = ctx @@ -419,6 
+420,7 @@ fn register_partitioned_aggregate_csv( let mut options = ListingOptions::new(Arc::new(CsvFormat::default())); options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect(); + let table_path = ListingTableUrl::parse(table_path).unwrap(); let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(options) .with_schema(file_schema); @@ -444,8 +446,12 @@ async fn register_partitioned_alltypes_parquet( options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect(); options.collect_stat = true; + let table_path = ListingTableUrl::parse(format!("mirror:///{}", table_path)).unwrap(); + let store_path = + ListingTableUrl::parse(format!("mirror:///{}", store_paths[0])).unwrap(); + let file_schema = options - .infer_schema(Arc::clone(&object_store), store_paths[0]) + .infer_schema(Arc::clone(&object_store), &store_path) .await .expect("Parquet schema inference failed"); @@ -487,7 +493,7 @@ impl ObjectStore for MirroringObjectStore { &self, prefix: &str, ) -> datafusion_data_access::Result { - let prefix = prefix.to_owned(); + let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string(); let size = self.file_size; Ok(Box::pin( stream::iter( diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 51ddff2fb302..947ebc699b48 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -17,6 +17,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; +use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::error::Result; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{collect, ExecutionPlan}; @@ -81,21 +82,23 @@ async fn get_exec( let meta = local_unpartitioned_file(filename); let format = ParquetFormat::default(); - let store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store_url = ObjectStoreUrl::local_filesystem(); let file_schema = format - .infer_schema(&store, &[meta.clone()]) + .infer_schema(&object_store, &[meta.clone()]) .await .expect("Schema inference"); let statistics = format - .infer_stats(&store, file_schema.clone(), &meta) + .infer_stats(&object_store, file_schema.clone(), &meta) .await .expect("Stats inference"); let file_groups = vec![vec![meta.into()]]; let exec = format .create_physical_plan( FileScanConfig { - object_store: store, + object_store, + object_store_url, file_schema, file_groups, statistics, diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index f72e0f8f8120..759edf6fc55b 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -642,7 +642,7 @@ order by #[tokio::test] async fn test_physical_plan_display_indent() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(3); + let config = SessionConfig::new().with_target_partitions(9000); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await.unwrap(); let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \ @@ -662,22 +662,21 @@ async fn test_physical_plan_display_indent() { " ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]", " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 3)", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)", " AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " CoalesceBatchesExec: target_batch_size=4096", " FilterExec: c12@1 < CAST(10 AS Float64)", - " RepartitionExec: partitioning=RoundRobinBatch(3)", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c12]", ]; - let data_path = datafusion::test_util::arrow_test_data(); + let normalizer = ExplainNormalizer::new(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() // normalize paths - .map(|s| s.replace(&data_path, "ARROW_TEST_DATA")) + .map(|s| normalizer.normalize(s)) .collect::>(); - assert_eq!( expected, actual, "expected:\n{:#?}\nactual:\n\n{:#?}\n", @@ -688,7 +687,7 @@ async fn test_physical_plan_display_indent() { #[tokio::test] async fn test_physical_plan_display_indent_multi_children() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(3); + let config = SessionConfig::new().with_target_partitions(9000); let ctx = SessionContext::with_config(config); // ensure indenting works for nodes with multiple children register_aggregate_csv(&ctx).await.unwrap(); @@ -708,25 +707,25 @@ async fn test_physical_plan_display_indent_multi_children() { " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 0 })]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 3)", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)", " ProjectionExec: expr=[c1@0 as c1]", " ProjectionExec: expr=[c1@0 as c1]", - " RepartitionExec: partitioning=RoundRobinBatch(3)", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 3)", + " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)", " ProjectionExec: expr=[c2@0 as c2]", " ProjectionExec: expr=[c1@0 as c2]", - " RepartitionExec: partitioning=RoundRobinBatch(3)", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", ]; - let data_path = datafusion::test_util::arrow_test_data(); + let normalizer = ExplainNormalizer::new(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() // normalize paths - .map(|s| s.replace(&data_path, "ARROW_TEST_DATA")) + .map(|s| normalizer.normalize(s)) .collect::>(); assert_eq!( diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs index 79deaae79ab3..a74076415a49 100644 --- a/datafusion/core/tests/sql/json.rs +++ b/datafusion/core/tests/sql/json.rs @@ -96,7 +96,7 @@ async fn json_explain() { \n CoalescePartitionsExec\ \n AggregateExec: mode=Partial, gby=[], 
aggr=[COUNT(UInt8(1))]\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ - \n JsonExec: limit=None, files=[tests/jsons/2.json]\n", + \n JsonExec: limit=None, files=[WORKING_DIR/tests/jsons/2.json]\n", ], ]; assert_eq!(expected, actual); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index a13f2ebffe2b..3e19dbcb990b 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -48,7 +48,9 @@ use datafusion::{execution::context::SessionContext, physical_plan::displayable} use datafusion_expr::Volatility; use std::fs::File; use std::io::Write; +use std::path::PathBuf; use tempfile::TempDir; +use url::Url; /// A macro to assert that some particular line contains two substrings /// @@ -811,25 +813,57 @@ pub fn table_with_sequence( Ok(Arc::new(MemTable::try_new(schema, partitions)?)) } -// Normalizes parts of an explain plan that vary from run to run (such as path) -fn normalize_for_explain(s: &str) -> String { - // Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv - // to ARROW_TEST_DATA/csv/aggregate_test_100.csv - let data_path = datafusion::test_util::arrow_test_data(); - let s = s.replace(&data_path, "ARROW_TEST_DATA"); +pub struct ExplainNormalizer { + replacements: Vec<(String, String)>, +} + +impl ExplainNormalizer { + fn new() -> Self { + let mut replacements = vec![]; + + let mut push_path = |path: PathBuf, key: &str| { + // Push path as is + replacements.push((path.to_string_lossy().to_string(), key.to_string())); + + // Push canonical version of path + let canonical = path.canonicalize().unwrap(); + replacements.push((canonical.to_string_lossy().to_string(), key.to_string())); + + if cfg!(target_family = "windows") { + // Push URL representation of path, to handle windows + let url = Url::from_file_path(canonical).unwrap(); + let path = url.path().strip_prefix('/').unwrap(); + replacements.push((path.to_string(), key.to_string())); + } + }; + + push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA"); + push_path(std::env::current_dir().unwrap(), "WORKING_DIR"); - // convert things like partitioning=RoundRobinBatch(16) - // to partitioning=RoundRobinBatch(NUM_CORES) - let needle = format!("RoundRobinBatch({})", num_cpus::get()); - s.replace(&needle, "RoundRobinBatch(NUM_CORES)") + // convert things like partitioning=RoundRobinBatch(16) + // to partitioning=RoundRobinBatch(NUM_CORES) + let needle = format!("RoundRobinBatch({})", num_cpus::get()); + replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string())); + + Self { replacements } + } + + fn normalize(&self, s: impl Into) -> String { + let mut s = s.into(); + for (from, to) in &self.replacements { + s = s.replace(from, to); + } + s + } } /// Applies normalize_for_explain to every line fn normalize_vec_for_explain(v: Vec>) -> Vec> { + let normalizer = ExplainNormalizer::new(); v.into_iter() .map(|l| { l.into_iter() - .map(|s| normalize_for_explain(&s)) + .map(|s| normalizer.normalize(s)) .collect::>() }) .collect::>() @@ -948,7 +982,7 @@ async fn nyc() -> Result<()> { let ctx = SessionContext::new(); ctx.register_csv( "tripdata", - "file.csv", + "file:///file.csv", CsvReadOptions::new().schema(&schema), ) .await?; diff --git a/datafusion/data-access/Cargo.toml b/datafusion/data-access/Cargo.toml index 7bf447f04cfe..8a556f1f35d9 100644 --- a/datafusion/data-access/Cargo.toml +++ b/datafusion/data-access/Cargo.toml @@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion" readme = 
"README.md" authors = ["Apache Arrow "] license = "Apache-2.0" -keywords = [ "arrow", "query", "sql" ] +keywords = ["arrow", "query", "sql"] edition = "2021" rust-version = "1.59" @@ -36,7 +36,6 @@ path = "src/lib.rs" async-trait = "0.1.41" chrono = { version = "0.4", default-features = false, features = ["std"] } futures = "0.3" -glob = "0.3.0" parking_lot = "0.12" tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion/data-access/src/object_store/local.rs b/datafusion/data-access/src/object_store/local.rs index 604539814f19..7b0cab7cd00c 100644 --- a/datafusion/data-access/src/object_store/local.rs +++ b/datafusion/data-access/src/object_store/local.rs @@ -317,30 +317,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_globbing() -> Result<()> { - let tmp = tempdir()?; - let a1_path = tmp.path().join("a1.txt"); - let a2_path = tmp.path().join("a2.txt"); - let b1_path = tmp.path().join("b1.txt"); - File::create(&a1_path)?; - File::create(&a2_path)?; - File::create(&b1_path)?; - - let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap()); - let mut all_files = HashSet::new(); - let mut files = LocalFileSystem.glob_file(&glob).await?; - while let Some(file) = files.next().await { - let file = file?; - assert_eq!(file.size(), 0); - all_files.insert(file.path().to_owned()); - } - - assert_eq!(all_files.len(), 2); - assert!(all_files.contains(a1_path.to_str().unwrap())); - assert!(all_files.contains(a2_path.to_str().unwrap())); - - Ok(()) - } } diff --git a/datafusion/data-access/src/object_store/mod.rs b/datafusion/data-access/src/object_store/mod.rs index 93a930a6dd24..496a5494fa01 100644 --- a/datafusion/data-access/src/object_store/mod.rs +++ b/datafusion/data-access/src/object_store/mod.rs @@ -21,15 +21,11 @@ pub mod local; use std::fmt::Debug; use std::io::Read; -use std::path; -use std::path::{Component, Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; -use futures::future::ready; -use futures::{AsyncRead, Stream, StreamExt, TryStreamExt}; -use glob::Pattern; +use futures::{AsyncRead, Stream}; use crate::{FileMeta, ListEntry, Result, SizedFile}; @@ -78,54 +74,6 @@ pub trait ObjectStore: Sync + Send + Debug { /// Returns all the files in path `prefix` async fn list_file(&self, prefix: &str) -> Result; - /// Calls `list_file` with a suffix filter - async fn list_file_with_suffix( - &self, - prefix: &str, - suffix: &str, - ) -> Result { - self.glob_file_with_suffix(prefix, suffix).await - } - - /// Returns all the files matching `glob_pattern` - async fn glob_file(&self, glob_pattern: &str) -> Result { - if !contains_glob_start_char(glob_pattern) { - self.list_file(glob_pattern).await - } else { - let normalized_glob_pb = normalize_path(Path::new(glob_pattern)); - let normalized_glob_pattern = - normalized_glob_pb.as_os_str().to_str().unwrap(); - let start_path = - find_longest_search_path_without_glob_pattern(normalized_glob_pattern); - let file_stream = self.list_file(&start_path).await?; - let pattern = Pattern::new(normalized_glob_pattern).unwrap(); - Ok(Box::pin(file_stream.filter(move |fr| { - let matches_pattern = match fr { - Ok(f) => pattern.matches(f.path()), - Err(_) => true, - }; - async move { matches_pattern } - }))) - } - } - - /// Calls `glob_file` with a suffix filter - async fn glob_file_with_suffix( - &self, - glob_pattern: &str, - suffix: &str, - ) -> Result { - let files_to_consider = match contains_glob_start_char(glob_pattern) { - 
true => self.glob_file(glob_pattern).await, - false => self.list_file(glob_pattern).await, - }?; - - match suffix.is_empty() { - true => Ok(files_to_consider), - false => filter_suffix(files_to_consider, suffix), - } - } - /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. async fn list_dir( @@ -137,144 +85,3 @@ pub trait ObjectStore: Sync + Send + Debug { /// Get object reader for one file fn file_reader(&self, file: SizedFile) -> Result>; } - -/// Normalize a path without requiring it to exist on the filesystem (path::canonicalize) -pub fn normalize_path>(path: P) -> PathBuf { - let ends_with_slash = path - .as_ref() - .to_str() - .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR)); - let mut normalized = PathBuf::new(); - for component in path.as_ref().components() { - match &component { - Component::ParentDir => { - if !normalized.pop() { - normalized.push(component); - } - } - _ => { - normalized.push(component); - } - } - } - if ends_with_slash { - normalized.push(""); - } - normalized -} - -const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; - -/// Determine whether the path contains a globbing character -fn contains_glob_start_char(path: &str) -> bool { - path.chars().any(|c| GLOB_START_CHARS.contains(&c)) -} - -/// Filters the file_stream to only contain files that end with suffix -fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result { - let suffix = suffix.to_owned(); - Ok(Box::pin( - file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))), - )) -} - -fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { - // in case the glob_pattern is not actually a glob pattern, take the entire thing - if !contains_glob_start_char(glob_pattern) { - glob_pattern.to_string() - } else { - // take all the components of the path (left-to-right) which do not contain a glob pattern - let components_in_glob_pattern = Path::new(glob_pattern).components(); - let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new(); - for component_in_glob_pattern in components_in_glob_pattern { - let component_as_str = - component_in_glob_pattern.as_os_str().to_str().unwrap(); - if contains_glob_start_char(component_as_str) { - break; - } - path_buf_for_longest_search_path_without_glob_pattern - .push(component_in_glob_pattern); - } - - let mut result = path_buf_for_longest_search_path_without_glob_pattern - .to_str() - .unwrap() - .to_string(); - - // when we're not at the root, append a separator - if path_buf_for_longest_search_path_without_glob_pattern - .components() - .count() - > 1 - { - result.push(path::MAIN_SEPARATOR); - } - result - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_is_glob_path() -> Result<()> { - assert!(!contains_glob_start_char("/")); - assert!(!contains_glob_start_char("/test")); - assert!(!contains_glob_start_char("/test/")); - assert!(contains_glob_start_char("/test*")); - Ok(()) - } - - fn test_longest_base_path(input: &str, expected: &str) { - assert_eq!( - find_longest_search_path_without_glob_pattern(input), - expected, - "testing find_longest_search_path_without_glob_pattern with {}", - input - ); - } - - #[tokio::test] - async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> { - // no glob patterns, thus we get the full path (as-is) - test_longest_base_path("/", "/"); - test_longest_base_path("/a.txt", "/a.txt"); - 
test_longest_base_path("/a", "/a"); - test_longest_base_path("/a/", "/a/"); - test_longest_base_path("/a/b", "/a/b"); - test_longest_base_path("/a/b/", "/a/b/"); - test_longest_base_path("/a/b.txt", "/a/b.txt"); - test_longest_base_path("/a/b/c.txt", "/a/b/c.txt"); - // glob patterns, thus we build the longest path (os-specific) - use path::MAIN_SEPARATOR; - test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}")); - test_longest_base_path( - "/a/*b.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/*/b.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/b/[123]/file*.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/b*.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/b/**/c*.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - &format!("{}/alltypes_plain*.parquet", "/a/b/c//"), // https://github.com/apache/arrow-datafusion/issues/2465 - &format!( - "{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}c{MAIN_SEPARATOR}" - ), - ); - Ok(()) - } -} diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 4993cfdce5dd..4d927339315b 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -30,7 +30,7 @@ use datafusion::{ file_format::{ avro::AvroFormat, csv::CsvFormat, parquet::ParquetFormat, FileFormat, }, - listing::{ListingOptions, ListingTable, ListingTableConfig}, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, }, logical_plan::{provider_as_source, source_as_provider}, }; @@ -411,6 +411,7 @@ impl AsLogicalPlan for LogicalPlanNode { FileFormatType::Avro(..) => Arc::new(AvroFormat::default()), }; + let table_path = ListingTableUrl::parse(&scan.path)?; let options = ListingOptions { file_extension: scan.file_extension.clone(), format: file_format, @@ -419,7 +420,7 @@ impl AsLogicalPlan for LogicalPlanNode { target_partitions: scan.target_partitions as usize, }; - let object_store = ctx.runtime_env().object_store(scan.path.as_str())?.0; + let object_store = ctx.runtime_env().object_store(&table_path)?; println!( "Found object store {:?} for path {}", @@ -427,7 +428,7 @@ impl AsLogicalPlan for LogicalPlanNode { scan.path.as_str() ); - let config = ListingTableConfig::new(object_store, scan.path.as_str()) + let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(options) .with_schema(Arc::new(schema)); @@ -758,7 +759,7 @@ impl AsLogicalPlan for LogicalPlanNode { .options() .table_partition_cols .clone(), - path: listing_table.table_path().to_owned(), + path: listing_table.table_path().to_string(), schema: Some(schema), projection, filters, diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh index 0c3922627310..8c2e687f308f 100755 --- a/dev/build-arrow-ballista.sh +++ b/dev/build-arrow-ballista.sh @@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null # clone the repo # TODO make repo/branch configurable -git clone https://github.com/apache/arrow-ballista +git clone https://github.com/tustvold/arrow-ballista -b url-refactor # update dependencies to local crates python ./dev/make-ballista-deps-local.py