Skip to content

Commit b382bb7

Browse files
committed
Extract Listing URI logic into ListingTableUrl structure
1 parent 49c5b89 commit b382bb7

File tree

24 files changed

+486
-536
lines changed

24 files changed

+486
-536
lines changed

ballista/rust/core/src/serde/logical_plan/mod.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use datafusion::datasource::file_format::avro::AvroFormat;
2727
use datafusion::datasource::file_format::csv::CsvFormat;
2828
use datafusion::datasource::file_format::parquet::ParquetFormat;
2929
use datafusion::datasource::file_format::FileFormat;
30-
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
30+
use datafusion::datasource::listing::{
31+
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
32+
};
3133
use datafusion::logical_plan::plan::{
3234
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
3335
};
@@ -227,24 +229,19 @@ impl AsLogicalPlan for LogicalPlanNode {
227229
target_partitions: scan.target_partitions as usize,
228230
};
229231

230-
let object_store = ctx
231-
.runtime_env()
232-
.object_store(scan.path.as_str())
233-
.map_err(|e| {
232+
let path = ListingTableUrl::parse(scan.path.as_str())?;
233+
234+
let object_store =
235+
ctx.runtime_env().object_store(&path).map_err(|e| {
234236
BallistaError::NotImplemented(format!(
235237
"No object store is registered for path {}: {:?}",
236238
scan.path, e
237239
))
238-
})?
239-
.0;
240+
})?;
240241

241-
println!(
242-
"Found object store {:?} for path {}",
243-
object_store,
244-
scan.path.as_str()
245-
);
242+
println!("Found object store {:?} for path {}", object_store, path);
246243

247-
let config = ListingTableConfig::new(object_store, scan.path.as_str())
244+
let config = ListingTableConfig::new(object_store, path)
248245
.with_listing_options(options)
249246
.with_schema(Arc::new(schema));
250247

@@ -1043,6 +1040,7 @@ mod roundtrip_tests {
10431040
use async_trait::async_trait;
10441041
use core::panic;
10451042
use datafusion::common::DFSchemaRef;
1043+
use datafusion::datasource::listing::ListingTableUrl;
10461044
use datafusion::logical_plan::source_as_provider;
10471045
use datafusion::{
10481046
arrow::datatypes::{DataType, Field, Schema},
@@ -1163,7 +1161,7 @@ mod roundtrip_tests {
11631161
vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
11641162

11651163
let plan = std::sync::Arc::new(
1166-
test_scan_csv("employee.csv", Some(vec![3, 4]))
1164+
test_scan_csv("file:///employee.csv", Some(vec![3, 4]))
11671165
.await?
11681166
.sort(vec![col("salary")])?
11691167
.build()?,
@@ -1235,13 +1233,13 @@ mod roundtrip_tests {
12351233

12361234
#[tokio::test]
12371235
async fn roundtrip_analyze() -> Result<()> {
1238-
let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
1236+
let verbose_plan = test_scan_csv("file:///employee.csv", Some(vec![3, 4]))
12391237
.await?
12401238
.sort(vec![col("salary")])?
12411239
.explain(true, true)?
12421240
.build()?;
12431241

1244-
let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
1242+
let plan = test_scan_csv("file:///employee.csv", Some(vec![3, 4]))
12451243
.await?
12461244
.sort(vec![col("salary")])?
12471245
.explain(false, true)?
@@ -1256,13 +1254,13 @@ mod roundtrip_tests {
12561254

12571255
#[tokio::test]
12581256
async fn roundtrip_explain() -> Result<()> {
1259-
let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
1257+
let verbose_plan = test_scan_csv("file:///employee.csv", Some(vec![3, 4]))
12601258
.await?
12611259
.sort(vec![col("salary")])?
12621260
.explain(true, false)?
12631261
.build()?;
12641262

1265-
let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
1263+
let plan = test_scan_csv("file:///employee.csv", Some(vec![3, 4]))
12661264
.await?
12671265
.sort(vec![col("salary")])?
12681266
.explain(false, false)?
@@ -1277,11 +1275,11 @@ mod roundtrip_tests {
12771275

12781276
#[tokio::test]
12791277
async fn roundtrip_join() -> Result<()> {
1280-
let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4]))
1278+
let scan_plan = test_scan_csv("file:///employee1", Some(vec![0, 3, 4]))
12811279
.await?
12821280
.build()?;
12831281

1284-
let plan = test_scan_csv("employee2", Some(vec![0, 3, 4]))
1282+
let plan = test_scan_csv("file:///employee2", Some(vec![0, 3, 4]))
12851283
.await?
12861284
.join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"]))?
12871285
.build()?;
@@ -1292,7 +1290,7 @@ mod roundtrip_tests {
12921290

12931291
#[tokio::test]
12941292
async fn roundtrip_sort() -> Result<()> {
1295-
let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
1293+
let plan = test_scan_csv("file:///employee.csv", Some(vec![3, 4]))
12961294
.await?
12971295
.sort(vec![col("salary")])?
12981296
.build()?;
@@ -1316,7 +1314,7 @@ mod roundtrip_tests {
13161314

13171315
#[tokio::test]
13181316
async fn roundtrip_logical_plan() -> Result<()> {
1319-
let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
1317+
let plan = test_scan_csv("file:///employee.csv", Some(vec![3, 4]))
13201318
.await?
13211319
.aggregate(vec![col("state")], vec![max(col("salary"))])?
13221320
.build()?;
@@ -1336,9 +1334,10 @@ mod roundtrip_tests {
13361334
ctx.runtime_env()
13371335
.register_object_store("test", custom_object_store.clone());
13381336

1339-
let (os, uri) = ctx.runtime_env().object_store("test://foo.csv")?;
1337+
let uri = ListingTableUrl::parse("test://foo.csv")?;
1338+
let os = ctx.runtime_env().object_store(&uri)?;
13401339
assert_eq!("TestObjectStore", &format!("{:?}", os));
1341-
assert_eq!("foo.csv", uri);
1340+
assert_eq!("foo.csv", uri.to_string());
13421341

13431342
let schema = test_schema();
13441343
let plan = ctx

ballista/rust/core/src/serde/physical_plan/mod.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,7 +1008,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
10081008

10091009
fn decode_scan_config(
10101010
proto: &protobuf::FileScanExecConf,
1011-
runtime: &RuntimeEnv,
1011+
_runtime: &RuntimeEnv,
10121012
) -> Result<FileScanConfig, BallistaError> {
10131013
let schema = Arc::new(convert_required!(proto.schema)?);
10141014
let projection = proto
@@ -1029,14 +1029,10 @@ fn decode_scan_config(
10291029
.map(|f| f.try_into())
10301030
.collect::<Result<Vec<_>, _>>()?;
10311031

1032-
let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) {
1033-
runtime.object_store(file.file_meta.path())?.0
1034-
} else {
1035-
Arc::new(LocalFileSystem {})
1036-
};
1032+
// TODO: This will not round-trip the object store correctly because the scheme is lost; `LocalFileSystem` is always used below
10371033

10381034
Ok(FileScanConfig {
1039-
object_store,
1035+
object_store: Arc::new(LocalFileSystem {}),
10401036
file_schema: schema,
10411037
file_groups,
10421038
statistics,

datafusion-examples/examples/flight_server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use arrow_flight::SchemaAsIpc;
2222
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
2323
use datafusion::datasource::file_format::parquet::ParquetFormat;
24-
use datafusion::datasource::listing::ListingOptions;
24+
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
2525
use futures::Stream;
2626
use tonic::transport::Server;
2727
use tonic::{Request, Response, Status, Streaming};
@@ -68,9 +68,10 @@ impl FlightService for FlightServiceImpl {
6868
let request = request.into_inner();
6969

7070
let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
71+
let url = ListingTableUrl::parse(&request.path[0])?;
7172

7273
let schema = listing_options
73-
.infer_schema(Arc::new(LocalFileSystem {}), &request.path[0])
74+
.infer_schema(Arc::new(LocalFileSystem {}), url)
7475
.await
7576
.unwrap();
7677

datafusion/core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ datafusion-jit = { path = "../jit", version = "8.0.0", optional = true }
6666
datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
6767
datafusion-row = { path = "../row", version = "8.0.0" }
6868
futures = "0.3"
69+
glob = "0.3.0"
6970
hashbrown = { version = "0.12", features = ["raw"] }
71+
itertools = "0.10"
7072
lazy_static = { version = "^1.4.0" }
7173
log = "^0.4"
7274
num-traits = { version = "0.2", optional = true }
@@ -84,6 +86,7 @@ sqlparser = "0.17"
8486
tempfile = "3"
8587
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
8688
tokio-stream = "0.1"
89+
url = "2.2"
8790
uuid = { version = "1.0", features = ["v4"] }
8891

8992
[dev-dependencies]

datafusion/core/benches/sort_limit_query_sql.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ extern crate criterion;
2020
use criterion::Criterion;
2121
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
2222
use datafusion::datasource::file_format::csv::CsvFormat;
23-
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
23+
use datafusion::datasource::listing::{
24+
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
25+
};
2426

2527
use parking_lot::Mutex;
2628
use std::sync::Arc;
@@ -64,11 +66,12 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
6466
let testdata = datafusion::test_util::arrow_test_data();
6567

6668
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
69+
let url = ListingTableUrl::parse(path).unwrap();
6770

6871
// create CSV data source
6972
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
7073

71-
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
74+
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url)
7275
.with_listing_options(listing_options)
7376
.with_schema(schema);
7477

datafusion/core/src/catalog/schema.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::any::Any;
2323
use std::collections::HashMap;
2424
use std::sync::Arc;
2525

26-
use crate::datasource::listing::{ListingTable, ListingTableConfig};
26+
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
2727
use crate::datasource::object_store_registry::ObjectStoreRegistry;
2828
use crate::datasource::TableProvider;
2929
use crate::error::{DataFusionError, Result};
@@ -157,10 +157,7 @@ impl ObjectStoreSchemaProvider {
157157
}
158158

159159
/// Retrieves an `ObjectStore` instance by scheme
160-
pub fn object_store<'a>(
161-
&self,
162-
uri: &'a str,
163-
) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
160+
pub fn object_store(&self, uri: &ListingTableUrl) -> Result<Arc<dyn ObjectStore>> {
164161
self.object_store_registry
165162
.lock()
166163
.get_by_uri(uri)
@@ -173,13 +170,13 @@ impl ObjectStoreSchemaProvider {
173170
pub async fn register_listing_table(
174171
&self,
175172
name: &str,
176-
uri: &str,
173+
uri: ListingTableUrl,
177174
config: Option<ListingTableConfig>,
178175
) -> Result<()> {
179176
let config = match config {
180177
Some(cfg) => cfg,
181178
None => {
182-
let (object_store, _path) = self.object_store(uri)?;
179+
let object_store = self.object_store(&uri)?;
183180
ListingTableConfig::new(object_store, uri).infer().await?
184181
}
185182
};
@@ -255,6 +252,7 @@ mod tests {
255252
use crate::datasource::empty::EmptyTable;
256253
use crate::execution::context::SessionContext;
257254

255+
use crate::datasource::listing::ListingTableUrl;
258256
use futures::StreamExt;
259257

260258
#[tokio::test]
@@ -280,12 +278,13 @@ mod tests {
280278
async fn test_schema_register_listing_table() {
281279
let testdata = crate::test_util::parquet_test_data();
282280
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
281+
let uri = ListingTableUrl::parse(filename).unwrap();
283282

284283
let schema = ObjectStoreSchemaProvider::new();
285284
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
286285

287286
schema
288-
.register_listing_table("alltypes_plain", &filename, None)
287+
.register_listing_table("alltypes_plain", uri, None)
289288
.await
290289
.unwrap();
291290

@@ -338,8 +337,9 @@ mod tests {
338337
|| file == OsStr::new("alltypes_plain.parquet")
339338
{
340339
let name = path.file_stem().unwrap().to_str().unwrap();
340+
let path = ListingTableUrl::parse(&sized_file.path).unwrap();
341341
schema
342-
.register_listing_table(name, &sized_file.path, None)
342+
.register_listing_table(name, path, None)
343343
.await
344344
.unwrap();
345345
}
@@ -360,17 +360,18 @@ mod tests {
360360
async fn test_schema_register_same_listing_table() {
361361
let testdata = crate::test_util::parquet_test_data();
362362
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
363+
let uri = ListingTableUrl::parse(filename).unwrap();
363364

364365
let schema = ObjectStoreSchemaProvider::new();
365366
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
366367

367368
schema
368-
.register_listing_table("alltypes_plain", &filename, None)
369+
.register_listing_table("alltypes_plain", uri.clone(), None)
369370
.await
370371
.unwrap();
371372

372373
schema
373-
.register_listing_table("alltypes_plain", &filename, None)
374+
.register_listing_table("alltypes_plain", uri, None)
374375
.await
375376
.unwrap();
376377
}

0 commit comments

Comments
 (0)