Skip to content

Support more reliable async task retry to guarantee eventual execution (1/2) – Metastore Layer #1523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,12 @@ private void revokeGrantRecord(
@Override
public @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit) {
return loadTasks(callCtx, executorId, limit, false);
}

@Override
public @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean txnPerTask) {
BasePersistence ms = callCtx.getMetaStore();

// find all available tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,36 @@ EntityResult loadEntity(
@Nonnull PolarisEntityType entityType);

/**
* Fetch a list of tasks to be completed. Tasks
* Fetch a list of tasks to be completed.
*
* <p>This method uses the default task selection logic, which selects tasks that are either
* unassigned or have timed out. All matching tasks are processed within a single transaction.
*
* @param callCtx call context
* @param executorId executor id
* @param limit limit
* @return list of tasks to be completed
* @param limit max number of tasks to lease
* @return list of leased tasks
*/
@Nonnull
EntitiesResult loadTasks(@Nonnull PolarisCallContext callCtx, String executorId, int limit);

/**
* Fetch a list of tasks to be completed with customizable behavior.
*
* <p>Supports custom filtering and per-task transactional execution. When {@code txnPerTask} is
* true, each task is leased within its own transaction to avoid aborting the entire batch on a
* single failure.
*
* @param callCtx call context
* @param executorId executor id
* @param limit max number of tasks to lease
* @param txnPerTask whether to lease each task in its own transaction
* @return list of successfully leased tasks
*/
@Nonnull
EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean txnPerTask);

/**
* Load change tracking information for a set of entities in one single shot and return for each
* the version for the entity itself and the version associated to its grant records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ public EntitiesResult loadTasks(
return null;
}

@Override
public EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean txnPerTask) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadTasks");
return null;
}

@Override
public ScopedCredentialsResult getSubscopedCredsForEntity(
@Nonnull PolarisCallContext callCtx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
Expand Down Expand Up @@ -1973,9 +1974,6 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
loadedTasks.add(result.getEntity());
} else {
// TODO: Consider performing incremental leasing of individual tasks one at a time
// instead of requiring all-or-none semantics for all the tasks we think we listed,
// or else contention could be very bad.
ms.rollback();
throw new RetryOnConcurrencyException(
"Failed to lease available task with status %s, info: %s",
Expand All @@ -1985,11 +1983,60 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
return new EntitiesResult(loadedTasks);
}

private @Nonnull EntitiesResult loadTasksWithIsolatedTxn(
@Nonnull PolarisCallContext callCtx,
@Nonnull TransactionalPersistence ms,
String executorId,
int limit) {
List<EntitiesResult> entitySuccessResults = new ArrayList<>();
final AtomicInteger failedLeaseCount = new AtomicInteger(0);
for (int i = 0; i < limit; i++) {
try {
EntitiesResult result =
ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, 1));
if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
entitySuccessResults.add(result);
} else {
failedLeaseCount.incrementAndGet();
LOGGER.warn(
"Fail to lease task, error status: {}, error info: {}",
result.getReturnStatus(),
result.getExtraInformation());
}
} catch (Exception e) {
failedLeaseCount.incrementAndGet();
LOGGER.warn("Exception while leasing task: {}", e.getMessage());
}
}

if (entitySuccessResults.isEmpty() && failedLeaseCount.get() > 0) {
throw new RetryOnConcurrencyException(
"Failed to lease any of %s tasks due to concurrent leases", failedLeaseCount.get());
}

List<PolarisBaseEntity> entities =
entitySuccessResults.stream()
.flatMap(result -> result.getEntities().stream())
.collect(Collectors.toList());

return new EntitiesResult(entities);
}

@Override
public @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit) {
return loadTasks(callCtx, executorId, limit, false);
}

@Override
public @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean perTaskTxn) {
TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore());
return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, limit));
if (!perTaskTxn) {
return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, limit));
} else {
return loadTasksWithIsolatedTxn(callCtx, ms, executorId, limit);
}
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.threeten.extra.MutableClock;

/**
Expand Down Expand Up @@ -297,8 +299,9 @@ protected void testPolicyMappingCleanup() {
polarisTestMetaStoreManager.testPolicyMappingCleanup();
}

@Test
protected void testLoadTasks() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
protected void testLoadTasks(boolean txnPerTask) {
for (int i = 0; i < 20; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
Expand All @@ -307,7 +310,7 @@ protected void testLoadTasks() {
PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;
List<PolarisBaseEntity> taskList =
metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
metaStoreManager.loadTasks(callCtx, executorId, 5, txnPerTask).getEntities();
Assertions.assertThat(taskList)
.isNotNull()
.isNotEmpty()
Expand All @@ -327,7 +330,7 @@ protected void testLoadTasks() {

// grab a second round of tasks. Assert that none of the original 5 are in the list
List<PolarisBaseEntity> newTaskList =
metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
metaStoreManager.loadTasks(callCtx, executorId, 5, txnPerTask).getEntities();
Assertions.assertThat(newTaskList)
.isNotNull()
.isNotEmpty()
Expand All @@ -341,7 +344,7 @@ protected void testLoadTasks() {

// only 10 tasks are unassigned. Requesting 20, we should only receive those 10
List<PolarisBaseEntity> lastTen =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();

Assertions.assertThat(lastTen)
.isNotNull()
Expand All @@ -355,15 +358,15 @@ protected void testLoadTasks() {
.collect(Collectors.toSet());

List<PolarisBaseEntity> emtpyList =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();

Assertions.assertThat(emtpyList).isNotNull().isEmpty();

timeSource.add(Duration.ofMinutes(10));

// all the tasks are unassigned. Fetch them all
List<PolarisBaseEntity> allTasks =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();

Assertions.assertThat(allTasks)
.isNotNull()
Expand All @@ -378,13 +381,14 @@ protected void testLoadTasks() {
timeSource.add(Duration.ofMinutes(10));

List<PolarisBaseEntity> finalList =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();

Assertions.assertThat(finalList).isNotNull().isEmpty();
}

@Test
protected void testLoadTasksInParallel() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
protected void testLoadTasksInParallel(boolean txnPerTask) throws Exception {
for (int i = 0; i < 100; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
Expand All @@ -406,7 +410,10 @@ protected void testLoadTasksInParallel() throws Exception {
do {
retry = false;
try {
taskList = metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
taskList =
metaStoreManager
.loadTasks(callCtx, executorId, 5, txnPerTask)
.getEntities();
taskList.stream().map(PolarisBaseEntity::getName).forEach(taskNames::add);
} catch (RetryOnConcurrencyException e) {
retry = true;
Expand Down Expand Up @@ -442,6 +449,143 @@ protected void testLoadTasksInParallel() throws Exception {
.allSatisfy((k, v) -> Assertions.assertThat(v).isEqualTo(1));
}

@Test
protected void testLoadTasksOnCompensation() {
for (int i = 0; i < 20; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
}
String executorId = "testExecutor_abc";
PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;

// First round of tasks are loaded as before, mock usual scheduling process
List<PolarisBaseEntity> taskList =
metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
Assertions.assertThat(taskList)
.isNotNull()
.isNotEmpty()
.hasSize(5)
.allSatisfy(
entry ->
Assertions.assertThat(entry)
.extracting(
e ->
PolarisObjectMapperUtil.deserializeProperties(
callCtx, e.getProperties()))
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("lastAttemptExecutorId", executorId)
.containsEntry("attemptCount", "1"));
Set<String> firstFiveTasks =
taskList.stream().map(PolarisBaseEntity::getName).collect(Collectors.toSet());

// Remaining tasks are loaded with perTaskTxn, mock periodic compensation
List<PolarisBaseEntity> remainingTaskList =
metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();
Assertions.assertThat(remainingTaskList)
.isNotNull()
.isNotEmpty()
.hasSize(15)
.extracting(PolarisBaseEntity::getName)
.noneMatch(firstFiveTasks::contains);

Set<String> allTaskNames =
Stream.concat(
firstFiveTasks.stream(), remainingTaskList.stream().map(PolarisBaseEntity::getName))
.collect(Collectors.toSet());

// Try to load unfinished tasks again
List<PolarisBaseEntity> emtpyList =
metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();

Assertions.assertThat(emtpyList).isNotNull().isEmpty();

// all the tasks are unassigned. Fetch them all
timeSource.add(Duration.ofMinutes(10));
List<PolarisBaseEntity> allTasks =
metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();

Assertions.assertThat(allTasks)
.isNotNull()
.isNotEmpty()
.hasSize(20)
.extracting(PolarisBaseEntity::getName)
.allMatch(allTaskNames::contains);

// drop all the tasks. Skip the clock forward and fetch. empty list expected
allTasks.forEach(
entity -> metaStoreManager.dropEntityIfExists(callCtx, null, entity, Map.of(), false));
timeSource.add(Duration.ofMinutes(10));

List<PolarisBaseEntity> finalList =
metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();

Assertions.assertThat(finalList).isNotNull().isEmpty();
}

@Test
protected void testLoadTasksOnStartupRecovery() {
for (int i = 0; i < 20; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
}
String firstExecutorId = "testExecutor_1";
PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;

// First round of tasks are loaded as before, mock usual scheduling process
List<PolarisBaseEntity> firstRoundtaskList =
metaStoreManager.loadTasks(callCtx, firstExecutorId, 20).getEntities();
Assertions.assertThat(firstRoundtaskList)
.isNotNull()
.isNotEmpty()
.hasSize(20)
.allSatisfy(
entry ->
Assertions.assertThat(entry)
.extracting(
e ->
PolarisObjectMapperUtil.deserializeProperties(
callCtx, e.getProperties()))
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("lastAttemptExecutorId", firstExecutorId)
.containsEntry("attemptCount", "1"));
Set<String> firstRoundTask =
firstRoundtaskList.stream().map(PolarisBaseEntity::getName).collect(Collectors.toSet());

// Mock service dies, try with another executor and load all tasks
timeSource.add(Duration.ofMinutes(10));
String secondExecutorId = "testExecutor_2";
List<PolarisBaseEntity> secondRoundTaskList =
metaStoreManager.loadTasks(callCtx, secondExecutorId, 20, true).getEntities();
Assertions.assertThat(secondRoundTaskList)
.isNotNull()
.isNotEmpty()
.hasSize(20)
.allSatisfy(
entry ->
Assertions.assertThat(entry)
.extracting(
e ->
PolarisObjectMapperUtil.deserializeProperties(
callCtx, e.getProperties()))
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("lastAttemptExecutorId", secondExecutorId)
.containsEntry("attemptCount", "2"))
.extracting(PolarisBaseEntity::getName)
.allMatch(firstRoundTask::contains);

// drop all the tasks. Skip the clock forward and fetch. empty list expected
secondRoundTaskList.forEach(
entity -> metaStoreManager.dropEntityIfExists(callCtx, null, entity, Map.of(), false));
timeSource.add(Duration.ofMinutes(10));

List<PolarisBaseEntity> finalList =
metaStoreManager.loadTasks(callCtx, secondExecutorId, 20, true).getEntities();

Assertions.assertThat(finalList).isNotNull().isEmpty();
}

/** Test generateNewEntityId() function that generates unique ids by creating Tasks in parallel */
@Test
protected void testCreateTasksInParallel() throws Exception {
Expand Down