Skip to content

[FLINK-37620][state/forst] ForSt Sync mode support remote storage #26412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/content.zh/docs/ops/state/disaggregated_state.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state
checkpoint and fast recovery, since the ForSt will perform file copy between the primary
storage location and the checkpoint directory during checkpointing and recovery.

#### ForSt Local Storage Location

By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When
using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve as **local state store**.
Since a job may contain multiple ForSt instances with mixed API usage, synchronous local state access
along with asynchronous remote state access could help achieve better overall throughput.
If you want the operators with synchronous state APIs to store state in remote, the following configuration will help:
```yaml
state.backend.forst.sync.enforce-local: false
```
And you can specify the local storage location via:
```yaml
state.backend.forst.local-dir: path-to-local-dir
```

#### ForSt File Cache

Expand Down
14 changes: 14 additions & 0 deletions docs/content/docs/ops/state/disaggregated_state.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state
checkpoint and fast recovery, since the ForSt will perform file copy between the primary
storage location and the checkpoint directory during checkpointing and recovery.

#### ForSt Local Storage Location

By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When
using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve as **local state store**.
Since a job may contain multiple ForSt instances with mixed API usage, synchronous local state access
along with asynchronous remote state access could help achieve better overall throughput.
If you want the operators with synchronous state APIs to store state in remote, the following configuration will help:
```yaml
state.backend.forst.sync.enforce-local: false
```
And you can specify the local storage location via:
```yaml
state.backend.forst.local-dir: path-to-local-dir
```

#### ForSt File Cache

Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/forst_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
<td>String</td>
<td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td>
</tr>
<tr>
<td><h5>state.backend.forst.sync.enforce-local</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enforce local state for operators in synchronous mode when enabling disaggregated state. This is useful in cases where both synchronous operators and asynchronous operators are used in the same job.</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this true?
same job. -> same local job.

</tr>
<tr>
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>
<td style="word-wrap: break-word;">128</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>String</td>
<td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td>
</tr>
<tr>
<td><h5>state.backend.forst.sync.enforce-local</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enforce local state for operators in synchronous mode when enabling disaggregated state. This is useful in cases where both synchronous operators and asynchronous operators are used in the same job.</td>
</tr>
<tr>
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>
<td style="word-wrap: break-word;">128</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ private ForStRestoreOperation getForStRestoreOperation(
// env. We expect to directly use the dfs directory in flink env or local directory as
// working dir. We will implement this in ForStDB later, but before that, we achieved this
// by setting the dbPath to "/" when the dfs directory existed.
// TODO: use localForStPath as dbPath after ForSt Support mixing local-dir and remote-dir
Path instanceForStPath =
optionsContainer.getRemoteForStPath() == null
? optionsContainer.getLocalForStPath()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ public class ForStOptions {
CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT,
LOCAL_DIR_AS_PRIMARY_SHORTCUT));

@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
public static final ConfigOption<Boolean> SYNC_ENFORCE_LOCAL =
ConfigOptions.key("state.backend.forst.sync.enforce-local")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to enforce local state for operators in synchronous mode when"
+ " enabling disaggregated state. This is useful in cases where "
+ "both synchronous operators and asynchronous operators are used "
+ "in the same job.");

@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
public static final ConfigOption<String> CACHE_DIRECTORY =
ConfigOptions.key("state.backend.forst.cache.dir")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -188,8 +189,12 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend
/** The recovery claim mode. */
private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;

/** Whether to share the ForSt remote directory with checkpoint directory. */
private boolean remoteShareWithCheckpoint;

/** Whether to use local directory as primary directory in synchronous mode. */
private boolean forceSyncLocal;

// ------------------------------------------------------------------------

/** Creates a new {@code ForStStateBackend} for storing state. */
Expand All @@ -203,6 +208,7 @@ public ForStStateBackend() {
this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
this.remoteShareWithCheckpoint = false;
this.forceSyncLocal = true;
}

/**
Expand Down Expand Up @@ -237,6 +243,7 @@ private ForStStateBackend(
: new Path(remoteDirStr);
}
}
this.forceSyncLocal = config.get(ForStOptions.SYNC_ENFORCE_LOCAL);

this.priorityQueueConfig =
ForStPriorityQueueConfig.fromOtherAndConfiguration(
Expand Down Expand Up @@ -409,31 +416,7 @@ public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(

lazyInitializeForJob(env, fileCompatibleIdentifier);

String opChildPath =
String.format(
"op_%s_attempt_%s",
fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber());

Path localBasePath =
new Path(
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
.getAbsolutePath());
Path remoteBasePath = null;
if (remoteForStDirectory != null) {
remoteBasePath =
new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath);
} else if (remoteShareWithCheckpoint) {
if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) {
Path sharedStateDirectory =
((FsCheckpointStorageAccess) env.getCheckpointStorageAccess())
.getSharedStateDirectory();
remoteBasePath = new Path(sharedStateDirectory, opChildPath);
LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath);
} else {
LOG.warn(
"Remote ForSt directory can't be set, because checkpoint directory isn't on file system.");
}
}
Tuple2<Path, Path> localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env);

final OpaqueMemoryResource<ForStSharedResources> sharedResources =
ForStOperationUtils.allocateSharedCachesIfConfigured(
Expand All @@ -448,8 +431,8 @@ public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(
final ForStResourceContainer resourceContainer =
createOptionsAndResourceContainer(
sharedResources,
localBasePath,
remoteBasePath,
localAndRemoteBasePath.f0,
localAndRemoteBasePath.f1,
env.getCheckpointStorageAccess(),
parameters.getMetricGroup(),
nativeMetricOptions.isStatisticsEnabled());
Expand Down Expand Up @@ -505,17 +488,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(

lazyInitializeForJob(env, fileCompatibleIdentifier);

Path instanceBasePath =
new Path(
new File(
getNextStoragePath(),
"job_"
+ jobId
+ "_op_"
+ fileCompatibleIdentifier
+ "_uuid_"
+ UUID.randomUUID())
.getAbsolutePath());
Tuple2<Path, Path> localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env);

LocalRecoveryConfig localRecoveryConfig =
env.getTaskStateManager().createLocalRecoveryConfig();
Expand All @@ -533,10 +506,10 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
final ForStResourceContainer resourceContainer =
createOptionsAndResourceContainer(
sharedResources,
instanceBasePath,
null,
localAndRemoteBasePath.f0,
forceSyncLocal ? null : localAndRemoteBasePath.f1,
env.getCheckpointStorageAccess(),
null,
parameters.getMetricGroup(),
nativeMetricOptions.isStatisticsEnabled());

ExecutionConfig executionConfig = env.getExecutionConfig();
Expand All @@ -549,7 +522,6 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
new ForStSyncKeyedStateBackendBuilder<>(
parameters.getOperatorIdentifier(),
env.getUserCodeClassLoader().asClassLoader(),
instanceBasePath,
resourceContainer,
stateName -> resourceContainer.getColumnOptions(),
parameters.getKvStateRegistry(),
Expand Down Expand Up @@ -818,6 +790,35 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon
return configuration;
}

Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment env) {
String opChildPath =
String.format(
"op_%s_attempt_%s",
operatorIdentifier, env.getTaskInfo().getAttemptNumber());

Path localBasePath =
new Path(
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
.getAbsolutePath());
Path remoteBasePath = null;
if (remoteForStDirectory != null) {
remoteBasePath =
new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath);
} else if (remoteShareWithCheckpoint) {
if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) {
Path sharedStateDirectory =
((FsCheckpointStorageAccess) env.getCheckpointStorageAccess())
.getSharedStateDirectory();
remoteBasePath = new Path(sharedStateDirectory, opChildPath);
LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath);
} else {
LOG.warn(
"Remote ForSt directory can't be set, because checkpoint directory isn't on file system.");
}
}
return Tuple2.of(localBasePath, remoteBasePath);
}

@VisibleForTesting
ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) {
return createOptionsAndResourceContainer(null, localBasePath, null, null, null, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** The configuration of rocksDB priority queue state implementation. */
/** The configuration of ForSt priority queue state implementation. */
public class ForStPriorityQueueConfig implements Serializable {

private static final long serialVersionUID = 1L;
Expand All @@ -39,17 +39,17 @@ public class ForStPriorityQueueConfig implements Serializable {
private @Nullable ForStStateBackend.PriorityQueueStateType priorityQueueStateType;

/** cache size per keyGroup for rocksDB priority queue state. */
private int rocksDBPriorityQueueSetCacheSize;
private int forStDBPriorityQueueSetCacheSize;

public ForStPriorityQueueConfig() {
this(null, UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE);
}

public ForStPriorityQueueConfig(
ForStStateBackend.PriorityQueueStateType priorityQueueStateType,
int rocksDBPriorityQueueSetCacheSize) {
int forStDBPriorityQueueSetCacheSize) {
this.priorityQueueStateType = priorityQueueStateType;
this.rocksDBPriorityQueueSetCacheSize = rocksDBPriorityQueueSetCacheSize;
this.forStDBPriorityQueueSetCacheSize = forStDBPriorityQueueSetCacheSize;
}

/**
Expand All @@ -70,10 +70,10 @@ public void setPriorityQueueStateType(ForStStateBackend.PriorityQueueStateType t
* Gets the cache size of rocksDB priority queue set. It will fall back to the default value if
* it is not explicitly set.
*/
public int getRocksDBPriorityQueueSetCacheSize() {
return rocksDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
public int getForStDBPriorityQueueSetCacheSize() {
return forStDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
? FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue()
: rocksDBPriorityQueueSetCacheSize;
: forStDBPriorityQueueSetCacheSize;
}

public static ForStPriorityQueueConfig fromOtherAndConfiguration(
Expand All @@ -83,10 +83,10 @@ public static ForStPriorityQueueConfig fromOtherAndConfiguration(
? config.get(TIMER_SERVICE_FACTORY)
: other.priorityQueueStateType;
int cacheSize =
(other.rocksDBPriorityQueueSetCacheSize
(other.forStDBPriorityQueueSetCacheSize
== UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE)
? config.get(FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE)
: other.rocksDBPriorityQueueSetCacheSize;
: other.forStDBPriorityQueueSetCacheSize;
return new ForStPriorityQueueConfig(priorityQueueType, cacheSize);
}

Expand Down
Loading