-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Map file-level column statistics to the table-level #15865
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
bb0f430
to
26c5050
Compare
26c5050
to
777c5e7
Compare
Fyi @friendlymatthew |
777c5e7
to
df1db6a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @xudong963 -- this looks good to me except for one more test.
Can you please add a test that verifies the statistics of a ListingTable that was created with two parquet files of different schemas? I think you could write a SLT level test with something like
> select * from values (1, 'a'), (2, 'b') t(int_col, str_col);
+---------+---------+
| int_col | str_col |
+---------+---------+
| 1 | a |
| 2 | b |
+---------+---------+
2 row(s) fetched.
Elapsed 0.006 seconds.
> COPY (SELECT * FROM values (1, 'a'), (2, 'b') t(int_col, str_col)) to '/tmp/table/1.parquet';
+-------+
| count |
+-------+
| 2 |
+-------+
1 row(s) fetched.
Elapsed 0.010 seconds.
> COPY (SELECT * FROM values ('c', 3), ('d', -1) t(str_col, int_col)) to '/tmp/table/2.parquet';
+-------+
| count |
+-------+
| 2 |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.
And then verify the statistics with
> set datafusion.execution.collect_statistics = true;
0 row(s) fetched.
Elapsed 0.000 seconds.
> set datafusion.explain.show_statistics = true;
0 row(s) fetched.
Elapsed 0.000 seconds.
> create external table t stored as parquet location '/tmp/table';
0 row(s) fetched.
Elapsed 0.006 seconds.
> explain format indent select * from t;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | TableScan: t projection=[int_col, str_col] |
| physical_plan | DataSourceExec: file_groups={2 groups: [[tmp/table/1.parquet], [tmp/table/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(288), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.001 seconds.
@@ -1129,7 +1130,17 @@ impl ListingTable { | |||
let (file_group, inexact_stats) = | |||
get_files_with_limit(files, limit, self.options.collect_stat).await?; | |||
|
|||
let file_groups = file_group.split_files(self.options.target_partitions); | |||
let mut file_groups = file_group.split_files(self.options.target_partitions); | |||
let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think using the default schema mapper makes sense for now / in this PR, but in general I think it would make sense to allow the user to provide their own schema mapping rules here (so a default value that is not NULL
can be used, for example) via their own mapper.
However, we woudl have to add a schema mapper factory to ListingOptions
(this is not a change needed for this PR, I just noticed it while reviewing this PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, I filed an issue to track: #15889
Yes, cool tests |
let mut file_groups = file_group.split_files(self.options.target_partitions); | ||
let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb While I was working on #15852, I found in fact, for listing table, doesn't have the issue described in #15689, that is, all files here have the same schema because when creating table, all fetched files already use the SchemaMapper
to reorder their schema, see here: https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/opener.rs#L206.
What we should fix is let the file schema match the listing table schema, usually, if users specify the partition col, table schema will have the extra partition col infos, so I moved the mapper down the compute_all_files_statistics
method in the commit: 689fc66.
* init * fix clippy * add test
Which issue does this PR close?
Rationale for this change
As said in #15689
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?