Skip to content

Commit 5893b10

Browse files
committed
[fix] replace file format providers in ballista
1 parent c595913 commit 5893b10

File tree

31 files changed

+732
-534
lines changed

31 files changed

+732
-534
lines changed

ballista-examples/src/bin/ballista-dataframe.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ async fn main() -> Result<()> {
3333

3434
// define the query using the DataFrame trait
3535
let df = ctx
36-
.read_parquet(filename)?
36+
.read_parquet(filename)
37+
.await?
3738
.select_columns(&["id", "bool_col", "timestamp_col"])?
3839
.filter(col("id").gt(lit(1)))?;
3940

ballista-examples/src/bin/ballista-sql.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,18 @@ async fn main() -> Result<()> {
3434
"aggregate_test_100",
3535
&format!("{}/csv/aggregate_test_100.csv", testdata),
3636
CsvReadOptions::new(),
37-
)?;
37+
)
38+
.await?;
3839

3940
// execute the query
40-
let df = ctx.sql(
41-
"SELECT c1, MIN(c12), MAX(c12) \
41+
let df = ctx
42+
.sql(
43+
"SELECT c1, MIN(c12), MAX(c12) \
4244
FROM aggregate_test_100 \
4345
WHERE c11 > 0.1 AND c11 < 0.9 \
4446
GROUP BY c1",
45-
)?;
47+
)
48+
.await?;
4649

4750
// print the results
4851
df.show().await?;

ballista/rust/client/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,15 @@ async fn main() -> Result<()> {
100100
"tripdata",
101101
"/path/to/yellow_tripdata_2020-01.csv",
102102
CsvReadOptions::new(),
103-
)?;
103+
).await?;
104104
105105
// execute the query
106106
let df = ctx.sql(
107107
"SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
108108
FROM tripdata
109109
GROUP BY passenger_count
110110
ORDER BY passenger_count",
111-
)?;
111+
).await?;
112112
113113
// collect the results and print them to stdout
114114
let results = df.collect().await?;

ballista/rust/client/src/context.rs

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ use datafusion::datasource::TableProvider;
3131
use datafusion::error::{DataFusionError, Result};
3232
use datafusion::execution::dataframe_impl::DataFrameImpl;
3333
use datafusion::logical_plan::LogicalPlan;
34-
use datafusion::physical_plan::avro::AvroReadOptions;
35-
use datafusion::physical_plan::csv::CsvReadOptions;
34+
use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
3635
use datafusion::sql::parser::FileType;
3736

3837
struct BallistaContextState {
@@ -128,11 +127,11 @@ impl BallistaContext {
128127
}
129128

130129
/// Create a DataFrame representing an Avro table scan
131-
132-
pub fn read_avro(
130+
/// TODO fetch schema from scheduler instead of resolving locally
131+
pub async fn read_avro(
133132
&self,
134133
path: &str,
135-
options: AvroReadOptions,
134+
options: AvroReadOptions<'_>,
136135
) -> Result<Arc<dyn DataFrame>> {
137136
// convert to absolute path because the executor likely has a different working directory
138137
let path = PathBuf::from(path);
@@ -147,13 +146,13 @@ impl BallistaContext {
147146
guard.config(),
148147
)
149148
};
150-
let df = ctx.read_avro(path.to_str().unwrap(), options)?;
149+
let df = ctx.read_avro(path.to_str().unwrap(), options).await?;
151150
Ok(df)
152151
}
153152

154153
/// Create a DataFrame representing a Parquet table scan
155-
156-
pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
154+
/// TODO fetch schema from scheduler instead of resolving locally
155+
pub async fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
157156
// convert to absolute path because the executor likely has a different working directory
158157
let path = PathBuf::from(path);
159158
let path = fs::canonicalize(&path)?;
@@ -167,16 +166,16 @@ impl BallistaContext {
167166
guard.config(),
168167
)
169168
};
170-
let df = ctx.read_parquet(path.to_str().unwrap())?;
169+
let df = ctx.read_parquet(path.to_str().unwrap()).await?;
171170
Ok(df)
172171
}
173172

174173
/// Create a DataFrame representing a CSV table scan
175-
176-
pub fn read_csv(
174+
/// TODO fetch schema from scheduler instead of resolving locally
175+
pub async fn read_csv(
177176
&self,
178177
path: &str,
179-
options: CsvReadOptions,
178+
options: CsvReadOptions<'_>,
180179
) -> Result<Arc<dyn DataFrame>> {
181180
// convert to absolute path because the executor likely has a different working directory
182181
let path = PathBuf::from(path);
@@ -191,7 +190,7 @@ impl BallistaContext {
191190
guard.config(),
192191
)
193192
};
194-
let df = ctx.read_csv(path.to_str().unwrap(), options)?;
193+
let df = ctx.read_csv(path.to_str().unwrap(), options).await?;
195194
Ok(df)
196195
}
197196

@@ -206,39 +205,41 @@ impl BallistaContext {
206205
Ok(())
207206
}
208207

209-
pub fn register_csv(
208+
pub async fn register_csv(
210209
&self,
211210
name: &str,
212211
path: &str,
213-
options: CsvReadOptions,
212+
options: CsvReadOptions<'_>,
214213
) -> Result<()> {
215-
match self.read_csv(path, options)?.to_logical_plan() {
214+
match self.read_csv(path, options).await?.to_logical_plan() {
216215
LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
217216
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
218217
}
219218
}
220219

221-
pub fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
222-
match self.read_parquet(path)?.to_logical_plan() {
220+
pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
221+
match self.read_parquet(path).await?.to_logical_plan() {
223222
LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
224223
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
225224
}
226225
}
227226

228-
pub fn register_avro(
227+
pub async fn register_avro(
229228
&self,
230229
name: &str,
231230
path: &str,
232-
options: AvroReadOptions,
231+
options: AvroReadOptions<'_>,
233232
) -> Result<()> {
234-
match self.read_avro(path, options)?.to_logical_plan() {
233+
match self.read_avro(path, options).await?.to_logical_plan() {
235234
LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
236235
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
237236
}
238237
}
239238

240-
/// Create a DataFrame from a SQL statement
241-
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
239+
/// Create a DataFrame from a SQL statement.
240+
///
241+
/// Async because CreateExternalTable might need to resolve the schema
242+
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
242243
let mut ctx = {
243244
let state = self.state.lock().unwrap();
244245
create_df_ctx_with_ballista_query_planner(
@@ -275,15 +276,17 @@ impl BallistaContext {
275276
CsvReadOptions::new()
276277
.schema(&schema.as_ref().to_owned().into())
277278
.has_header(*has_header),
278-
)?;
279+
)
280+
.await?;
279281
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
280282
}
281283
FileType::Parquet => {
282-
self.register_parquet(name, location)?;
284+
self.register_parquet(name, location).await?;
283285
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
284286
}
285287
FileType::Avro => {
286-
self.register_avro(name, location, AvroReadOptions::default())?;
288+
self.register_avro(name, location, AvroReadOptions::default())
289+
.await?;
287290
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
288291
}
289292
_ => Err(DataFusionError::NotImplemented(format!(
@@ -292,7 +295,7 @@ impl BallistaContext {
292295
))),
293296
},
294297

295-
_ => ctx.sql(sql),
298+
_ => ctx.sql(sql).await,
296299
}
297300
}
298301
}
@@ -306,7 +309,7 @@ mod tests {
306309
let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
307310
.await
308311
.unwrap();
309-
let df = context.sql("SELECT 1;").unwrap();
312+
let df = context.sql("SELECT 1;").await.unwrap();
310313
df.collect().await.unwrap();
311314
}
312315
}

ballista/rust/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ sqlparser = "0.11.0"
4141
tokio = "1.0"
4242
tonic = "0.5"
4343
uuid = { version = "0.8", features = ["v4"] }
44+
chrono = "0.4"
4445

4546
arrow-flight = { version = "^5.3" }
4647

ballista/rust/core/proto/ballista.proto

Lines changed: 46 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ syntax = "proto3";
2020

2121
package ballista.protobuf;
2222

23+
import "google/protobuf/timestamp.proto";
24+
2325
option java_multiple_files = true;
2426
option java_package = "org.ballistacompute.protobuf";
2527
option java_outer_classname = "BallistaProto";
@@ -239,8 +241,7 @@ message SortExprNode {
239241
// LogicalPlan is a nested type
240242
message LogicalPlanNode {
241243
oneof LogicalPlanType {
242-
CsvTableScanNode csv_scan = 1;
243-
ParquetTableScanNode parquet_scan = 2;
244+
ListingTableScanNode listing_scan = 1;
244245
ProjectionNode projection = 3;
245246
SelectionNode selection = 4;
246247
LimitNode limit = 5;
@@ -254,25 +255,13 @@ message LogicalPlanNode {
254255
WindowNode window = 13;
255256
AnalyzeNode analyze = 14;
256257
CrossJoinNode cross_join = 15;
257-
AvroTableScanNode avro_scan = 16;
258258
}
259259
}
260260

261261
message ProjectionColumns {
262262
repeated string columns = 1;
263263
}
264264

265-
message CsvTableScanNode {
266-
string table_name = 1;
267-
string path = 2;
268-
bool has_header = 3;
269-
string delimiter = 4;
270-
string file_extension = 5;
271-
ProjectionColumns projection = 6;
272-
Schema schema = 7;
273-
repeated LogicalExprNode filters = 8;
274-
}
275-
276265
message Statistics {
277266
int64 num_rows = 1;
278267
int64 total_byte_size = 2;
@@ -282,30 +271,36 @@ message Statistics {
282271

283272
message PartitionedFile {
284273
string path = 1;
285-
Statistics statistics = 2;
274+
uint64 size = 2;
275+
uint64 last_modified_ns = 3;
286276
}
287277

288-
message TableDescriptor {
289-
string path = 1;
290-
repeated PartitionedFile partition_files = 2;
291-
Schema schema = 3;
278+
message CsvFormat {
279+
bool has_header = 1;
280+
string delimiter = 2;
292281
}
293282

294-
message ParquetTableScanNode {
295-
string table_name = 1;
296-
TableDescriptor table_desc = 2;
297-
ProjectionColumns projection = 3;
298-
repeated LogicalExprNode filters = 4;
299-
uint32 target_partitions = 5;
283+
message ParquetFormat {
284+
bool enable_pruning = 1;
300285
}
301286

302-
message AvroTableScanNode {
287+
message AvroFormat {}
288+
289+
message ListingTableScanNode {
303290
string table_name = 1;
304291
string path = 2;
305292
string file_extension = 3;
306293
ProjectionColumns projection = 4;
307294
Schema schema = 5;
308295
repeated LogicalExprNode filters = 6;
296+
repeated string partitions = 7;
297+
bool collect_stat = 8;
298+
uint32 target_partitions = 9;
299+
oneof FileFormatType {
300+
CsvFormat csv = 10;
301+
ParquetFormat parquet = 11;
302+
AvroFormat avro = 12;
303+
}
309304
}
310305

311306
message ProjectionNode {
@@ -598,40 +593,42 @@ message FilterExecNode {
598593
PhysicalExprNode expr = 2;
599594
}
600595

601-
message ParquetPartition {
602-
uint32 index = 1;
603-
repeated PartitionedFile files = 2;
596+
message FilePartition {
597+
repeated PartitionedFile files = 1;
598+
}
599+
600+
message ScanLimit {
601+
// wrapped in a message so that the limit is optional
602+
uint32 limit = 1;
604603
}
605604

606605
message ParquetScanExecNode {
607-
repeated ParquetPartition partitions = 1;
606+
repeated FilePartition partitions = 1;
608607
Schema schema = 2;
609-
repeated uint32 projection = 3;
610608
uint32 batch_size = 4;
609+
repeated uint32 projection = 6;
610+
ScanLimit limit = 7;
611+
Statistics statistics = 8;
611612
}
612613

613614
message CsvScanExecNode {
614-
string path = 1;
615-
repeated uint32 projection = 2;
616-
Schema schema = 3;
617-
string file_extension = 4;
618-
bool has_header = 5;
619-
uint32 batch_size = 6;
620-
string delimiter = 7;
621-
622-
// partition filenames
623-
repeated string filename = 8;
615+
repeated PartitionedFile files = 1;
616+
Schema schema = 2;
617+
bool has_header = 3;
618+
uint32 batch_size = 4;
619+
string delimiter = 5;
620+
repeated uint32 projection = 6;
621+
ScanLimit limit = 7;
622+
Statistics statistics = 8;
624623
}
625624

626625
message AvroScanExecNode {
627-
string path = 1;
628-
repeated uint32 projection = 2;
629-
Schema schema = 3;
630-
string file_extension = 4;
631-
uint32 batch_size = 5;
632-
633-
// partition filenames
634-
repeated string filename = 8;
626+
repeated PartitionedFile files = 1;
627+
Schema schema = 2;
628+
uint32 batch_size = 4;
629+
repeated uint32 projection = 6;
630+
ScanLimit limit = 7;
631+
Statistics statistics = 8;
635632
}
636633

637634
enum PartitionMode {
@@ -946,7 +943,6 @@ message GetFileMetadataParams {
946943

947944
message GetFileMetadataResult {
948945
Schema schema = 1;
949-
repeated FilePartitionMetadata partitions = 2;
950946
}
951947

952948
message FilePartitionMetadata {

0 commit comments

Comments
 (0)