Discussion: Design for experimental aggregation support in DataFusion compactions #4344

Open
m09526 opened this issue Feb 28, 2025 · 2 comments · May be fixed by #4383
Labels
question Further information is requested

Comments

@m09526
Member

m09526 commented Feb 28, 2025

Background

To progress beyond "experimental" status, the DataFusion compaction library in Sleeper must be able to perform the function of "iterators" in the Java codebase. The most important of these are basic aggregation logic (e.g. sum(), count()) and simple row filtering (e.g. age-off based on timestamp, or a visibility filter).

Analysis

A full design and implementation is out of scope currently due to the number of things that have to be worked through first:

  • What functionality do we want to support?
  • What are the common use cases?
  • How should iterators be specified in a language/technology agnostic manner in Sleeper's configuration?
  • What is the performance of common iterator functionality in DataFusion?
  • How difficult is it to incorporate new functionality?

Since we can't answer some of these yet, this first attempt is a feasibility study for how this functionality can be added. It is intended as a minimum reasonable effort to provide a prototype with minimal functionality. It does not and should not feature all the behaviour that we would want from a full general purpose iterator framework. Specifically, it is unlikely that this code will be anything more than experimental. Once the necessary feedback and experience have been gleaned from this experiment, we should use what we have learned to design a proper solution from scratch.

Description

  1. Re-purpose Sleeper table properties sleeper.table.iterator.class.name and sleeper.table.iterator.config to store configuration for DataFusion iterators. If the class.name property is set to the special marker DATAFUSION, this signifies to Sleeper that DataFusion aggregation or filtering is present on the table. Using the Java compactor on this table should fail immediately with an explanatory message.
  2. The sleeper.table.iterator.config property for DataFusion will be of the form <filter>,<aggregation>. Either or both components may be empty. The filter component will expose one possible filter: age off. Only one filter will be specifiable at a time on a table (this is a minimum reasonable effort!).
    • Age off filter format is ageoff='column','time_period'. If the elapsed time from the integer value in the given column to "now" is lower than the time_period value, the row is retained. Values are in seconds.
    • Aggregation filter format is 'column'=OP where OP is one of sum, count, min or max. We may decide to expand the list of allowable operations. Additional aggregation filters should be comma separated.
  3. FFI bindings for Rust compaction code should be expanded to pass iterator configuration from Java to Rust code.
  4. Rust code will sanity check the given string for validity, e.g. column names are valid, strings are valid, regular expressions compile, etc.
  5. If aggregation is enabled, aggregation is performed on row key columns.
  6. If aggregation is enabled, all non row key columns MUST have an aggregation attached, otherwise the query cannot be run. This is in line with the expectations of DataFusion and most other query engines.
  7. The relevant filters and aggregations will be specified in the DataFusion query.
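
To make the string format in steps 2, 4 and 6 concrete, here is a minimal Rust sketch of parsing and checking it. It is not the Sleeper implementation: the names `IteratorConfig`, `AggOp`, `parse_config` and `check_coverage` are hypothetical, and it assumes that commas inside single quotes are not separators and that an empty filter is written as a leading comma (e.g. `,'value'=sum`).

```rust
use std::collections::HashSet;

/// Aggregation operators named in the proposal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggOp { Sum, Count, Min, Max }

/// Parsed form of the `<filter>,<aggregation>` string (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
struct IteratorConfig {
    /// (column, time period in seconds) for the optional age-off filter.
    age_off: Option<(String, u64)>,
    /// (column, operator) pairs in the order they were given.
    aggregations: Vec<(String, AggOp)>,
}

/// Split on commas that are not inside single quotes.
fn split_unquoted(s: &str) -> Vec<String> {
    let mut parts = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    for c in s.chars() {
        match c {
            '\'' => { in_quotes = !in_quotes; current.push(c); }
            ',' if !in_quotes => parts.push(std::mem::take(&mut current)),
            _ => current.push(c),
        }
    }
    parts.push(current);
    parts
}

/// Strip exactly one pair of surrounding single quotes.
fn unquote(s: &str) -> Result<String, String> {
    s.trim()
        .strip_prefix('\'')
        .and_then(|r| r.strip_suffix('\''))
        .map(str::to_string)
        .ok_or_else(|| format!("expected a quoted value, got `{s}`"))
}

fn parse_config(config: &str) -> Result<IteratorConfig, String> {
    let parts = split_unquoted(config);
    let mut rest = parts.iter().map(|p| p.trim());
    let mut age_off = None;
    // The first component is the filter slot: empty, or an age-off filter.
    match rest.next() {
        Some(first) if first.starts_with("ageoff=") => {
            let column = unquote(&first["ageoff=".len()..])?;
            let period = unquote(rest.next().ok_or("ageoff needs a 'time_period' value")?)?
                .parse::<u64>()
                .map_err(|e| format!("bad time period: {e}"))?;
            age_off = Some((column, period));
        }
        Some("") | None => {}
        Some(other) => return Err(format!("unknown filter `{other}`")),
    }
    // Everything after the filter is a list of 'column'=OP entries.
    let mut aggregations = Vec::new();
    for item in rest.filter(|p| !p.is_empty()) {
        let (column, op) = item
            .rsplit_once('=')
            .ok_or_else(|| format!("expected 'column'=OP, got `{item}`"))?;
        let op = match op.trim() {
            "sum" => AggOp::Sum,
            "count" => AggOp::Count,
            "min" => AggOp::Min,
            "max" => AggOp::Max,
            other => return Err(format!("unknown operator `{other}`")),
        };
        aggregations.push((unquote(column)?, op));
    }
    Ok(IteratorConfig { age_off, aggregations })
}

/// Step 6: with aggregation enabled, every non row key column needs an operator.
fn check_coverage(cfg: &IteratorConfig, row_keys: &[&str], all_columns: &[&str]) -> Result<(), String> {
    if cfg.aggregations.is_empty() {
        return Ok(()); // aggregation not enabled, nothing to check
    }
    let covered: HashSet<&str> = cfg.aggregations.iter().map(|(c, _)| c.as_str()).collect();
    for col in all_columns {
        if !row_keys.contains(col) && !covered.contains(col) {
            return Err(format!("column `{col}` has no aggregation"));
        }
    }
    Ok(())
}
```

One consequence worth noting: under step 6, a column used only for age off still needs an aggregation (for example max) once any aggregation is configured, so a config such as `ageoff='timestamp','3600','value'=sum` would be rejected by `check_coverage` for a table whose non key columns are `timestamp` and `value`.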

Desired findings

How much time/difficulty is the implementation?
How much performance impact is there relative to a normal compaction?
What, if any, other difficulties do we encounter whilst building this prototype?

@m09526 m09526 added the question Further information is requested label Feb 28, 2025
@gaffer01
Member

Thanks, looks really good.

On the age off filter format example, I think in practice we want to age off relative to the current time, e.g. we have a column called 'timestamp' and we want to delete the row if (System.currentTime - timestamp) > time_period. This is a specific type of filter. So for this we need to know the column name and the value of time_period, rather than a fixed value of the timestamp field.
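
As a rough illustration of that rule, the check could look like the Rust sketch below. The function names are hypothetical, and it assumes the column holds seconds since the Unix epoch; a row exactly `time_period` old is kept here, since only rows strictly older are deleted.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns true when the row should be kept, i.e. when
/// (now - timestamp) is not greater than time_period. All values in seconds.
fn retain_row(now_secs: i64, timestamp_secs: i64, time_period_secs: i64) -> bool {
    // saturating_sub avoids overflow on extreme inputs; timestamps in the
    // future give a negative elapsed time and are therefore retained.
    now_secs.saturating_sub(timestamp_secs) <= time_period_secs
}

/// Current wall-clock time in seconds since the Unix epoch.
fn current_time_secs() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs() as i64)
        .unwrap_or(0)
}
```

A compaction would presumably read the clock once, e.g. `let now = current_time_secs();`, and apply `retain_row(now, ts, period)` to every row so that all rows are judged against the same instant.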

For the aggregation example, I think we stick to sum, min and max for now. Average can be supported but requires 2 columns (one for the sum and one for the count of items), whereas here we're trying to specify the aggregation logic to apply to a single column.
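
To show why an average does not fit the one-operator-per-column model, here is a small Rust sketch with a hypothetical `PartialAvg` type. Averages of averages are not mergeable across compactions, so the running sum and the running count both have to be stored and combined separately.

```rust
/// Running state needed to compute an average across repeated compactions.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PartialAvg {
    sum: i64,
    count: u64,
}

impl PartialAvg {
    /// Combine two partial results for the same row key.
    fn merge(self, other: PartialAvg) -> PartialAvg {
        PartialAvg {
            sum: self.sum + other.sum,
            count: self.count + other.count,
        }
    }

    /// The average, or None when no items have been seen.
    fn average(self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum as f64 / self.count as f64)
        }
    }
}
```

In terms of the proposed format this would amount to two columns, each with a plain sum aggregation, with the division done at read time.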

The regular expression example makes most sense for query-time filtering in queries or bulk exports. Adding support for using DataFusion for queries will take time, so I suggest we don't add regexp filtering support at this stage.

@m09526
Member Author

m09526 commented Feb 28, 2025

Ok, I've updated the proposal to reflect this.
