Skip to content

Commit e18f7ba

Browse files
timvw and andygrove
authored
refactor how we create listing tables (#4227)
refactor how we create listing tables (#4227)
* refactor how we create listing tables * run linting * attempt to register default table factories * pass csv as catalog type * ensure that listingschemaprovider can keep working for csv with header * no need to register default table factories * remove unused imports * use builder for listingoptions * remove comment * fix roundtrip test * Update datafusion/core/src/datasource/listing_table_factory.rs Co-authored-by: Andy Grove <[email protected]> * register a listingtablefactory for NDJSON * always verify that a factory is available * allow NDJSON as an alias for JSON Co-authored-by: Andy Grove <[email protected]>
1 parent a0581dc commit e18f7ba

File tree

9 files changed

+141
-162
lines changed

9 files changed

+141
-162
lines changed

datafusion-cli/src/main.rs

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616
// under the License.
1717

1818
use clap::Parser;
19-
use datafusion::datasource::datasource::TableProviderFactory;
20-
use datafusion::datasource::file_format::file_type::FileType;
21-
use datafusion::datasource::listing_table_factory::ListingTableFactory;
2219
use datafusion::datasource::object_store::ObjectStoreRegistry;
2320
use datafusion::error::{DataFusionError, Result};
2421
use datafusion::execution::context::SessionConfig;
@@ -29,7 +26,6 @@ use datafusion_cli::{
2926
exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
3027
};
3128
use mimalloc::MiMalloc;
32-
use std::collections::HashMap;
3329
use std::env;
3430
use std::path::Path;
3531
use std::sync::Arc;
@@ -147,31 +143,11 @@ pub async fn main() -> Result<()> {
147143
}
148144

149145
fn create_runtime_env() -> Result<RuntimeEnv> {
150-
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
151-
HashMap::new();
152-
table_factories.insert(
153-
"csv".to_string(),
154-
Arc::new(ListingTableFactory::new(FileType::CSV)),
155-
);
156-
table_factories.insert(
157-
"parquet".to_string(),
158-
Arc::new(ListingTableFactory::new(FileType::PARQUET)),
159-
);
160-
table_factories.insert(
161-
"avro".to_string(),
162-
Arc::new(ListingTableFactory::new(FileType::AVRO)),
163-
);
164-
table_factories.insert(
165-
"json".to_string(),
166-
Arc::new(ListingTableFactory::new(FileType::JSON)),
167-
);
168-
169146
let object_store_provider = DatafusionCliObjectStoreProvider {};
170147
let object_store_registry =
171148
ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
172149
let rn_config = RuntimeConfig::new()
173-
.with_object_store_registry(Arc::new(object_store_registry))
174-
.with_table_factories(table_factories);
150+
.with_object_store_registry(Arc::new(object_store_registry));
175151
RuntimeEnv::new(rn_config)
176152
}
177153

datafusion/core/src/catalog/listing_schema.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub struct ListingSchemaProvider {
4949
factory: Arc<dyn TableProviderFactory>,
5050
store: Arc<dyn ObjectStore>,
5151
tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
52+
format: String,
53+
has_header: bool,
5254
}
5355

5456
impl ListingSchemaProvider {
@@ -59,18 +61,24 @@ impl ListingSchemaProvider {
5961
/// `path`: The root path that contains subfolders which represent tables
6062
/// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
6163
/// `store`: The `ObjectStore` containing the table data
64+
/// `format`: The `FileFormat` of the tables
65+
/// `has_header`: Indicates whether the created external table has the has_header flag enabled
6266
pub fn new(
6367
authority: String,
6468
path: object_store::path::Path,
6569
factory: Arc<dyn TableProviderFactory>,
6670
store: Arc<dyn ObjectStore>,
71+
format: String,
72+
has_header: bool,
6773
) -> Self {
6874
Self {
6975
authority,
7076
path,
7177
factory,
7278
store,
7379
tables: Arc::new(Mutex::new(HashMap::new())),
80+
format,
81+
has_header,
7482
}
7583
}
7684

@@ -118,8 +126,8 @@ impl ListingSchemaProvider {
118126
schema: Arc::new(DFSchema::empty()),
119127
name: table_name.to_string(),
120128
location: table_url,
121-
file_type: "".to_string(),
122-
has_header: false,
129+
file_type: self.format.clone(),
130+
has_header: self.has_header,
123131
delimiter: ',',
124132
table_partition_cols: vec![],
125133
if_not_exists: false,

datafusion/core/src/datasource/file_format/file_type.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl FromStr for FileType {
158158
"AVRO" => Ok(FileType::AVRO),
159159
"PARQUET" => Ok(FileType::PARQUET),
160160
"CSV" => Ok(FileType::CSV),
161-
"JSON" => Ok(FileType::JSON),
161+
"JSON" | "NDJSON" => Ok(FileType::JSON),
162162
_ => Err(DataFusionError::NotImplemented(format!(
163163
"Unknown FileType: {}",
164164
s

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use crate::datasource::datasource::TableProviderFactory;
2121
use crate::datasource::file_format::avro::AvroFormat;
2222
use crate::datasource::file_format::csv::CsvFormat;
23-
use crate::datasource::file_format::file_type::{FileType, GetExt};
23+
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
2424
use crate::datasource::file_format::json::JsonFormat;
2525
use crate::datasource::file_format::parquet::ParquetFormat;
2626
use crate::datasource::file_format::FileFormat;
@@ -30,18 +30,24 @@ use crate::datasource::listing::{
3030
use crate::datasource::TableProvider;
3131
use crate::execution::context::SessionState;
3232
use async_trait::async_trait;
33+
use datafusion_common::DataFusionError;
3334
use datafusion_expr::CreateExternalTable;
35+
use std::str::FromStr;
3436
use std::sync::Arc;
3537

3638
/// A `TableProviderFactory` capable of creating new `ListingTable`s
37-
pub struct ListingTableFactory {
38-
file_type: FileType,
39-
}
39+
pub struct ListingTableFactory {}
4040

4141
impl ListingTableFactory {
4242
/// Creates a new `ListingTableFactory`
43-
pub fn new(file_type: FileType) -> Self {
44-
Self { file_type }
43+
pub fn new() -> Self {
44+
Self {}
45+
}
46+
}
47+
48+
impl Default for ListingTableFactory {
49+
fn default() -> Self {
50+
Self::new()
4551
}
4652
}
4753

@@ -52,24 +58,59 @@ impl TableProviderFactory for ListingTableFactory {
5258
state: &SessionState,
5359
cmd: &CreateExternalTable,
5460
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
55-
let file_extension = self.file_type.get_ext();
61+
let file_compression_type = FileCompressionType::from_str(
62+
cmd.file_compression_type.as_str(),
63+
)
64+
.map_err(|_| {
65+
DataFusionError::Execution(format!(
66+
"Unknown FileCompressionType {}",
67+
cmd.file_compression_type.as_str()
68+
))
69+
})?;
70+
let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
71+
DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
72+
})?;
5673

57-
let file_format: Arc<dyn FileFormat> = match self.file_type {
58-
FileType::CSV => Arc::new(CsvFormat::default()),
74+
let file_extension =
75+
file_type.get_ext_with_compression(file_compression_type.to_owned())?;
76+
77+
let file_format: Arc<dyn FileFormat> = match file_type {
78+
FileType::CSV => Arc::new(
79+
CsvFormat::default()
80+
.with_has_header(cmd.has_header)
81+
.with_delimiter(cmd.delimiter as u8)
82+
.with_file_compression_type(file_compression_type),
83+
),
5984
FileType::PARQUET => Arc::new(ParquetFormat::default()),
6085
FileType::AVRO => Arc::new(AvroFormat::default()),
61-
FileType::JSON => Arc::new(JsonFormat::default()),
86+
FileType::JSON => Arc::new(
87+
JsonFormat::default().with_file_compression_type(file_compression_type),
88+
),
89+
};
90+
91+
let provided_schema = if cmd.schema.fields().is_empty() {
92+
None
93+
} else {
94+
Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
6295
};
6396

64-
let options =
65-
ListingOptions::new(file_format).with_file_extension(file_extension);
97+
let options = ListingOptions::new(file_format)
98+
.with_collect_stat(state.config.collect_statistics)
99+
.with_file_extension(file_extension)
100+
.with_target_partitions(state.config.target_partitions)
101+
.with_table_partition_cols(cmd.table_partition_cols.clone())
102+
.with_file_sort_order(None);
66103

67104
let table_path = ListingTableUrl::parse(&cmd.location)?;
68-
let resolved_schema = options.infer_schema(state, &table_path).await?;
105+
let resolved_schema = match provided_schema {
106+
None => options.infer_schema(state, &table_path).await?,
107+
Some(s) => s,
108+
};
69109
let config = ListingTableConfig::new(table_path)
70110
.with_listing_options(options)
71111
.with_schema(resolved_schema);
72-
let table = ListingTable::try_new(config)?;
112+
let table =
113+
ListingTable::try_new(config)?.with_definition(cmd.definition.clone());
73114
Ok(Arc::new(table))
74115
}
75116
}

0 commit comments

Comments
 (0)