File tree 13 files changed +23
-31
lines changed
core/src/serde/physical_plan 13 files changed +23
-31
lines changed Original file line number Diff line number Diff line change @@ -142,7 +142,7 @@ impl BallistaContext {
142
142
let path = fs:: canonicalize ( & path) ?;
143
143
144
144
// use local DataFusion context for now but later this might call the scheduler
145
- let mut ctx = create_datafusion_context ( & self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
145
+ let mut ctx = create_datafusion_context ( self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
146
146
let df = ctx. read_parquet ( path. to_str ( ) . unwrap ( ) ) ?;
147
147
Ok ( df)
148
148
}
@@ -159,7 +159,7 @@ impl BallistaContext {
159
159
let path = fs:: canonicalize ( & path) ?;
160
160
161
161
// use local DataFusion context for now but later this might call the scheduler
162
- let mut ctx = create_datafusion_context ( & self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
162
+ let mut ctx = create_datafusion_context ( self . state . lock ( ) . unwrap ( ) . config ( ) ) ;
163
163
let df = ctx. read_csv ( path. to_str ( ) . unwrap ( ) , options) ?;
164
164
Ok ( df)
165
165
}
@@ -193,7 +193,7 @@ impl BallistaContext {
193
193
// use local DataFusion context for now but later this might call the scheduler
194
194
// register tables
195
195
let state = self . state . lock ( ) . unwrap ( ) ;
196
- let mut ctx = create_datafusion_context ( & state. config ( ) ) ;
196
+ let mut ctx = create_datafusion_context ( state. config ( ) ) ;
197
197
for ( name, plan) in & state. tables {
198
198
let plan = ctx. optimize ( plan) ?;
199
199
let execution_plan = ctx. create_physical_plan ( & plan) ?;
Original file line number Diff line number Diff line change @@ -131,9 +131,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
131
131
}
132
132
PhysicalPlanType :: ParquetScan ( scan) => {
133
133
let projection = scan. projection . iter ( ) . map ( |i| * i as usize ) . collect ( ) ;
134
- let path: & str = & scan. filename [ 0 ] . as_str ( ) ;
134
+ let path: & str = scan. filename [ 0 ] . as_str ( ) ;
135
135
Ok ( Arc :: new ( ParquetExec :: try_from_path (
136
- & path,
136
+ path,
137
137
Some ( projection) ,
138
138
None ,
139
139
scan. batch_size as usize ,
Original file line number Diff line number Diff line change @@ -259,7 +259,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
259
259
let filenames = exec
260
260
. partitions ( )
261
261
. iter ( )
262
- . flat_map ( |part| part. filenames ( ) . to_owned ( ) )
262
+ . flat_map ( |part| part. filenames ( ) )
263
263
. collect ( ) ;
264
264
Ok ( protobuf:: PhysicalPlanNode {
265
265
physical_plan_type : Some ( PhysicalPlanType :: ParquetScan (
Original file line number Diff line number Diff line change @@ -316,7 +316,7 @@ impl SchedulerState {
316
316
) )
317
317
. unwrap ( ) ;
318
318
let task_is_dead = self
319
- . reschedule_dead_task ( & referenced_task, & executors)
319
+ . reschedule_dead_task ( referenced_task, & executors)
320
320
. await ?;
321
321
if task_is_dead {
322
322
continue ' tasks;
Original file line number Diff line number Diff line change @@ -484,7 +484,7 @@ fn get_table(
484
484
let path = format ! ( "{}/{}" , path, table) ;
485
485
Ok ( Arc :: new ( ParquetTable :: try_new (
486
486
& path,
487
- create_datafusion_context_concurrency ( max_concurrency) ,
487
+ ExecutionContext :: with_concurrency ( max_concurrency) ,
488
488
) ?) )
489
489
}
490
490
other => {
@@ -493,11 +493,6 @@ fn get_table(
493
493
}
494
494
}
495
495
496
- fn create_datafusion_context_concurrency ( n : usize ) -> ExecutionContext {
497
- let config = ExecutionConfig :: new ( ) . with_concurrency ( n) ;
498
- ExecutionContext :: with_config ( config)
499
- }
500
-
501
496
fn get_schema ( table : & str ) -> Schema {
502
497
// note that the schema intentionally uses signed integers so that any generated Parquet
503
498
// files can also be used to benchmark tools that only support signed integers, such as
Original file line number Diff line number Diff line change @@ -66,8 +66,8 @@ impl CsvFile {
66
66
let schema = Arc :: new ( match options. schema {
67
67
Some ( s) => s. clone ( ) ,
68
68
None => {
69
- let filenames =
70
- LocalFileSystem { } . list_all_files ( & path, options. file_extension ) ?;
69
+ let filenames = LocalFileSystem
70
+ . list_all_files ( path. as_str ( ) , options. file_extension ) ?;
71
71
if filenames. is_empty ( ) {
72
72
return Err ( DataFusionError :: Plan ( format ! (
73
73
"No files found at {path} with file extension {file_extension}" ,
Original file line number Diff line number Diff line change @@ -59,7 +59,7 @@ impl NdJsonFile {
59
59
schema
60
60
} else {
61
61
let filenames =
62
- LocalFileSystem { } . list_all_files ( & path, options. file_extension ) ?;
62
+ LocalFileSystem . list_all_files ( path, options. file_extension ) ?;
63
63
if filenames. is_empty ( ) {
64
64
return Err ( DataFusionError :: Plan ( format ! (
65
65
"No files found at {path} with file extension {file_extension}" ,
Original file line number Diff line number Diff line change @@ -33,7 +33,7 @@ pub struct LocalFileSystem;
33
33
34
34
impl ObjectStore for LocalFileSystem {
35
35
fn as_any ( & self ) -> & dyn Any {
36
- return self ;
36
+ self
37
37
}
38
38
39
39
fn list_all_files ( & self , path : & str , ext : & str ) -> Result < Vec < String > > {
Original file line number Diff line number Diff line change @@ -78,7 +78,7 @@ impl ObjectStoreRegistry {
78
78
store : Arc < dyn ObjectStore > ,
79
79
) -> Option < Arc < dyn ObjectStore > > {
80
80
let mut stores = self . object_stores . write ( ) . unwrap ( ) ;
81
- stores. insert ( scheme. to_string ( ) , store)
81
+ stores. insert ( scheme, store)
82
82
}
83
83
84
84
/// Get the store registered for scheme
Original file line number Diff line number Diff line change @@ -58,7 +58,7 @@ impl ParquetTable {
58
58
let max_concurrency = context. state . lock ( ) . unwrap ( ) . config . concurrency ;
59
59
let root_desc = ParquetRootDesc :: new ( path. as_str ( ) , context) ;
60
60
Ok ( Self {
61
- path : path . clone ( ) ,
61
+ path,
62
62
desc : Arc :: new ( root_desc?) ,
63
63
max_concurrency,
64
64
enable_pruning : true ,
@@ -222,7 +222,7 @@ impl SourceRootDescBuilder for ParquetRootDesc {
222
222
} ;
223
223
224
224
Ok ( PartitionedFile {
225
- file_path : file_path . clone ( ) ,
225
+ file_path,
226
226
schema,
227
227
statistics,
228
228
partition_value : None ,
Original file line number Diff line number Diff line change @@ -143,8 +143,7 @@ impl CsvExec {
143
143
) -> Result < Self > {
144
144
let file_extension = String :: from ( options. file_extension ) ;
145
145
146
- let filenames =
147
- LocalFileSystem { } . list_all_files ( & path, options. file_extension ) ?;
146
+ let filenames = LocalFileSystem . list_all_files ( path, options. file_extension ) ?;
148
147
if filenames. is_empty ( ) {
149
148
return Err ( DataFusionError :: Execution ( format ! (
150
149
"No files found at {path} with file extension {file_extension}" ,
Original file line number Diff line number Diff line change @@ -89,8 +89,7 @@ impl NdJsonExec {
89
89
) -> Result < Self > {
90
90
let file_extension = options. file_extension . to_string ( ) ;
91
91
92
- let filenames =
93
- LocalFileSystem { } . list_all_files ( & path, options. file_extension ) ?;
92
+ let filenames = LocalFileSystem . list_all_files ( path, options. file_extension ) ?;
94
93
95
94
if filenames. is_empty ( ) {
96
95
return Err ( DataFusionError :: Execution ( format ! (
Original file line number Diff line number Diff line change @@ -279,13 +279,12 @@ impl ParquetPartition {
279
279
}
280
280
281
281
/// The Parquet filename for this partition
282
- pub fn filenames ( & self ) -> & [ String ] {
283
- todo ! ( )
284
- }
285
-
286
- /// Statistics for this partition
287
- pub fn statistics ( & self ) -> & Statistics {
288
- todo ! ( )
282
+ pub fn filenames ( & self ) -> Vec < String > {
283
+ self . file_partition
284
+ . files
285
+ . iter ( )
286
+ . map ( |f| f. file_path . clone ( ) )
287
+ . collect ( )
289
288
}
290
289
}
291
290
You can’t perform that action at this time.
0 commit comments