Refactor ParquetExec in preparation for implementing parallel scans for statistics #897
Conversation
```rust
// collect statistics for all partitions - we should really do this in parallel threads
for chunk in chunks {
    let filenames: Vec<String> = chunk.iter().map(|x| x.to_string()).collect();
    partitions.push(ParquetPartition::try_from_files(filenames, limit)?);
```
The plan is to use `tokio::spawn` here, but that requires making more methods async, so I wanted to tackle it as a separate PR.
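A minimal sketch of what the parallel version could look like. It uses `std::thread::spawn` in place of `tokio::spawn` so it stays self-contained and synchronous; `collect_stats` is a hypothetical stand-in for `ParquetPartition::try_from_files`, which in real code would read each file's parquet footer and build `Statistics`:

```rust
use std::thread;

// Hypothetical stand-in for ParquetPartition::try_from_files: real code
// would read each file's parquet footer and build Statistics.
fn collect_stats(filenames: Vec<String>) -> usize {
    filenames.len()
}

fn main() {
    // Two chunks of files, as produced by the chunking loop above.
    let chunks: Vec<Vec<String>> = vec![
        vec!["part-0.parquet".to_string(), "part-1.parquet".to_string()],
        vec!["part-2.parquet".to_string()],
    ];
    // Spawn one worker per chunk; tokio::spawn would be used analogously
    // once the surrounding methods are async.
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| thread::spawn(move || collect_stats(chunk)))
        .collect();
    // join() preserves chunk order, so partition order is deterministic.
    let stats: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    println!("{:?}", stats); // prints [2, 1]
}
```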
The idea looks very good to me @andygrove 👍
This may conflict with https://github.com/apache/arrow-datafusion/pull/811/files from @yjshen
I think the comment about `truncate`-ing the schemas should probably be addressed before merging, but I don't see any bug it would cause at the present time, so I would be fine with this PR going in as is.
```diff
@@ -582,14 +389,244 @@ impl ParquetExec {

 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
```
I wonder if having a `Vec<FilenameAndSchema>` would be clearer than two parallel arrays (and then they could not get out of sync either).
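A sketch of the suggested shape, under the assumption that `FilenameAndSchema` is a hypothetical struct (the name comes from the comment above, not from the codebase); a string stands in for an Arrow `SchemaRef` to keep the example self-contained:

```rust
// Hypothetical struct pairing each file with its schema. With one Vec of
// pairs instead of two parallel Vecs, the lengths can never mismatch.
#[derive(Debug, Clone)]
struct FilenameAndSchema {
    filename: String,
    // Stand-in for an Arrow SchemaRef; a string keeps the sketch runnable.
    schema: String,
}

fn main() {
    let mut files = vec![
        FilenameAndSchema { filename: "part-0.parquet".to_string(), schema: "a: Int64".to_string() },
        FilenameAndSchema { filename: "part-1.parquet".to_string(), schema: "a: Int64".to_string() },
        FilenameAndSchema { filename: "part-2.parquet".to_string(), schema: "a: Int64".to_string() },
    ];
    // A single truncate trims filenames and schemas together.
    files.truncate(2);
    println!("{}", files.len()); // prints 2
}
```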
It seems like the `PartitionedFile` abstraction proposed by @yjshen in https://github.com/apache/arrow-datafusion/pull/811/files#diff-72f3a52c56e83e00d8c605d461f092617a3c205619376bb373069c662f9cfc93R54 would help solve this problem?
```rust
};
// remove files that are not needed in case of limit
let mut filenames = filenames;
filenames.truncate(total_files);
```
Do you also need to truncate `schemas`?
Looks like it.
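To make the hazard concrete, here is a minimal sketch of the parallel-vector situation (variable contents are illustrative, only `filenames` and `total_files` appear in the diff above):

```rust
fn main() {
    // Parallel vectors as in the current code; the element values
    // are illustrative placeholders.
    let mut filenames = vec!["f0".to_string(), "f1".to_string(), "f2".to_string()];
    let mut schemas = vec!["s0".to_string(), "s1".to_string(), "s2".to_string()];
    let total_files = 2;

    filenames.truncate(total_files);
    // Without this matching truncate, schemas would keep 3 entries while
    // filenames keeps 2, and the parallel vectors fall out of sync.
    schemas.truncate(total_files);

    assert_eq!(filenames.len(), schemas.len());
    println!("{} {}", filenames.len(), schemas.len()); // prints 2 2
}
```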
Sorry, I didn't realize #896 existed. I was actually addressing the same issue (async and parallel parquet stats listing), pointed out by @rdettai in #811: https://github.com/apache/arrow-datafusion/pull/811/files#diff-72f3a52c56e83e00d8c605d461f092617a3c205619376bb373069c662f9cfc93R189-R223. Could you please take a look at that PR if you have time?
@yjshen I have changed this PR to a draft and will hold off on working on this for now; I will review your PR when I have time, probably at the weekend. It looks like you are farther along than I was with this.
Thanks, @andygrove. It will be great to have your help. As you mentioned in the comment, I ran into the same situation while handling the async listing and reading. We may need to decide how async should be propagated through the API: limit async to remote storage access, or change the user-facing API. Looking forward to hearing from you. :P
Closing this in favor of the work happening in #811.
Which issue does this PR close?
Closes #896.
Rationale for this change
Refactor in preparation for making partition scans run in parallel.
What changes are included in this PR?
Refactor to move some logic from `ParquetExec` down to `ParquetPartition`.
Are there any user-facing changes?
No.