Commit 59143c1

TableProvider to skip files in the folder that are not relevant to the selected reader (#16487)

* minor: Avoid parquet read failure if the folder contains non-parquet files
1 parent 921f4a0 commit 59143c1

File tree

4 files changed: +203 -4 lines changed

datafusion/core/src/datasource/listing_table_factory.rs
datafusion/core/src/execution/context/parquet.rs
datafusion/datasource/src/url.rs
datafusion/sqllogictest/test_files/parquet.slt
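For context, a minimal sketch of the user-facing behavior this commit changes. The folder path and table name below are illustrative, not taken from the commit: before this change, a stray non-parquet file in a table folder could make Parquet schema inference fail; now the location is rewritten to '<folder>/*.parquet' so only files the Parquet reader understands are listed.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical folder containing both `a.parquet` and `b.csv`
    ctx.sql("CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION '/tmp/mixed_folder/'")
        .await?;
    // The stray CSV file is skipped; only `*.parquet` files are scanned
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    Ok(())
}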

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 9 additions & 2 deletions
@@ -108,10 +108,10 @@ impl TableProviderFactory for ListingTableFactory {
             (Some(schema), table_partition_cols)
         };
 
-        let table_path = ListingTableUrl::parse(&cmd.location)?;
+        let mut table_path = ListingTableUrl::parse(&cmd.location)?;
 
         let options = ListingOptions::new(file_format)
-            .with_file_extension(file_extension)
+            .with_file_extension(&file_extension)
             .with_session_config_options(session_state.config())
             .with_table_partition_cols(table_partition_cols);
 
@@ -125,6 +125,13 @@ impl TableProviderFactory for ListingTableFactory {
             // specifically for parquet file format.
             // See: https://github.com/apache/datafusion/issues/7317
             None => {
+                // if the location is a folder, rewrite the path as 'path/*.parquet'
+                // so that only files the selected reader can understand are read
+                if table_path.is_folder() && table_path.get_glob().is_none() {
+                    table_path = table_path.with_glob(
+                        format!("*.{}", cmd.file_type.to_lowercase()).as_ref(),
+                    )?;
+                }
                 let schema = options.infer_schema(session_state, &table_path).await?;
                 let df_schema = Arc::clone(&schema).to_dfschema()?;
                 let column_refs: HashSet<_> = cmd
datafusion/core/src/execution/context/parquet.rs

Lines changed: 124 additions & 2 deletions
@@ -107,11 +107,13 @@ mod tests {
     use crate::test_util::parquet_test_data;
 
     use arrow::util::pretty::pretty_format_batches;
-    use datafusion_common::assert_contains;
     use datafusion_common::config::TableParquetOptions;
+    use datafusion_common::{
+        assert_batches_eq, assert_batches_sorted_eq, assert_contains,
+    };
     use datafusion_execution::config::SessionConfig;
 
-    use tempfile::tempdir;
+    use tempfile::{tempdir, TempDir};
 
     #[tokio::test]
     async fn read_with_glob_path() -> Result<()> {
@@ -400,4 +402,124 @@ mod tests {
         assert_eq!(total_rows, 5);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn read_from_parquet_folder() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+        let test_path = tmp_dir.path().to_str().unwrap().to_string();
+
+        ctx.sql("SELECT 1 a")
+            .await?
+            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
+            .await?;
+
+        ctx.sql("SELECT 2 a")
+            .await?
+            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
+            .await?;
+
+        // Adding CSV to check it is not read with the Parquet reader
+        ctx.sql("SELECT 3 a")
+            .await?
+            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
+            .await?;
+
+        let actual = ctx
+            .read_parquet(&test_path, ParquetReadOptions::default())
+            .await?
+            .collect()
+            .await?;
+
+        #[cfg_attr(any(), rustfmt::skip)]
+        assert_batches_sorted_eq!(&[
+            "+---+",
+            "| a |",
+            "+---+",
+            "| 2 |",
+            "| 1 |",
+            "+---+",
+        ], &actual);
+
+        let actual = ctx
+            .read_parquet(test_path, ParquetReadOptions::default())
+            .await?
+            .collect()
+            .await?;
+
+        #[cfg_attr(any(), rustfmt::skip)]
+        assert_batches_sorted_eq!(&[
+            "+---+",
+            "| a |",
+            "+---+",
+            "| 2 |",
+            "| 1 |",
+            "+---+",
+        ], &actual);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_from_parquet_folder_table() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+        let test_path = tmp_dir.path().to_str().unwrap().to_string();
+
+        ctx.sql("SELECT 1 a")
+            .await?
+            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
+            .await?;
+
+        ctx.sql("SELECT 2 a")
+            .await?
+            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
+            .await?;
+
+        // Adding CSV to check it is not read with the Parquet reader
+        ctx.sql("SELECT 3 a")
+            .await?
+            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
+            .await?;
+
+        ctx.sql(format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'").as_ref())
+            .await?;
+
+        let actual = ctx
+            .sql("select * from parquet_folder_t1")
+            .await?
+            .collect()
+            .await?;
+        #[cfg_attr(any(), rustfmt::skip)]
+        assert_batches_sorted_eq!(&[
+            "+---+",
+            "| a |",
+            "+---+",
+            "| 2 |",
+            "| 1 |",
+            "+---+",
+        ], &actual);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_dummy_folder() -> Result<()> {
+        let ctx = SessionContext::new();
+        let test_path = "/foo/";
+
+        let actual = ctx
+            .read_parquet(test_path, ParquetReadOptions::default())
+            .await?
+            .collect()
+            .await?;
+
+        #[cfg_attr(any(), rustfmt::skip)]
+        assert_batches_eq!(&[
+            "++",
+            "++",
+        ], &actual);
+
+        Ok(())
+    }
 }

datafusion/datasource/src/url.rs

Lines changed: 22 additions & 0 deletions
@@ -282,6 +282,28 @@ impl ListingTableUrl {
         let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
         ObjectStoreUrl::parse(url).unwrap()
     }
+
+    /// Returns true if the [`ListingTableUrl`] points to a folder
+    pub fn is_folder(&self) -> bool {
+        self.url.scheme() == "file" && self.is_collection()
+    }
+
+    /// Return the `url` for this [`ListingTableUrl`]
+    pub fn get_url(&self) -> &Url {
+        &self.url
+    }
+
+    /// Return the `glob` for this [`ListingTableUrl`]
+    pub fn get_glob(&self) -> &Option<Pattern> {
+        &self.glob
+    }
+
+    /// Returns a copy of the current [`ListingTableUrl`] with the specified `glob`
+    pub fn with_glob(self, glob: &str) -> Result<Self> {
+        let glob =
+            Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
+        Self::try_new(self.url, Some(glob))
+    }
 }
 
 /// Creates a file URL from a potentially relative filesystem path
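A short usage sketch of the new accessors, assuming direct use of the datafusion_datasource crate (the Pattern type comes from the glob crate):

use datafusion_common::Result;
use datafusion_datasource::url::ListingTableUrl;

fn inspect(url: ListingTableUrl) -> Result<()> {
    println!("url: {}", url.get_url());
    match url.get_glob() {
        Some(pattern) => println!("glob: {}", pattern.as_str()),
        None => println!("no glob set"),
    }
    // An invalid pattern (unbalanced bracket) is surfaced as
    // DataFusionError::External by with_glob
    assert!(url.with_glob("[").is_err());
    Ok(())
}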

datafusion/sqllogictest/test_files/parquet.slt

Lines changed: 48 additions & 0 deletions
@@ -304,6 +304,54 @@
 select count(*) from listing_table;
 ----
 12
 
+# Test table pointing to a folder with parquet files (ends with /)
+statement ok
+CREATE EXTERNAL TABLE listing_table_folder_0
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/test_table/';
+
+statement ok
+set datafusion.execution.listing_table_ignore_subdirectory = true;
+
+# scan files: 0.parquet 1.parquet 2.parquet
+query I
+select count(*) from listing_table_folder_0;
+----
+9
+
+statement ok
+set datafusion.execution.listing_table_ignore_subdirectory = false;
+
+# scan files: 0.parquet 1.parquet 2.parquet 3.parquet
+query I
+select count(*) from listing_table_folder_0;
+----
+12
+
+# Test table pointing to a folder with parquet files (doesn't end with /)
+statement ok
+CREATE EXTERNAL TABLE listing_table_folder_1
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/test_table';
+
+statement ok
+set datafusion.execution.listing_table_ignore_subdirectory = true;
+
+# scan files: 0.parquet 1.parquet 2.parquet
+query I
+select count(*) from listing_table_folder_1;
+----
+9
+
+statement ok
+set datafusion.execution.listing_table_ignore_subdirectory = false;
+
+# scan files: 0.parquet 1.parquet 2.parquet 3.parquet
+query I
+select count(*) from listing_table_folder_1;
+----
+12
+
 # Clean up
 statement ok
 DROP TABLE timestamp_with_tz;
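The slt tests above toggle datafusion.execution.listing_table_ignore_subdirectory, which controls whether files in subdirectories of the table folder are scanned. A minimal sketch of flipping it programmatically (session setup only, no table assumed):

use datafusion::execution::config::SessionConfig;
use datafusion::prelude::SessionContext;

fn session_scanning_subdirectories() -> SessionContext {
    // false => files in subdirectories of the table folder are scanned too
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.listing_table_ignore_subdirectory",
        false,
    );
    SessionContext::new_with_config(config)
}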
