Skip to content

Commit 4314a3b

Browse files
committed
Extract Listing URI logic into ListingTableUri structure
1 parent 9ea7dc6 commit 4314a3b

File tree

23 files changed

+463
-508
lines changed

23 files changed

+463
-508
lines changed

benchmarks/src/bin/tpch.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use datafusion::{
5050

5151
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
5252
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
53+
use datafusion::datasource::listing::ListingTableUrl;
5354
use serde::Serialize;
5455
use structopt::StructOpt;
5556

@@ -425,7 +426,8 @@ fn get_table(
425426
table_partition_cols: vec![],
426427
};
427428

428-
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
429+
let uri = ListingTableUrl::parse(path)?;
430+
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri)
429431
.with_listing_options(options)
430432
.with_schema(schema);
431433

datafusion-examples/examples/flight_server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use arrow_flight::SchemaAsIpc;
2222
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
2323
use datafusion::datasource::file_format::parquet::ParquetFormat;
24-
use datafusion::datasource::listing::ListingOptions;
24+
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
2525
use futures::Stream;
2626
use tonic::transport::Server;
2727
use tonic::{Request, Response, Status, Streaming};
@@ -68,9 +68,10 @@ impl FlightService for FlightServiceImpl {
6868
let request = request.into_inner();
6969

7070
let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
71+
let url = ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
7172

7273
let schema = listing_options
73-
.infer_schema(Arc::new(LocalFileSystem {}), &request.path[0])
74+
.infer_schema(Arc::new(LocalFileSystem {}), &url)
7475
.await
7576
.unwrap();
7677

datafusion/core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
6767
datafusion-row = { path = "../row", version = "8.0.0" }
6868
datafusion-sql = { path = "../sql", version = "8.0.0" }
6969
futures = "0.3"
70+
glob = "0.3.0"
7071
hashbrown = { version = "0.12", features = ["raw"] }
72+
itertools = "0.10"
7173
lazy_static = { version = "^1.4.0" }
7274
log = "^0.4"
7375
num-traits = { version = "0.2", optional = true }
@@ -85,6 +87,7 @@ sqlparser = "0.17"
8587
tempfile = "3"
8688
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
8789
tokio-stream = "0.1"
90+
url = "2.2"
8891
uuid = { version = "1.0", features = ["v4"] }
8992

9093
[dev-dependencies]

datafusion/core/benches/sort_limit_query_sql.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ extern crate criterion;
2020
use criterion::Criterion;
2121
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
2222
use datafusion::datasource::file_format::csv::CsvFormat;
23-
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
23+
use datafusion::datasource::listing::{
24+
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
25+
};
2426

2527
use parking_lot::Mutex;
2628
use std::sync::Arc;
@@ -64,11 +66,12 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
6466
let testdata = datafusion::test_util::arrow_test_data();
6567

6668
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
69+
let url = ListingTableUrl::parse(path).unwrap();
6770

6871
// create CSV data source
6972
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
7073

71-
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
74+
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url)
7275
.with_listing_options(listing_options)
7376
.with_schema(schema);
7477

datafusion/core/src/catalog/schema.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::any::Any;
2323
use std::collections::HashMap;
2424
use std::sync::Arc;
2525

26-
use crate::datasource::listing::{ListingTable, ListingTableConfig};
26+
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
2727
use crate::datasource::object_store_registry::ObjectStoreRegistry;
2828
use crate::datasource::TableProvider;
2929
use crate::error::{DataFusionError, Result};
@@ -157,10 +157,7 @@ impl ObjectStoreSchemaProvider {
157157
}
158158

159159
/// Retrieves a `ObjectStore` instance by scheme
160-
pub fn object_store<'a>(
161-
&self,
162-
uri: &'a str,
163-
) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
160+
pub fn object_store(&self, uri: &ListingTableUrl) -> Result<Arc<dyn ObjectStore>> {
164161
self.object_store_registry
165162
.lock()
166163
.get_by_uri(uri)
@@ -173,13 +170,13 @@ impl ObjectStoreSchemaProvider {
173170
pub async fn register_listing_table(
174171
&self,
175172
name: &str,
176-
uri: &str,
173+
uri: ListingTableUrl,
177174
config: Option<ListingTableConfig>,
178175
) -> Result<()> {
179176
let config = match config {
180177
Some(cfg) => cfg,
181178
None => {
182-
let (object_store, _path) = self.object_store(uri)?;
179+
let object_store = self.object_store(&uri)?;
183180
ListingTableConfig::new(object_store, uri).infer().await?
184181
}
185182
};
@@ -255,6 +252,7 @@ mod tests {
255252
use crate::datasource::empty::EmptyTable;
256253
use crate::execution::context::SessionContext;
257254

255+
use crate::datasource::listing::ListingTableUrl;
258256
use futures::StreamExt;
259257

260258
#[tokio::test]
@@ -280,12 +278,13 @@ mod tests {
280278
async fn test_schema_register_listing_table() {
281279
let testdata = crate::test_util::parquet_test_data();
282280
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
281+
let uri = ListingTableUrl::parse(filename).unwrap();
283282

284283
let schema = ObjectStoreSchemaProvider::new();
285284
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
286285

287286
schema
288-
.register_listing_table("alltypes_plain", &filename, None)
287+
.register_listing_table("alltypes_plain", uri, None)
289288
.await
290289
.unwrap();
291290

@@ -338,8 +337,9 @@ mod tests {
338337
|| file == OsStr::new("alltypes_plain.parquet")
339338
{
340339
let name = path.file_stem().unwrap().to_str().unwrap();
340+
let path = ListingTableUrl::parse(&sized_file.path).unwrap();
341341
schema
342-
.register_listing_table(name, &sized_file.path, None)
342+
.register_listing_table(name, path, None)
343343
.await
344344
.unwrap();
345345
}
@@ -360,17 +360,18 @@ mod tests {
360360
async fn test_schema_register_same_listing_table() {
361361
let testdata = crate::test_util::parquet_test_data();
362362
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
363+
let uri = ListingTableUrl::parse(filename).unwrap();
363364

364365
let schema = ObjectStoreSchemaProvider::new();
365366
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
366367

367368
schema
368-
.register_listing_table("alltypes_plain", &filename, None)
369+
.register_listing_table("alltypes_plain", uri.clone(), None)
369370
.await
370371
.unwrap();
371372

372373
schema
373-
.register_listing_table("alltypes_plain", &filename, None)
374+
.register_listing_table("alltypes_plain", uri, None)
374375
.await
375376
.unwrap();
376377
}

0 commit comments

Comments
 (0)