Commit 4709fc6

Add FileScanConfig::new() API (#10623)
* Add FileScanConfig::new() API, update code to use new API
* Remove add_* api
1 parent 2fc1575 commit 4709fc6

File tree: 24 files changed (+316 -443 lines)
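The diffs below replace hand-built `FileScanConfig` struct literals with the new builder-style API. As a quick orientation, here is a minimal sketch of the pattern (mirroring the csv_opener.rs and json_opener.rs examples below; `schema` and `path` are assumed to be in scope as in those examples):

    // Old style: spell out every FileScanConfig field, including ones that only
    // take defaults (unknown statistics, no partition columns, no ordering).
    // New style: start from FileScanConfig::new() and chain with_* setters;
    // anything not set keeps its default.
    let scan_config =
        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
            .with_projection(Some(vec![1, 0]))
            .with_limit(Some(5))
            .with_file(PartitionedFile::new(path.to_string(), 10));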

datafusion-examples/examples/csv_opener.rs

Lines changed: 5 additions & 11 deletions

@@ -17,7 +17,6 @@
 
 use std::{sync::Arc, vec};
 
-use datafusion::common::Statistics;
 use datafusion::{
     assert_batches_eq,
     datasource::{
@@ -58,16 +57,11 @@ async fn main() -> Result<()> {
 
     let path = std::path::Path::new(&path).canonicalize()?;
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
-        statistics: Statistics::new_unknown(&schema),
-        projection: Some(vec![12, 0]),
-        limit: Some(5),
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+            .with_projection(Some(vec![12, 0]))
+            .with_limit(Some(5))
+            .with_file(PartitionedFile::new(path.display().to_string(), 10));
 
     let result =
         FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())

datafusion-examples/examples/json_opener.rs

Lines changed: 5 additions & 11 deletions

@@ -29,7 +29,6 @@ use datafusion::{
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
 };
-use datafusion_common::Statistics;
 
 use futures::StreamExt;
 use object_store::ObjectStore;
@@ -61,16 +60,11 @@ async fn main() -> Result<()> {
         Arc::new(object_store),
     );
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
-        statistics: Statistics::new_unknown(&schema),
-        projection: Some(vec![1, 0]),
-        limit: Some(5),
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+            .with_projection(Some(vec![1, 0]))
+            .with_limit(Some(5))
+            .with_file(PartitionedFile::new(path.to_string(), 10));
 
     let result =
         FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 5 additions & 10 deletions

@@ -154,16 +154,11 @@ pub(crate) mod test_util {
         let exec = format
             .create_physical_plan(
                 state,
-                FileScanConfig {
-                    object_store_url: ObjectStoreUrl::local_filesystem(),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection,
-                    limit,
-                    table_partition_cols: vec![],
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                    .with_file_groups(file_groups)
+                    .with_statistics(statistics)
+                    .with_projection(projection)
+                    .with_limit(limit),
                 None,
             )
             .await?;

datafusion/core/src/datasource/listing/table.rs

Lines changed: 7 additions & 10 deletions

@@ -805,16 +805,13 @@ impl TableProvider for ListingTable {
             .format
             .create_physical_plan(
                 state,
-                FileScanConfig {
-                    object_store_url,
-                    file_schema: Arc::clone(&self.file_schema),
-                    file_groups: partitioned_file_lists,
-                    statistics,
-                    projection: projection.cloned(),
-                    limit,
-                    output_ordering,
-                    table_partition_cols,
-                },
+                FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
+                    .with_file_groups(partitioned_file_lists)
+                    .with_statistics(statistics)
+                    .with_projection(projection.cloned())
+                    .with_limit(limit)
+                    .with_output_ordering(output_ordering)
+                    .with_table_partition_cols(table_partition_cols),
                 filters.as_ref(),
             )
             .await

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 23 additions & 32 deletions

@@ -271,16 +271,11 @@ mod tests {
             .infer_schema(&state, &store, &[meta.clone()])
             .await?;
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![vec![meta.into()]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection: Some(vec![0, 1, 2]),
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        });
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                .with_file(meta.into())
+                .with_projection(Some(vec![0, 1, 2])),
+        );
         assert_eq!(
             avro_exec
                 .properties()
@@ -348,16 +343,11 @@ mod tests {
         // Include the missing column in the projection
        let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            object_store_url,
-            file_groups: vec![vec![meta.into()]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        });
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(object_store_url, file_schema)
+                .with_file(meta.into())
+                .with_projection(projection),
+        );
         assert_eq!(
             avro_exec
                 .properties()
@@ -422,18 +412,19 @@ mod tests {
         let mut partitioned_file = PartitionedFile::from(meta);
         partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            // select specific columns of the files as well as the partitioning
-            // column which is supposed to be the last column in the table schema.
-            projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
-            object_store_url,
-            file_groups: vec![vec![partitioned_file]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            limit: None,
-            table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
-            output_ordering: vec![],
-        });
+        let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(object_store_url, file_schema)
+                // select specific columns of the files as well as the partitioning
+                // column which is supposed to be the last column in the table schema.
+                .with_projection(projection)
+                .with_file(partitioned_file)
+                .with_table_partition_cols(vec![Field::new(
+                    "date",
+                    DataType::Utf8,
+                    false,
+                )]),
+        );
         assert_eq!(
             avro_exec
                 .properties()

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 6 additions & 6 deletions

@@ -561,7 +561,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.projection = Some(vec![0, 2, 4]);
 
         let csv = CsvExec::new(
@@ -627,7 +627,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.projection = Some(vec![4, 0, 2]);
 
         let csv = CsvExec::new(
@@ -693,7 +693,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.limit = Some(5);
 
         let csv = CsvExec::new(
@@ -756,7 +756,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.limit = Some(5);
 
         let csv = CsvExec::new(
@@ -809,7 +809,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
 
         // Add partition columns
         config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
@@ -914,7 +914,7 @@ mod tests {
         )
         .unwrap();
 
-        let config = partitioned_csv_config(file_schema, file_groups).unwrap();
+        let config = partitioned_csv_config(file_schema, file_groups);
         let csv = CsvExec::new(
             config,
             true,

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

Lines changed: 114 additions & 10 deletions

@@ -64,12 +64,41 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 
 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
+///
+/// # Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::Schema;
+/// use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::FileScanConfig;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # let file_schema = Arc::new(Schema::empty());
+/// // create FileScan config for reading data from file://
+/// let object_store_url = ObjectStoreUrl::local_filesystem();
+/// let config = FileScanConfig::new(object_store_url, file_schema)
+///     .with_limit(Some(1000))            // read only the first 1000 records
+///     .with_projection(Some(vec![2, 3])) // project columns 2 and 3
+///     // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+///     .with_file(PartitionedFile::new("file1.parquet", 1234))
+///     // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
+///     // in a single row group
+///     .with_file_group(vec![
+///         PartitionedFile::new("file2.parquet", 56),
+///         PartitionedFile::new("file3.parquet", 78),
+///     ]);
+/// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
     /// Object store URL, used to get an [`ObjectStore`] instance from
     /// [`RuntimeEnv::object_store`]
     ///
+    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
+    /// as `file://` or `s3://my_bucket`. It should not include the path to the
+    /// file itself. The relevant URL prefix must be registered via
+    /// [`RuntimeEnv::register_object_store`]
+    ///
     /// [`ObjectStore`]: object_store::ObjectStore
+    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
     /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
     pub object_store_url: ObjectStoreUrl,
     /// Schema before `projection` is applied. It contains the all columns that may
@@ -87,6 +116,7 @@ pub struct FileScanConfig {
     /// sequentially, one after the next.
     pub file_groups: Vec<Vec<PartitionedFile>>,
     /// Estimated overall statistics of the files, taking `filters` into account.
+    /// Defaults to [`Statistics::new_unknown`].
     pub statistics: Statistics,
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
@@ -101,6 +131,86 @@ pub struct FileScanConfig {
 }
 
 impl FileScanConfig {
+    /// Create a new `FileScanConfig` with default settings for scanning files.
+    ///
+    /// See example on [`FileScanConfig`]
+    ///
+    /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
+    /// [`Self::with_file_groups`].
+    ///
+    /// # Parameters:
+    /// * `object_store_url`: See [`Self::object_store_url`]
+    /// * `file_schema`: See [`Self::file_schema`]
+    pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
+        let statistics = Statistics::new_unknown(&file_schema);
+        Self {
+            object_store_url,
+            file_schema,
+            file_groups: vec![],
+            statistics,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        }
+    }
+
+    /// Set the statistics of the files
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = statistics;
+        self
+    }
+
+    /// Set the projection of the files
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set the limit of the files
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Add a file as a single group
+    ///
+    /// See [Self::file_groups] for more information.
+    pub fn with_file(self, file: PartitionedFile) -> Self {
+        self.with_file_group(vec![file])
+    }
+
+    /// Add the file groups
+    ///
+    /// See [Self::file_groups] for more information.
+    pub fn with_file_groups(
+        mut self,
+        mut file_groups: Vec<Vec<PartitionedFile>>,
+    ) -> Self {
+        self.file_groups.append(&mut file_groups);
+        self
+    }
+
+    /// Add a new file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
+        self.file_groups.push(file_group);
+        self
+    }
+
+    /// Set the partitioning columns of the files
+    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Set the output ordering of the files
+    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
+        self.output_ordering = output_ordering;
+        self
+    }
+
     /// Project the schema and the statistics on the given column indices
     pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
@@ -1117,16 +1227,10 @@ mod tests {
         statistics: Statistics,
         table_partition_cols: Vec<Field>,
     ) -> FileScanConfig {
-        FileScanConfig {
-            file_schema,
-            file_groups: vec![vec![]],
-            limit: None,
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            projection,
-            statistics,
-            table_partition_cols,
-            output_ordering: vec![],
-        }
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), file_schema)
+            .with_projection(projection)
+            .with_statistics(statistics)
+            .with_table_partition_cols(table_partition_cols)
     }
 
     /// Convert partition columns from Vec<String DataType> to Vec<Field>
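A follow-up note on the grouping setters added above: each inner `Vec<PartitionedFile>` is one file group, and files within a group are read sequentially, one after the next (per the `file_groups` field docs). A minimal sketch of how the three setters differ, with made-up file names and sizes purely for illustration (`object_store_url` and `file_schema` assumed to be in scope):

    let config = FileScanConfig::new(object_store_url, file_schema)
        // with_file: adds one group containing a single file
        .with_file(PartitionedFile::new("a.parquet", 100))
        // with_file_group: adds one group whose files are scanned one after the next
        .with_file_group(vec![
            PartitionedFile::new("b.parquet", 200),
            PartitionedFile::new("c.parquet", 300),
        ])
        // with_file_groups: appends several pre-built groups at once
        .with_file_groups(vec![vec![PartitionedFile::new("d.parquet", 400)]]);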
