Skip to content

Commit 9ea7dc6

Browse files
authored
Decouple FileFormat from datafusion_data_access (#2572)
* Decouple FileFormat from datafusion_data_access * Review feedback * Update ballista pin
1 parent a9cc38a commit 9ea7dc6

File tree

13 files changed

+348
-416
lines changed

13 files changed

+348
-416
lines changed

datafusion/core/src/datasource/file_format/avro.rs

Lines changed: 30 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use arrow::datatypes::Schema;
2424
use arrow::{self, datatypes::SchemaRef};
2525
use async_trait::async_trait;
26-
use futures::StreamExt;
26+
use datafusion_data_access::FileMeta;
2727

2828
use super::FileFormat;
2929
use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
3232
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
3333
use crate::physical_plan::ExecutionPlan;
3434
use crate::physical_plan::Statistics;
35-
use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
35+
use datafusion_data_access::object_store::ObjectStore;
3636

3737
/// The default file extension of avro files
3838
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -46,10 +46,14 @@ impl FileFormat for AvroFormat {
4646
self
4747
}
4848

49-
async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
49+
async fn infer_schema(
50+
&self,
51+
store: &Arc<dyn ObjectStore>,
52+
files: &[FileMeta],
53+
) -> Result<SchemaRef> {
5054
let mut schemas = vec![];
51-
while let Some(obj_reader) = readers.next().await {
52-
let mut reader = obj_reader?.sync_reader()?;
55+
for file in files {
56+
let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
5357
let schema = read_avro_schema_from_reader(&mut reader)?;
5458
schemas.push(schema);
5559
}
@@ -59,8 +63,9 @@ impl FileFormat for AvroFormat {
5963

6064
async fn infer_stats(
6165
&self,
62-
_reader: Arc<dyn ObjectReader>,
66+
_store: &Arc<dyn ObjectStore>,
6367
_table_schema: SchemaRef,
68+
_file: &FileMeta,
6469
) -> Result<Statistics> {
6570
Ok(Statistics::default())
6671
}
@@ -78,15 +83,9 @@ impl FileFormat for AvroFormat {
7883
#[cfg(test)]
7984
#[cfg(feature = "avro")]
8085
mod tests {
81-
use crate::{
82-
datafusion_data_access::object_store::local::{
83-
local_object_reader, local_object_reader_stream, LocalFileSystem,
84-
},
85-
physical_plan::collect,
86-
};
87-
8886
use super::*;
89-
use crate::datasource::listing::local_unpartitioned_file;
87+
use crate::datasource::file_format::test_util::scan_format;
88+
use crate::physical_plan::collect;
9089
use crate::prelude::{SessionConfig, SessionContext};
9190
use arrow::array::{
9291
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -100,7 +99,7 @@ mod tests {
10099
let ctx = SessionContext::with_config(config);
101100
let task_ctx = ctx.task_ctx();
102101
let projection = None;
103-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
102+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
104103
let stream = exec.execute(0, task_ctx)?;
105104

106105
let tt_batches = stream
@@ -122,7 +121,7 @@ mod tests {
122121
let session_ctx = SessionContext::new();
123122
let task_ctx = session_ctx.task_ctx();
124123
let projection = None;
125-
let exec = get_exec("alltypes_plain.avro", &projection, Some(1)).await?;
124+
let exec = get_exec("alltypes_plain.avro", projection, Some(1)).await?;
126125
let batches = collect(exec, task_ctx).await?;
127126
assert_eq!(1, batches.len());
128127
assert_eq!(11, batches[0].num_columns());
@@ -136,7 +135,7 @@ mod tests {
136135
let session_ctx = SessionContext::new();
137136
let task_ctx = session_ctx.task_ctx();
138137
let projection = None;
139-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
138+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
140139

141140
let x: Vec<String> = exec
142141
.schema()
@@ -188,7 +187,7 @@ mod tests {
188187
let session_ctx = SessionContext::new();
189188
let task_ctx = session_ctx.task_ctx();
190189
let projection = Some(vec![1]);
191-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
190+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
192191

193192
let batches = collect(exec, task_ctx).await?;
194193
assert_eq!(batches.len(), 1);
@@ -218,7 +217,7 @@ mod tests {
218217
let session_ctx = SessionContext::new();
219218
let task_ctx = session_ctx.task_ctx();
220219
let projection = Some(vec![0]);
221-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
220+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
222221

223222
let batches = collect(exec, task_ctx).await?;
224223
assert_eq!(batches.len(), 1);
@@ -245,7 +244,7 @@ mod tests {
245244
let session_ctx = SessionContext::new();
246245
let task_ctx = session_ctx.task_ctx();
247246
let projection = Some(vec![10]);
248-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
247+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
249248

250249
let batches = collect(exec, task_ctx).await?;
251250
assert_eq!(batches.len(), 1);
@@ -272,7 +271,7 @@ mod tests {
272271
let session_ctx = SessionContext::new();
273272
let task_ctx = session_ctx.task_ctx();
274273
let projection = Some(vec![6]);
275-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
274+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
276275

277276
let batches = collect(exec, task_ctx).await?;
278277
assert_eq!(batches.len(), 1);
@@ -302,7 +301,7 @@ mod tests {
302301
let session_ctx = SessionContext::new();
303302
let task_ctx = session_ctx.task_ctx();
304303
let projection = Some(vec![7]);
305-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
304+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
306305

307306
let batches = collect(exec, task_ctx).await?;
308307
assert_eq!(batches.len(), 1);
@@ -332,7 +331,7 @@ mod tests {
332331
let session_ctx = SessionContext::new();
333332
let task_ctx = session_ctx.task_ctx();
334333
let projection = Some(vec![9]);
335-
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
334+
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
336335

337336
let batches = collect(exec, task_ctx).await?;
338337
assert_eq!(batches.len(), 1);
@@ -359,36 +358,13 @@ mod tests {
359358

360359
async fn get_exec(
361360
file_name: &str,
362-
projection: &Option<Vec<usize>>,
361+
projection: Option<Vec<usize>>,
363362
limit: Option<usize>,
364363
) -> Result<Arc<dyn ExecutionPlan>> {
365364
let testdata = crate::test_util::arrow_test_data();
366-
let filename = format!("{}/avro/{}", testdata, file_name);
365+
let store_root = format!("{}/avro", testdata);
367366
let format = AvroFormat {};
368-
let file_schema = format
369-
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
370-
.await
371-
.expect("Schema inference");
372-
let statistics = format
373-
.infer_stats(local_object_reader(filename.clone()), file_schema.clone())
374-
.await
375-
.expect("Stats inference");
376-
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
377-
let exec = format
378-
.create_physical_plan(
379-
FileScanConfig {
380-
object_store: Arc::new(LocalFileSystem {}),
381-
file_schema,
382-
file_groups,
383-
statistics,
384-
projection: projection.clone(),
385-
limit,
386-
table_partition_cols: vec![],
387-
},
388-
&[],
389-
)
390-
.await?;
391-
Ok(exec)
367+
scan_format(&format, &store_root, file_name, projection, limit).await
392368
}
393369
}
394370

@@ -397,18 +373,17 @@ mod tests {
397373
mod tests {
398374
use super::*;
399375

400-
use crate::datafusion_data_access::object_store::local::local_object_reader_stream;
376+
use super::super::test_util::scan_format;
401377
use crate::error::DataFusionError;
402378

403379
#[tokio::test]
404380
async fn test() -> Result<()> {
381+
let format = AvroFormat {};
405382
let testdata = crate::test_util::arrow_test_data();
406-
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
407-
let schema_result = AvroFormat {}
408-
.infer_schema(local_object_reader_stream(vec![filename]))
409-
.await;
383+
let filename = "avro/alltypes_plain.avro";
384+
let result = scan_format(&format, &testdata, filename, None, None).await;
410385
assert!(matches!(
411-
schema_result,
386+
result,
412387
Err(DataFusionError::NotImplemented(msg))
413388
if msg == *"cannot read avro schema without the 'avro' feature enabled"
414389
));

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 22 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use arrow::datatypes::Schema;
2424
use arrow::{self, datatypes::SchemaRef};
2525
use async_trait::async_trait;
26-
use futures::StreamExt;
26+
use datafusion_data_access::FileMeta;
2727

2828
use super::FileFormat;
2929
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
3232
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
3333
use crate::physical_plan::ExecutionPlan;
3434
use crate::physical_plan::Statistics;
35-
use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
35+
use datafusion_data_access::object_store::ObjectStore;
3636

3737
/// The default file extension of csv files
3838
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -93,13 +93,17 @@ impl FileFormat for CsvFormat {
9393
self
9494
}
9595

96-
async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
96+
async fn infer_schema(
97+
&self,
98+
store: &Arc<dyn ObjectStore>,
99+
files: &[FileMeta],
100+
) -> Result<SchemaRef> {
97101
let mut schemas = vec![];
98102

99-
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);
103+
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
100104

101-
while let Some(obj_reader) = readers.next().await {
102-
let mut reader = obj_reader?.sync_reader()?;
105+
for file in files {
106+
let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
103107
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
104108
&mut reader,
105109
self.delimiter,
@@ -122,8 +126,9 @@ impl FileFormat for CsvFormat {
122126

123127
async fn infer_stats(
124128
&self,
125-
_reader: Arc<dyn ObjectReader>,
129+
_store: &Arc<dyn ObjectStore>,
126130
_table_schema: SchemaRef,
131+
_file: &FileMeta,
127132
) -> Result<Statistics> {
128133
Ok(Statistics::default())
129134
}
@@ -142,24 +147,19 @@ impl FileFormat for CsvFormat {
142147
mod tests {
143148
use arrow::array::StringArray;
144149

150+
use super::super::test_util::scan_format;
145151
use super::*;
146-
use crate::datasource::listing::local_unpartitioned_file;
152+
use crate::physical_plan::collect;
147153
use crate::prelude::{SessionConfig, SessionContext};
148-
use crate::{
149-
datafusion_data_access::object_store::local::{
150-
local_object_reader, local_object_reader_stream, LocalFileSystem,
151-
},
152-
datasource::file_format::FileScanConfig,
153-
physical_plan::collect,
154-
};
154+
use futures::StreamExt;
155155

156156
#[tokio::test]
157157
async fn read_small_batches() -> Result<()> {
158158
let config = SessionConfig::new().with_batch_size(2);
159159
let ctx = SessionContext::with_config(config);
160160
// skip column 9 that overflows the automaticly discovered column type of i64 (u64 would work)
161161
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
162-
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
162+
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
163163
let task_ctx = ctx.task_ctx();
164164
let stream = exec.execute(0, task_ctx)?;
165165

@@ -186,7 +186,7 @@ mod tests {
186186
let session_ctx = SessionContext::new();
187187
let task_ctx = session_ctx.task_ctx();
188188
let projection = Some(vec![0, 1, 2, 3]);
189-
let exec = get_exec("aggregate_test_100.csv", &projection, Some(1)).await?;
189+
let exec = get_exec("aggregate_test_100.csv", projection, Some(1)).await?;
190190
let batches = collect(exec, task_ctx).await?;
191191
assert_eq!(1, batches.len());
192192
assert_eq!(4, batches[0].num_columns());
@@ -198,7 +198,7 @@ mod tests {
198198
#[tokio::test]
199199
async fn infer_schema() -> Result<()> {
200200
let projection = None;
201-
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
201+
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
202202

203203
let x: Vec<String> = exec
204204
.schema()
@@ -233,7 +233,7 @@ mod tests {
233233
let session_ctx = SessionContext::new();
234234
let task_ctx = session_ctx.task_ctx();
235235
let projection = Some(vec![0]);
236-
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
236+
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
237237

238238
let batches = collect(exec, task_ctx).await.expect("Collect batches");
239239

@@ -258,35 +258,11 @@ mod tests {
258258

259259
async fn get_exec(
260260
file_name: &str,
261-
projection: &Option<Vec<usize>>,
261+
projection: Option<Vec<usize>>,
262262
limit: Option<usize>,
263263
) -> Result<Arc<dyn ExecutionPlan>> {
264-
let testdata = crate::test_util::arrow_test_data();
265-
let filename = format!("{}/csv/{}", testdata, file_name);
264+
let root = format!("{}/csv", crate::test_util::arrow_test_data());
266265
let format = CsvFormat::default();
267-
let file_schema = format
268-
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
269-
.await
270-
.expect("Schema inference");
271-
let statistics = format
272-
.infer_stats(local_object_reader(filename.clone()), file_schema.clone())
273-
.await
274-
.expect("Stats inference");
275-
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
276-
let exec = format
277-
.create_physical_plan(
278-
FileScanConfig {
279-
object_store: Arc::new(LocalFileSystem),
280-
file_schema,
281-
file_groups,
282-
statistics,
283-
projection: projection.clone(),
284-
limit,
285-
table_partition_cols: vec![],
286-
},
287-
&[],
288-
)
289-
.await?;
290-
Ok(exec)
266+
scan_format(&format, &root, file_name, projection, limit).await
291267
}
292268
}

0 commit comments

Comments
 (0)