Skip to content

Commit bb8b91c

Browse files
committed
[fix] add partition cols to inferred schema
1 parent fe7cb50 commit bb8b91c

File tree

1 file changed

+51
-44
lines changed

1 file changed

+51
-44
lines changed

datafusion/src/datasource/listing.rs

Lines changed: 51 additions & 44 deletions
Original file line number · Diff line number · Diff line change
@@ -20,7 +20,8 @@
2020
2121
use std::{any::Any, sync::Arc};
2222

23-
use arrow::datatypes::SchemaRef;
23+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
24+
use async_trait::async_trait;
2425
use futures::{StreamExt, TryStreamExt};
2526

2627
use crate::{
@@ -32,6 +33,7 @@ use crate::{
3233

3334
use super::{
3435
datasource::TableProviderFilterPushDown, format::FileFormat, PartitionedFile,
36+
TableProvider,
3537
};
3638

3739
/// Options for creating a `ListingTable`
@@ -75,7 +77,13 @@ impl ListingOptions {
7577
&self.file_extension, path
7678
))
7779
})?;
78-
self.format.infer_schema(&first_file).await
80+
let file_schema = self.format.infer_schema(&first_file).await?;
81+
// Add the partition columns to the file schema
82+
let mut fields = file_schema.fields().clone();
83+
for part in &self.partitions {
84+
fields.push(Field::new(part, DataType::Utf8, false));
85+
}
86+
Ok(Arc::new(Schema::new(fields)))
7987
}
8088
}
8189

@@ -104,9 +112,8 @@ impl ListingTable {
104112
}
105113
}
106114

107-
// TODO add back impl ExecutionPlan
108-
#[allow(dead_code)]
109-
impl ListingTable {
115+
#[async_trait]
116+
impl TableProvider for ListingTable {
110117
fn as_any(&self) -> &dyn Any {
111118
self
112119
}
@@ -122,7 +129,7 @@ impl ListingTable {
122129
filters: &[Expr],
123130
limit: Option<usize>,
124131
) -> Result<Arc<dyn ExecutionPlan>> {
125-
// 1. list files (with partitions)
132+
// list files (with partitions)
126133
let file_list = pruned_partition_list(
127134
&self.path,
128135
filters,
@@ -214,47 +221,47 @@ fn split_files(
214221

215222
#[cfg(test)]
216223
mod tests {
217-
// use super::*;
218-
// use futures::StreamExt;
224+
use super::*;
225+
use futures::StreamExt;
219226

220-
// #[tokio::test]
221-
// async fn read_small_batches() -> Result<()> {
222-
// let table = load_table("alltypes_plain.parquet").await?;
223-
// let projection = None;
224-
// let exec = table.scan(&projection, 2, &[], None)?;
225-
// let stream = exec.execute(0).await?;
227+
#[tokio::test]
228+
async fn read_small_batches() -> Result<()> {
229+
let table = load_table("alltypes_plain.parquet").await?;
230+
let projection = None;
231+
let exec = table.scan(&projection, 2, &[], None).await?;
232+
let stream = exec.execute(0).await?;
226233

227-
// let _ = stream
228-
// .map(|batch| {
229-
// let batch = batch.unwrap();
230-
// assert_eq!(11, batch.num_columns());
231-
// assert_eq!(2, batch.num_rows());
232-
// })
233-
// .fold(0, |acc, _| async move { acc + 1i32 })
234-
// .await;
234+
let _ = stream
235+
.map(|batch| {
236+
let batch = batch.unwrap();
237+
assert_eq!(11, batch.num_columns());
238+
assert_eq!(2, batch.num_rows());
239+
})
240+
.fold(0, |acc, _| async move { acc + 1i32 })
241+
.await;
235242

236-
// // test metadata
237-
// assert_eq!(exec.statistics().num_rows, Some(8));
238-
// assert_eq!(exec.statistics().total_byte_size, Some(671));
243+
// test metadata
244+
assert_eq!(exec.statistics().num_rows, Some(8));
245+
assert_eq!(exec.statistics().total_byte_size, Some(671));
239246

240-
// Ok(())
241-
// }
247+
Ok(())
248+
}
242249

243-
// async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
244-
// let testdata = crate::test_util::parquet_test_data();
245-
// let filename = format!("{}/{}", testdata, name);
246-
// let opt = ListingOptions {
247-
// file_extension: "parquet".to_owned(),
248-
// format: Arc::new(format::parquet::ParquetFormat {
249-
// enable_pruning: true,
250-
// }),
251-
// partitions: vec![],
252-
// max_partitions: 2,
253-
// collect_stat: true,
254-
// };
255-
// // here we resolve the schema locally
256-
// let schema = opt.infer_schema(&filename).await?;
257-
// let table = ListingTable::try_new(&filename, schema, opt)?;
258-
// Ok(Arc::new(table))
259-
// }
250+
async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
251+
let testdata = crate::test_util::parquet_test_data();
252+
let filename = format!("{}/{}", testdata, name);
253+
let opt = ListingOptions {
254+
file_extension: "parquet".to_owned(),
255+
format: Arc::new(format::parquet::ParquetFormat {
256+
enable_pruning: true,
257+
}),
258+
partitions: vec![],
259+
max_partitions: 2,
260+
collect_stat: true,
261+
};
262+
// here we resolve the schema locally
263+
let schema = opt.infer_schema(&filename).await?;
264+
let table = ListingTable::try_new(&filename, schema, opt)?;
265+
Ok(Arc::new(table))
266+
}
260267
}

0 commit comments

Comments
 (0)