
Commit 2454e46

Reorganize table providers by table format (apache#1010)
* [feat] stubs for provider re-organization
* [feat] implement infer_schema to make test pass
* [wip] trying to implement pruned_partition_list
* [typo]
* [fix] replace enum with trait for extensibility
* [fix] add partition cols to inferred schema
* [feat] forked file format executors (avro still missing)
* [doc] comments about why we are flattening
* [test] migrated tests to file formats
* [test] improve listing test
* [feat] add avro to refactored format providers
* [fix] remove try from new when unnecessary
* [fix] remove try_ from ListingTable new
* [refacto] renamed format module to file_format; also removed statistics from the PartitionedFile abstraction
* [fix] removed Ballista stubs
* [fix] rename create_executor
* [feat] added store
* [fix] Clippy
* [test] improve file_format tests with limit
* [fix] limit file system read size
* [fix] avoid fetching unnecessary stats after limit
* [fix] improve readability
* [doc] improve comments
* [refacto] keep async reader stub
* [doc] cleanup comments
* [test] test file listing
* [fix] add last_modified back
* [refacto] simplify csv reader exec
* [refacto] change SizedFile back to FileMeta
* [doc] comment clarification
* [fix] avoid keeping object store as field
* [refacto] grouped params to avoid too_many_arguments
* [fix] get_by_uri also returns path
* [fix] ListingTable at store level instead of registry
* [fix] builder take self and not ref to self
* Replace file format providers (#2)
* [fix] replace file format providers in datafusion
* [lint] clippy
* [fix] replace file format providers in ballista
* [fix] await in python wrapper
* [doc] clearer doc about why sql() is async
* [doc] typos and clarity
* [fix] missing await after rebase
1 parent d2d47d3 commit 2454e46


82 files changed: +5116 -4462 lines
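The key design move in this reorganization, per the "[fix] replace enum with trait for extensibility" entry above, is swapping a closed enum of file formats for a trait that each format implements, so listing-style table providers can work with any format behind a trait object. Below is a minimal illustrative sketch of that shape only; the names (`FileFormat`, `describe`, the simplified `ListingTable`) are stand-ins, and the commit's actual trait lives in DataFusion's `file_format` module with async schema-inference and plan-creation methods:

```rust
use std::sync::Arc;

// Hypothetical stand-in for the per-format behavior this commit extracts
// into a trait (the real one does async schema inference and builds
// ExecutionPlans; this sketch only shows the extensibility point).
trait FileFormat {
    /// File extension this format handles, e.g. ".csv".
    fn extension(&self) -> &str;
    /// Placeholder for format-specific work such as schema inference.
    fn describe(&self, path: &str) -> String;
}

struct CsvFormat {
    has_header: bool,
}

struct ParquetFormat;

impl FileFormat for CsvFormat {
    fn extension(&self) -> &str {
        ".csv"
    }
    fn describe(&self, path: &str) -> String {
        format!("CSV scan of {} (header: {})", path, self.has_header)
    }
}

impl FileFormat for ParquetFormat {
    fn extension(&self) -> &str {
        ".parquet"
    }
    fn describe(&self, path: &str) -> String {
        format!("Parquet scan of {}", path)
    }
}

// A listing-style table holds an `Arc<dyn FileFormat>` instead of matching
// on an enum variant, so third-party formats plug in without core changes.
struct ListingTable {
    path: String,
    format: Arc<dyn FileFormat>,
}

fn main() {
    let tables = vec![
        ListingTable {
            path: "data/a.csv".into(),
            format: Arc::new(CsvFormat { has_header: true }),
        },
        ListingTable {
            path: "data/b.parquet".into(),
            format: Arc::new(ParquetFormat),
        },
    ];
    for t in &tables {
        assert!(t.path.ends_with(t.format.extension()));
        println!("{}", t.format.describe(&t.path));
    }
}
```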

README.md

Lines changed: 3 additions & 3 deletions
@@ -76,10 +76,10 @@ use datafusion::arrow::record_batch::RecordBatch;
 async fn main() -> datafusion::error::Result<()> {
   // register the table
   let mut ctx = ExecutionContext::new();
-  ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
+  ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

   // create a plan to run a SQL query
-  let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
+  let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;

   // execute and print results
   df.show().await?;
@@ -98,7 +98,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 async fn main() -> datafusion::error::Result<()> {
   // create the dataframe
   let mut ctx = ExecutionContext::new();
-  let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+  let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;

   let df = df.filter(col("a").lt_eq(col("b")))?
              .aggregate(vec![col("a")], vec![min(col("b"))])?;
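Since the diff only shows the changed lines, here is the first README snippet as it reads after this commit, as a complete program (a sketch assuming the `datafusion` prelude and a tokio runtime, which the README elides):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // register the table; `register_csv` is now async because the schema
    // may have to be inferred by reading the start of the file
    let mut ctx = ExecutionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;

    // `sql` is async too: a `CREATE EXTERNAL TABLE` statement can also
    // trigger schema inference (see the doc comment added in context.rs below)
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
        .await?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```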

ballista-examples/src/bin/ballista-dataframe.rs

Lines changed: 2 additions & 1 deletion
@@ -33,7 +33,8 @@ async fn main() -> Result<()> {

     // define the query using the DataFrame trait
     let df = ctx
-        .read_parquet(filename)?
+        .read_parquet(filename)
+        .await?
         .select_columns(&["id", "bool_col", "timestamp_col"])?
         .filter(col("id").gt(lit(1)))?;

ballista-examples/src/bin/ballista-sql.rs

Lines changed: 7 additions & 4 deletions
@@ -34,15 +34,18 @@ async fn main() -> Result<()> {
         "aggregate_test_100",
         &format!("{}/csv/aggregate_test_100.csv", testdata),
         CsvReadOptions::new(),
-    )?;
+    )
+    .await?;

     // execute the query
-    let df = ctx.sql(
-        "SELECT c1, MIN(c12), MAX(c12) \
+    let df = ctx
+        .sql(
+            "SELECT c1, MIN(c12), MAX(c12) \
         FROM aggregate_test_100 \
         WHERE c11 > 0.1 AND c11 < 0.9 \
         GROUP BY c1",
-    )?;
+        )
+        .await?;

     // print the results
     df.show().await?;

ballista/rust/client/README.md

Lines changed: 2 additions & 2 deletions
@@ -104,15 +104,15 @@ async fn main() -> Result<()> {
         "tripdata",
         "/path/to/yellow_tripdata_2020-01.csv",
         CsvReadOptions::new(),
-    )?;
+    ).await?;

     // execute the query
     let df = ctx.sql(
         "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
         FROM tripdata
         GROUP BY passenger_count
         ORDER BY passenger_count",
-    )?;
+    ).await?;

     // collect the results and print them to stdout
     let results = df.collect().await?;

ballista/rust/client/src/context.rs

Lines changed: 32 additions & 28 deletions
@@ -31,8 +31,7 @@ use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::dataframe_impl::DataFrameImpl;
 use datafusion::logical_plan::LogicalPlan;
-use datafusion::physical_plan::avro::AvroReadOptions;
-use datafusion::physical_plan::csv::CsvReadOptions;
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
 use datafusion::sql::parser::FileType;

 struct BallistaContextState {
@@ -128,11 +127,11 @@ impl BallistaContext {
     }

     /// Create a DataFrame representing an Avro table scan
-
-    pub fn read_avro(
+    /// TODO fetch schema from scheduler instead of resolving locally
+    pub async fn read_avro(
         &self,
         path: &str,
-        options: AvroReadOptions,
+        options: AvroReadOptions<'_>,
     ) -> Result<Arc<dyn DataFrame>> {
         // convert to absolute path because the executor likely has a different working directory
         let path = PathBuf::from(path);
@@ -147,13 +146,13 @@ impl BallistaContext {
                 guard.config(),
             )
         };
-        let df = ctx.read_avro(path.to_str().unwrap(), options)?;
+        let df = ctx.read_avro(path.to_str().unwrap(), options).await?;
         Ok(df)
     }

     /// Create a DataFrame representing a Parquet table scan
-
-    pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
+    /// TODO fetch schema from scheduler instead of resolving locally
+    pub async fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
         // convert to absolute path because the executor likely has a different working directory
         let path = PathBuf::from(path);
         let path = fs::canonicalize(&path)?;
@@ -167,16 +166,16 @@ impl BallistaContext {
                 guard.config(),
             )
         };
-        let df = ctx.read_parquet(path.to_str().unwrap())?;
+        let df = ctx.read_parquet(path.to_str().unwrap()).await?;
         Ok(df)
     }

     /// Create a DataFrame representing a CSV table scan
-
-    pub fn read_csv(
+    /// TODO fetch schema from scheduler instead of resolving locally
+    pub async fn read_csv(
         &self,
         path: &str,
-        options: CsvReadOptions,
+        options: CsvReadOptions<'_>,
     ) -> Result<Arc<dyn DataFrame>> {
         // convert to absolute path because the executor likely has a different working directory
         let path = PathBuf::from(path);
@@ -191,7 +190,7 @@ impl BallistaContext {
                 guard.config(),
             )
         };
-        let df = ctx.read_csv(path.to_str().unwrap(), options)?;
+        let df = ctx.read_csv(path.to_str().unwrap(), options).await?;
         Ok(df)
     }

@@ -206,39 +205,42 @@ impl BallistaContext {
         Ok(())
     }

-    pub fn register_csv(
+    pub async fn register_csv(
         &self,
         name: &str,
         path: &str,
-        options: CsvReadOptions,
+        options: CsvReadOptions<'_>,
     ) -> Result<()> {
-        match self.read_csv(path, options)?.to_logical_plan() {
+        match self.read_csv(path, options).await?.to_logical_plan() {
             LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }

-    pub fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
-        match self.read_parquet(path)?.to_logical_plan() {
+    pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
+        match self.read_parquet(path).await?.to_logical_plan() {
             LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }

-    pub fn register_avro(
+    pub async fn register_avro(
         &self,
         name: &str,
         path: &str,
-        options: AvroReadOptions,
+        options: AvroReadOptions<'_>,
     ) -> Result<()> {
-        match self.read_avro(path, options)?.to_logical_plan() {
+        match self.read_avro(path, options).await?.to_logical_plan() {
             LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }

-    /// Create a DataFrame from a SQL statement
-    pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
+    /// Create a DataFrame from a SQL statement.
+    ///
+    /// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
+    /// might require the schema to be inferred.
+    pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
         let mut ctx = {
             let state = self.state.lock().unwrap();
             create_df_ctx_with_ballista_query_planner(
@@ -275,15 +277,17 @@ impl BallistaContext {
                     CsvReadOptions::new()
                         .schema(&schema.as_ref().to_owned().into())
                         .has_header(*has_header),
-                )?;
+                )
+                .await?;
                 Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
             }
             FileType::Parquet => {
-                self.register_parquet(name, location)?;
+                self.register_parquet(name, location).await?;
                 Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
             }
             FileType::Avro => {
-                self.register_avro(name, location, AvroReadOptions::default())?;
+                self.register_avro(name, location, AvroReadOptions::default())
+                    .await?;
                 Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
             }
             _ => Err(DataFusionError::NotImplemented(format!(
@@ -292,7 +296,7 @@ impl BallistaContext {
                 ))),
             },

-            _ => ctx.sql(sql),
+            _ => ctx.sql(sql).await,
         }
     }
 }
@@ -306,7 +310,7 @@ mod tests {
         let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
             .await
             .unwrap();
-        let df = context.sql("SELECT 1;").unwrap();
+        let df = context.sql("SELECT 1;").await.unwrap();
         df.collect().await.unwrap();
     }
 }

ballista/rust/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ sqlparser = "0.11.0"
 tokio = "1.0"
 tonic = "0.5"
 uuid = { version = "0.8", features = ["v4"] }
+chrono = "0.4"

 arrow-flight = { version = "^5.3" }
