Skip to content

Commit 3b56e2a

Browse files
alexr17 authored and voonhous committed
[HUDI-9159] S3 implementation of StorageLock for StorageBasedLockProvider (apache#13126)
* Adds the storage based lock provider implementation for s3 hudi tables. --------- Co-authored-by: Y Ethan Guo <[email protected]> Co-authored-by: sivabalan <[email protected]> (cherry picked from commit ed1ef93)
1 parent b49507c commit 3b56e2a

File tree

15 files changed

+1229
-187
lines changed

15 files changed

+1229
-187
lines changed

Diff for: hudi-aws/pom.xml

+24
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,13 @@
174174
<version>${aws.sdk.version}</version>
175175
</dependency>
176176

177+
<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/s3 -->
178+
<dependency>
179+
<groupId>software.amazon.awssdk</groupId>
180+
<artifactId>s3</artifactId>
181+
<version>${aws.sdk.version}</version>
182+
</dependency>
183+
177184
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sqs -->
178185
<dependency>
179186
<groupId>software.amazon.awssdk</groupId>
@@ -205,11 +212,28 @@
205212
<version>${project.version}</version>
206213
<scope>test</scope>
207214
</dependency>
215+
<dependency>
216+
<groupId>org.apache.hudi</groupId>
217+
<artifactId>hudi-client-common</artifactId>
218+
<version>${project.version}</version>
219+
<classifier>tests</classifier>
220+
<scope>test</scope>
221+
</dependency>
208222
<dependency>
209223
<groupId>org.mockito</groupId>
210224
<artifactId>mockito-inline</artifactId>
211225
<scope>test</scope>
212226
</dependency>
227+
<dependency>
228+
<groupId>org.testcontainers</groupId>
229+
<artifactId>testcontainers</artifactId>
230+
<scope>test</scope>
231+
</dependency>
232+
<dependency>
233+
<groupId>org.testcontainers</groupId>
234+
<artifactId>localstack</artifactId>
235+
<scope>test</scope>
236+
</dependency>
213237
</dependencies>
214238

215239
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
16+
* or implied.
17+
* See the License for the specific language governing
18+
* permissions and limitations under the License.
19+
*/
20+
21+
package org.apache.hudi.aws.transaction.lock;
22+
23+
import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
24+
import org.apache.hudi.client.transaction.lock.StorageLockClient;
25+
import org.apache.hudi.client.transaction.lock.models.LockGetResult;
26+
import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
27+
import org.apache.hudi.client.transaction.lock.models.StorageLockData;
28+
import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
29+
import org.apache.hudi.common.util.Functions;
30+
import org.apache.hudi.common.util.Option;
31+
import org.apache.hudi.common.util.StringUtils;
32+
import org.apache.hudi.common.util.VisibleForTesting;
33+
import org.apache.hudi.common.util.collection.Pair;
34+
import org.apache.hudi.exception.HoodieLockException;
35+
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
39+
import software.amazon.awssdk.core.ResponseInputStream;
40+
import software.amazon.awssdk.core.exception.SdkClientException;
41+
import software.amazon.awssdk.core.sync.RequestBody;
42+
import software.amazon.awssdk.regions.Region;
43+
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
44+
import software.amazon.awssdk.services.s3.S3Client;
45+
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
46+
import software.amazon.awssdk.services.s3.model.GetBucketLocationResponse;
47+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
48+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
49+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
50+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
51+
import software.amazon.awssdk.services.s3.model.S3Exception;
52+
53+
import javax.annotation.concurrent.ThreadSafe;
54+
55+
import java.io.IOException;
56+
import java.io.UncheckedIOException;
57+
import java.net.URI;
58+
import java.net.URISyntaxException;
59+
import java.time.Duration;
60+
import java.util.Properties;
61+
62+
import static org.apache.hudi.config.StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS;
63+
64+
/**
65+
* S3-based distributed lock client using ETag checks (AWS SDK v2).
66+
* See RFC: <a href="https://github.com/apache/hudi/blob/master/rfc/rfc-91/rfc-91.md">RFC-91</a>
67+
*/
68+
@ThreadSafe
69+
public class S3StorageLockClient implements StorageLockClient {
70+
71+
private static final Logger LOG = LoggerFactory.getLogger(S3StorageLockClient.class);
72+
private static final int PRECONDITION_FAILURE_ERROR_CODE = 412;
73+
private static final int NOT_FOUND_ERROR_CODE = 404;
74+
private static final int CONDITIONAL_REQUEST_CONFLICT_ERROR_CODE = 409;
75+
private static final int RATE_LIMIT_ERROR_CODE = 429;
76+
private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500;
77+
78+
private final Logger logger;
79+
private final S3Client s3Client;
80+
private final String bucketName;
81+
private final String lockFilePath;
82+
private final String ownerId;
83+
84+
/**
85+
* Constructor that is used by reflection to instantiate an S3-based storage locking client.
86+
*
87+
* @param ownerId The owner id.
88+
* @param lockFileUri The full table base path where the lock will be written.
89+
* @param props The properties for the lock config, can be used to customize client.
90+
*/
91+
public S3StorageLockClient(String ownerId, String lockFileUri, Properties props) {
92+
this(ownerId, lockFileUri, props, createDefaultS3Client(), LOG);
93+
}
94+
95+
@VisibleForTesting
96+
S3StorageLockClient(String ownerId, String lockFileUri, Properties props, Functions.Function2<String, Properties, S3Client> s3ClientSupplier, Logger logger) {
97+
try {
98+
// This logic can likely be extended to other lock client implementations.
99+
// Consider creating base class with utilities, incl error handling.
100+
URI uri = new URI(lockFileUri);
101+
this.bucketName = uri.getHost();
102+
this.lockFilePath = uri.getPath().replaceFirst("/", "");
103+
this.s3Client = s3ClientSupplier.apply(bucketName, props);
104+
105+
if (StringUtils.isNullOrEmpty(this.bucketName)) {
106+
throw new IllegalArgumentException("LockFileUri does not contain a valid bucket name.");
107+
}
108+
if (StringUtils.isNullOrEmpty(this.lockFilePath)) {
109+
throw new IllegalArgumentException("LockFileUri does not contain a valid lock file path.");
110+
}
111+
this.ownerId = ownerId;
112+
this.logger = logger;
113+
} catch (URISyntaxException e) {
114+
throw new HoodieLockException(e);
115+
}
116+
}
117+
118+
@Override
119+
public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() {
120+
try (ResponseInputStream<GetObjectResponse> in = s3Client.getObject(
121+
GetObjectRequest.builder()
122+
.bucket(bucketName)
123+
.key(lockFilePath)
124+
.build())) {
125+
String eTag = in.response().eTag();
126+
return Pair.of(LockGetResult.SUCCESS, Option.of(StorageLockFile.createFromStream(in, eTag)));
127+
} catch (S3Exception e) {
128+
int status = e.statusCode();
129+
// Default to unknown error
130+
LockGetResult result = LockGetResult.UNKNOWN_ERROR;
131+
if (status == NOT_FOUND_ERROR_CODE) {
132+
logger.info("OwnerId: {}, Object not found: {}", ownerId, lockFilePath);
133+
result = LockGetResult.NOT_EXISTS;
134+
} else if (status == CONDITIONAL_REQUEST_CONFLICT_ERROR_CODE) {
135+
logger.info("OwnerId: {}, Conflicting operation has occurred: {}", ownerId, lockFilePath);
136+
} else if (status == RATE_LIMIT_ERROR_CODE) {
137+
logger.warn("OwnerId: {}, Rate limit exceeded: {}", ownerId, lockFilePath);
138+
} else if (status >= INTERNAL_SERVER_ERROR_CODE_MIN) {
139+
logger.warn("OwnerId: {}, S3 internal server error: {}", ownerId, lockFilePath, e);
140+
} else {
141+
throw e;
142+
}
143+
return Pair.of(result, Option.empty());
144+
} catch (IOException e) {
145+
throw new UncheckedIOException("Failed reading lock file from S3: " + lockFilePath, e);
146+
}
147+
}
148+
149+
@Override
150+
public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(StorageLockData newLockData, Option<StorageLockFile> previousLockFile) {
151+
boolean isLockRenewal = previousLockFile.isPresent();
152+
String currentEtag = isLockRenewal ? previousLockFile.get().getVersionId() : null;
153+
LockUpsertResult result = LockUpsertResult.UNKNOWN_ERROR;
154+
try {
155+
StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, currentEtag);
156+
return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated));
157+
} catch (S3Exception e) {
158+
result = handleUpsertS3Exception(e);
159+
} catch (AwsServiceException | SdkClientException e) {
160+
logger.warn("OwnerId: {}, Unexpected SDK error while writing lock file: {}", ownerId, lockFilePath, e);
161+
if (!isLockRenewal) {
162+
// We should always throw errors early when we are creating the lock file.
163+
// This is likely indicative of a larger issue that should bubble up sooner.
164+
throw e;
165+
}
166+
}
167+
168+
return Pair.of(result, Option.empty());
169+
}
170+
171+
/**
172+
* Internal helper to create or update the lock file with optional ETag precondition.
173+
*/
174+
private StorageLockFile createOrUpdateLockFileInternal(StorageLockData lockData, String expectedEtag) {
175+
byte[] bytes = StorageLockFile.toByteArray(lockData);
176+
PutObjectRequest.Builder putRequestBuilder = PutObjectRequest.builder()
177+
.bucket(bucketName)
178+
.key(lockFilePath);
179+
180+
// ETag-based constraints:
181+
// - If expectedEtag is not null:
182+
// We assume that the file already exists on S3 with the ETag "expectedEtag".
183+
// The update operation will include an ifMatch(expectedEtag) condition, meaning the update will only
184+
// succeed if the current file's ETag exactly matches expectedEtag.
185+
// If the actual ETag of the file on S3 differs from expectedEtag, the update attempt will fail.
186+
// - If expectedEtag is null:
187+
// We assume that the file does not currently exist on S3.
188+
// The operation will use ifNoneMatch("*"), which instructs S3 to create the file only if it doesn't already exist.
189+
// If a file with the same name is present (i.e., there is an existing ETag), the creation attempt will fail.
190+
if (expectedEtag == null) {
191+
putRequestBuilder.ifNoneMatch("*");
192+
} else {
193+
putRequestBuilder.ifMatch(expectedEtag);
194+
}
195+
196+
PutObjectResponse response = s3Client.putObject(putRequestBuilder.build(), RequestBody.fromBytes(bytes));
197+
String newEtag = response.eTag();
198+
199+
return new StorageLockFile(lockData, newEtag);
200+
}
201+
202+
private LockUpsertResult handleUpsertS3Exception(S3Exception e) {
203+
int status = e.statusCode();
204+
if (status == PRECONDITION_FAILURE_ERROR_CODE) {
205+
logger.warn("OwnerId: {}, Lockfile modified by another process: {}", ownerId, lockFilePath);
206+
return LockUpsertResult.ACQUIRED_BY_OTHERS;
207+
} else if (status == CONDITIONAL_REQUEST_CONFLICT_ERROR_CODE) {
208+
logger.warn("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFilePath);
209+
} else if (status == RATE_LIMIT_ERROR_CODE) {
210+
logger.warn("OwnerId: {}, Rate limit exceeded for: {}", ownerId, lockFilePath);
211+
} else if (status >= INTERNAL_SERVER_ERROR_CODE_MIN) {
212+
logger.warn("OwnerId: {}, internal server error for: {}", ownerId, lockFilePath, e);
213+
} else {
214+
logger.error("OwnerId: {}, Error writing lock file: {}", ownerId, lockFilePath, e);
215+
}
216+
217+
return LockUpsertResult.UNKNOWN_ERROR;
218+
}
219+
220+
private static Functions.Function2<String, Properties, S3Client> createDefaultS3Client() {
221+
return (bucketName, props) -> {
222+
Region region;
223+
boolean requiredFallbackRegion = false;
224+
try {
225+
region = DefaultAwsRegionProviderChain.builder().build().getRegion();
226+
} catch (SdkClientException e) {
227+
// Fallback to us-east-1 if no region is found
228+
region = Region.US_EAST_1;
229+
requiredFallbackRegion = true;
230+
}
231+
232+
// Set all request timeouts to be 1/5 of the default validity.
233+
// Each call to acquire a lock requires 2 requests.
234+
// Each renewal requires 1 request.
235+
long validityTimeoutSecs = (long) props.getOrDefault(VALIDITY_TIMEOUT_SECONDS.key(), VALIDITY_TIMEOUT_SECONDS.defaultValue());
236+
long s3CallTimeoutSecs = validityTimeoutSecs / 5;
237+
S3Client s3Client = createS3Client(region, s3CallTimeoutSecs, props);
238+
if (requiredFallbackRegion) {
239+
GetBucketLocationResponse bucketLocationResponse = s3Client.getBucketLocation(
240+
GetBucketLocationRequest.builder().bucket(bucketName).build());
241+
// This is null when the region is US_EAST_1, so we do not need to worry about duplicate logic.
242+
String regionString = bucketLocationResponse.locationConstraintAsString();
243+
if (!StringUtils.isNullOrEmpty(regionString)) {
244+
// Close existing client and create another.
245+
s3Client.close();
246+
return createS3Client(
247+
Region.of(regionString),
248+
s3CallTimeoutSecs,
249+
props);
250+
}
251+
}
252+
253+
return s3Client;
254+
};
255+
}
256+
257+
private static S3Client createS3Client(Region region, long timeoutSecs, Properties props) {
258+
// Set the timeout, credentials, and region
259+
return S3Client.builder()
260+
.overrideConfiguration(
261+
b -> b.apiCallTimeout(Duration.ofSeconds(timeoutSecs)))
262+
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
263+
.region(region).build();
264+
}
265+
266+
@Override
267+
public void close() {
268+
s3Client.close();
269+
}
270+
}

0 commit comments

Comments
 (0)