[FLINK-37543][table-planner] Support sink reuse in batch mode #26379

Open: wants to merge 2 commits into base: master

xiangyuf
Contributor

What is the purpose of the change

Support sink reuse in batch mode

Brief change log

  • Introduce SinkReuser to reuse sink nodes in batch mode

Verifying this change

This change is covered by the following tests:

BatchSinkReuseTest
SinkReuseITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? FLIP-506

@flinkbot
Collaborator

flinkbot commented Mar 31, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

.booleanType()
.defaultValue(false)
.withDescription(
"When it is true, the optimizer will try to find out duplicated table sinks and "
Contributor

nit: find out duplicated -> find duplicate

.withDescription(
"When it is true, the optimizer will try to find out duplicated table sinks and "
+ "reuse them. This works only when "
+ TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
Contributor

I am curious: when would you have TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED set and not want to reuse duplicates? In other words, is it feasible to extend what TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED means rather than introduce a new flag?
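
The question turns on what the two switches do in combination. A minimal sketch, assuming (as the quoted description suggests) that sink reuse takes effect only when sub-plan reuse is also enabled. The class, the method, and the boolean parameters are hypothetical stand-ins, not Flink's actual option classes:

```java
/** Hypothetical model of how the two optimizer switches could combine. Not Flink API. */
final class SinkReuseFlags {

    /**
     * Assumption: sink reuse is a refinement of sub-plan reuse, so it is
     * effective only when both switches are on.
     */
    static boolean sinkReuseEffective(boolean subPlanReuseEnabled, boolean sinkReuseEnabled) {
        return subPlanReuseEnabled && sinkReuseEnabled;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        // Print all four combinations of the two switches.
        for (boolean subPlan : values) {
            for (boolean sink : values) {
                System.out.printf(
                        "subPlanReuse=%b sinkReuse=%b -> sinks merged: %b%n",
                        subPlan, sink, sinkReuseEffective(subPlan, sink));
            }
        }
    }
}
```

Under this model, the combination of sub-plan reuse on and sink reuse off is the only behavior a separate flag adds: sub-plans are shared, but every INSERT keeps its own sink. Folding sink reuse into the existing option would remove that combination.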

import org.apache.flink.table.connector.sink.DynamicTableSink;

/**
* Interface for {@link DynamicTableSink}s that support target column writing.
Contributor

nit: support -> supports

* Interface for {@link DynamicTableSink}s that support target column writing.
*
* <p>The planner will parse target columns from the DML clause and call {@link
* #applyTargetColumns(int[][])} to pass an array of column index paths to sink.
Contributor

nit: to sink -> to the sink
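
For readers unfamiliar with the "array of column index paths" mentioned in the quoted Javadoc, here is a rough sketch of what such an array could look like. It assumes 0-based positions and a flat three-column schema; the helper class and its schema are hypothetical and only illustrate the shape of the `int[][]` argument, not how the planner computes it:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: resolves column names to 0-based index paths. Not Flink API. */
final class TargetColumnPaths {

    /** Assumed sink schema: a, b, c (top-level columns only, for brevity). */
    static final List<String> SCHEMA = List.of("a", "b", "c");

    /** Maps each listed column to a one-element path holding its position in SCHEMA. */
    static int[][] toIndexPaths(List<String> targetColumns) {
        int[][] paths = new int[targetColumns.size()][];
        for (int i = 0; i < targetColumns.size(); i++) {
            int position = SCHEMA.indexOf(targetColumns.get(i));
            if (position < 0) {
                throw new IllegalArgumentException("Unknown column: " + targetColumns.get(i));
            }
            paths[i] = new int[] {position};
        }
        return paths;
    }

    public static void main(String[] args) {
        // Models the column list of: INSERT INTO t (a, c) SELECT ...
        System.out.println(Arrays.deepToString(toIndexPaths(List.of("a", "c"))));
        // prints [[0], [2]]
    }
}
```

Each inner array is a path rather than a single index so that a nested field can be addressed by a longer path; this sketch stops at top-level columns.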


/**
* This checks each sink node to see if it can be reused with another sink node. If so, we will
* reuse all reusable sink to one instance. This only used in the STATEMENT SET clause with multiple
Contributor

davidradl commented Apr 3, 2025

nit: sink -> sinks
I do not know what "to one instance" means in this sentence. Maybe the sentence could be:

If we find duplicate sinks that can be reused, then only one of these sinks will be used. This is an optimization, so that we do not need to process multiple sinks that actually represent the same destination table.

RelTraitSet currentInputTraitSet = currentSinkNode.getInput().getTraitSet();
for (ReusableSinkGroup group : reusableSinkGroups) {
// Only table sink with the same digest, specs and input trait set can be reused
if (!(group.digest.equals(currentSinkDigest)
Contributor

davidradl commented Apr 3, 2025

It would be nicer to write the if as a positive condition with an else, which avoids the continue.

Also, I wonder if this condition could be moved into a method in the ReusableSinkGroup class.
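
A sketch of what both suggestions might look like together, using simplified stand-ins for the planner types. Every name below (`Sink`, `canReuse`, `members`, the string-valued fields) is hypothetical; the real code compares a digest, specs, and a `RelTraitSet`, which are modeled here as plain strings:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-ins for the planner types; every name here is hypothetical. */
final class SinkGroupingSketch {

    /** A sink reduced to the three properties the quoted code comment compares. */
    static final class Sink {
        final String name;
        final String digest;
        final String specs;
        final String inputTraits;

        Sink(String name, String digest, String specs, String inputTraits) {
            this.name = name;
            this.digest = digest;
            this.specs = specs;
            this.inputTraits = inputTraits;
        }
    }

    static final class ReusableSinkGroup {
        final Sink first;
        final List<Sink> members = new ArrayList<>();

        ReusableSinkGroup(Sink first) {
            this.first = first;
            members.add(first);
        }

        /** The match condition lives with the group, not in the caller's loop. */
        boolean canReuse(Sink candidate) {
            return first.digest.equals(candidate.digest)
                    && first.specs.equals(candidate.specs)
                    && first.inputTraits.equals(candidate.inputTraits);
        }
    }

    static List<ReusableSinkGroup> group(List<Sink> sinks) {
        List<ReusableSinkGroup> groups = new ArrayList<>();
        for (Sink sink : sinks) {
            ReusableSinkGroup match = null;
            for (ReusableSinkGroup candidate : groups) {
                if (candidate.canReuse(sink)) { // positive test, no `continue`
                    match = candidate;
                    break;
                }
            }
            if (match != null) {
                match.members.add(sink);
            } else {
                groups.add(new ReusableSinkGroup(sink)); // nothing matched: open a new group
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Sink> sinks = List.of(
                new Sink("s1", "sink(t1)", "specs", "traits"),
                new Sink("s2", "sink(t2)", "specs", "traits"),
                new Sink("s3", "sink(t1)", "specs", "traits"));
        for (ReusableSinkGroup g : group(sinks)) {
            System.out.println(g.first.digest + " has " + g.members.size() + " sink(s)");
        }
    }
}
```

Moving the comparison into the group keeps the loop body down to a single readable condition and gives the match rule one place to change if further criteria are added.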

break;
}

if (!canBeReused) {
Contributor

What does this if mean? It says that if we cannot reuse, then we add to a reusableSinkGroup. This does not look right; I was expecting if (canBeReused) {
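
One possible reading, which the thread does not confirm, is that canBeReused records whether the current sink matched an existing group, so the negated branch is where a new group is opened. Under that assumption the flag can be avoided altogether by keying groups on the match criteria. The sketch below uses plain strings and hypothetical names rather than the planner's types:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical flag-free grouping: the map lookup replaces the "matched?" flag. */
final class SinkGroupsByKey {

    /** Each row is {sinkName, matchKey}; sinks with equal keys land in one group. */
    static Map<String, List<String>> group(String[][] sinks) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] sink : sinks) {
            // Opens a new group only when no group with this key exists yet.
            groups.computeIfAbsent(sink[1], key -> new ArrayList<>()).add(sink[0]);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[][] sinks = {
            {"insert1", "sink(t1)"},
            {"insert2", "sink(t2)"},
            {"insert3", "sink(t1)"},
        };
        System.out.println(group(sinks));
        // prints {sink(t1)=[insert1, insert3], sink(t2)=[insert2]}
    }
}
```

This only works if the match criteria can be combined into a key with reliable equals and hashCode behavior, which is an assumption here and may not hold for trait sets.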

return digest.toString();
}

private static class ReusableSinkGroup {
Contributor

Why is this static?
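
For context on what the modifier changes, here is a small sketch with hypothetical class names. A static nested class carries no implicit reference to an enclosing instance, while a non-static inner class does. Whether that is the author's reason is not stated in the thread:

```java
/** Hypothetical outer class used only to contrast the two kinds of nested class. */
final class OuterReuser {
    private final String name = "reuser";

    /** Static nested: no hidden reference to an OuterReuser instance. */
    static class StaticGroup {
        String describe() {
            return "group without outer instance";
        }
    }

    /** Inner (non-static): every instance is tied to one OuterReuser. */
    class InnerGroup {
        String describe() {
            return "group of " + name; // reads a field of the enclosing instance
        }
    }

    public static void main(String[] args) {
        // No OuterReuser object is needed to create the static nested class.
        System.out.println(new OuterReuser.StaticGroup().describe());

        // The inner class can only be created through an outer instance.
        OuterReuser outer = new OuterReuser();
        System.out.println(outer.new InnerGroup().describe());
    }
}
```

A helper class that never touches the outer object's fields is usually declared static for this reason: it can be instantiated from static methods and does not keep the outer object reachable.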

3 participants