Skip to content

Add delete payload batch feature. #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>software.amazon.payloadoffloading</groupId>
<artifactId>payloadoffloading-common</artifactId>
<version>2.2.0</version>
<version>2.2.1</version>
<packaging>jar</packaging>
<name>Payload offloading common library for AWS</name>
<description>Common library between extended Amazon AWS clients to save payloads up to 2GB on Amazon S3.</description>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/software/amazon/payloadoffloading/PayloadStore.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package software.amazon.payloadoffloading;

import java.util.Collection;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.S3Exception;

Expand Down Expand Up @@ -60,4 +61,20 @@ public interface PayloadStore {
* a server side issue.
*/
void deleteOriginalPayload(String payloadPointer);

/**
* Deletes original payloads using the given payloadPointers. The pointers must
* have been obtained using {@link storeOriginalPayload}
* <p>
* This call will be more efficient than deleting payloads one at a time if the payloads
* are in the same S3 bucket.
*
* @param payloadPointers
* @throws SdkClientException If any internal errors are encountered on the client side while
* attempting to make the request or handle the response to/from PayloadStore.
* For example, if payloadPointer is invalid or a network connection is not available.
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
* a server side issue.
*/
void deleteOriginalPayloads(Collection<String> payloadPointers);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package software.amazon.payloadoffloading;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.S3Exception;
Expand Down Expand Up @@ -75,4 +76,24 @@ public interface PayloadStoreAsync {
* a server side issue.
*/
CompletableFuture<Void> deleteOriginalPayload(String payloadPointer);

/**
* Deletes the original payload using the given payloadPointer. The pointer must
* have been obtained using {@link #storeOriginalPayload(String)}
* <p>
* This call will be more efficient than deleting payloads one at a time if the payloads
* are in the same S3 bucket.
* <p>
* This call is asynchronous, and so documented return values and exceptions are propagated through
* the returned {@link CompletableFuture}.
*
* @param payloadPointers
* @return future value that completes when the delete operation finishes
* @throws SdkClientException If any internal errors are encountered on the client side while
* attempting to make the request or handle the response to/from PayloadStore.
* For example, if payloadPointer is invalid or a network connection is not available.
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
* a server side issue.
*/
CompletableFuture<Void> deleteOriginalPayloads(Collection<String> payloadPointers);
}
34 changes: 34 additions & 0 deletions src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package software.amazon.payloadoffloading;

import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseBytes;
Expand All @@ -11,9 +13,12 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

/**
Expand Down Expand Up @@ -115,4 +120,33 @@ public CompletableFuture<Void> deletePayloadFromS3(String s3BucketName, String s
return null;
});
}

public CompletableFuture<Void> deletePayloadsFromS3(String s3BucketName, Collection<String> s3Keys) {
DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
.bucket(s3BucketName)
.delete(Delete.builder()
.objects(s3Keys.stream()
.map(s3Key -> ObjectIdentifier.builder()
.key(s3Key)
.build())
.collect(Collectors.toList()))
.build())
.build();

return s3Client.deleteObjects(deleteObjectsRequest)
.handle((v, tIn) -> {
if (tIn != null) {
Throwable t = Util.unwrapFutureException(tIn);
if (t instanceof SdkException) {
String errorMessage = "Failed to delete the S3 object which contains the payload";
LOG.error(errorMessage, t);
throw SdkException.create(errorMessage, t);
}
throw new CompletionException(t);
}

LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + ".");
return null;
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package software.amazon.payloadoffloading;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,4 +60,20 @@ public void deleteOriginalPayload(String payloadPointer) {
String s3Key = s3Pointer.getS3Key();
s3Dao.deletePayloadFromS3(s3BucketName, s3Key);
}

@Override
public void deleteOriginalPayloads(Collection<String> payloadPointers) {
// Sort by S3 bucket.
Map<String, List<PayloadS3Pointer>> offloadedMessages = payloadPointers.stream()
.map(PayloadS3Pointer::fromJson)
.collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName));

for (Map.Entry<String, List<PayloadS3Pointer>> bucket : offloadedMessages.entrySet()) {
String s3BucketName = bucket.getKey();
List<String> s3Keys = bucket.getValue().stream()
.map(PayloadS3Pointer::getS3Key)
.collect(Collectors.toList());
s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package software.amazon.payloadoffloading;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.payloadoffloading.PayloadS3Pointer;

/**
* S3 based implementation for PayloadStoreAsync.
Expand Down Expand Up @@ -74,4 +78,34 @@ public CompletableFuture<Void> deleteOriginalPayload(String payloadPointer) {
return futureEx;
}
}

@Override
public CompletableFuture<Void> deleteOriginalPayloads(Collection<String> payloadPointers) {
// Skip the delete if there are no payloads to delete.
if (payloadPointers.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

try {
// Sort by S3 bucket.
Map<String, List<PayloadS3Pointer>> offloadedMessages = payloadPointers.stream()
.map(PayloadS3Pointer::fromJson)
.collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName));

List<CompletableFuture<Void>> deleteFutures = new ArrayList<>(offloadedMessages.size());
for (Map.Entry<String, List<PayloadS3Pointer>> bucket : offloadedMessages.entrySet()) {
String s3BucketName = bucket.getKey();
List<String> s3Keys = bucket.getValue().stream()
.map(PayloadS3Pointer::getS3Key)
.collect(Collectors.toList());
deleteFutures.add(s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys));
}

return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
} catch (Exception e) {
CompletableFuture<Void> futureEx = new CompletableFuture<>();
futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
return futureEx;
}
}
}
27 changes: 27 additions & 0 deletions src/main/java/software/amazon/payloadoffloading/S3Dao.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package software.amazon.payloadoffloading;

import java.util.Collection;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.IoUtils;

Expand Down Expand Up @@ -104,4 +109,26 @@ public void deletePayloadFromS3(String s3BucketName, String s3Key) {

LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
}

public void deletePayloadsFromS3(String s3BucketName, Collection<String> s3Keys) {
try {
DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
.bucket(s3BucketName)
.delete(Delete.builder()
.objects(s3Keys.stream()
.map(s3Key -> ObjectIdentifier.builder()
.key(s3Key)
.build())
.collect(Collectors.toList()))
.build())
.build();
s3Client.deleteObjects(deleteObjectsRequest);
} catch (SdkException e) {
String errorMessage = "Failed to delete the S3 object which contains the payload";
LOG.error(errorMessage, e);
throw SdkException.create(errorMessage, e);
}

LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + ".");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -22,8 +27,10 @@

public class S3BackedPayloadStoreAsyncTest {
private static final String S3_BUCKET_NAME = "test-bucket-name";
private static final String OTHER_S3_BUCKET_NAME = "other-bucket-name";
private static final String ANY_PAYLOAD = "AnyPayload";
private static final String ANY_S3_KEY = "AnyS3key";
private static final String ANY_OTHER_S3_KEY = "AnyOtherS3key";
private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string";
private PayloadStoreAsync payloadStore;
private S3AsyncDao s3AsyncDao;
Expand Down Expand Up @@ -175,4 +182,70 @@ public void testDeleteOriginalPayloadIncorrectPointer() {
verifyNoInteractions(s3AsyncDao);
}

@Test
public void testDeleteOriginalPayloadsOnSuccess() {
when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));

List<String> payloadPointers = new ArrayList<>();
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
payloadStore.deleteOriginalPayloads(payloadPointers).join();

ArgumentCaptor<String> bucketNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Collection> keyCaptor = ArgumentCaptor.forClass(Collection.class);
verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());

assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getValue());
assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
}

@Test
public void testDeleteOriginalPayloadsSameBucket() {
when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));

List<String> payloadPointers = new ArrayList<>();
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
payloadStore.deleteOriginalPayloads(payloadPointers).join();

ArgumentCaptor<String> bucketNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Collection> keyCaptor = ArgumentCaptor.forClass(Collection.class);
verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());

assertEquals(Arrays.asList(ANY_S3_KEY, ANY_OTHER_S3_KEY), keyCaptor.getValue());
assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
}

@Test
public void testDeleteOriginalPayloadsDifferentBuckets() {
when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));

List<String> payloadPointers = new ArrayList<>();
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
payloadPointers.add(new PayloadS3Pointer(OTHER_S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
payloadStore.deleteOriginalPayloads(payloadPointers).join();

ArgumentCaptor<String> bucketNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Collection> keyCaptor = ArgumentCaptor.forClass(Collection.class);
verify(s3AsyncDao, times(2)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());

assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getAllValues().get(0));
assertEquals(Collections.singletonList(ANY_OTHER_S3_KEY), keyCaptor.getAllValues().get(1));
assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(0));
assertEquals(OTHER_S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(1));
}

@Test
public void testDeleteOriginalPayloadsIncorrectPointer() {
List<String> payloadPointers = new ArrayList<>();
payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
payloadPointers.add("IncorrectPointer");

CompletionException exception = assertThrows(CompletionException.class, () -> {
payloadStore.deleteOriginalPayloads(payloadPointers).join();
});

assertTrue(exception.getMessage().contains(INCORRECT_POINTER_EXCEPTION_MSG));
verifyNoInteractions(s3AsyncDao);
}

}
Loading