Commit e2b4a93
Fix math overflow when copying large AWS S3 files
Signed-off-by: Paolo Di Tommaso <[email protected]>
1 parent 66f4669 commit e2b4a93
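
In short: the multipart copy path previously derived the part size via getChunkSize(long), whose int arithmetic truncated object sizes above 2 GiB. That logic now lives in the new S3UploadHelper class, which computes the part size in long arithmetic and validates it against the S3 limits (10,000 parts, 5 MiB to 5 GiB per part, 5 TiB per object).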

7 files changed (+256 / -17 lines)

modules/nextflow/src/main/groovy/nextflow/trace/TraceHelper.groovy

Lines changed: 1 addition & 1 deletion

@@ -63,7 +63,7 @@ class TraceHelper {
            Files.newBufferedWriter(path, Charset.defaultCharset(), openOptions(overwrite))
        }
        catch (FileAlreadyExistsException e) {
-           throw new AbortOperationException("$type file already exists: ${path.toUriString()} -- enable the relevant `overwrite` option in your config file to overwrite existing files", e)
+           throw new AbortOperationException("$type file already exists: ${path.toUriString()} -- enable the '${type.toLowerCase()}.overwrite' option in your config file to overwrite existing files", e)
        }
    }
}
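
The error message now names the exact config option to enable, derived from the writer's type label. A minimal sketch of the derivation (the "Timeline" value is illustrative):

    // Java sketch: how the option name in the new message is derived
    String type = "Timeline";                            // e.g. "Trace", "Report", "Timeline"
    String option = type.toLowerCase() + ".overwrite";   // -> "timeline.overwrite"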

modules/nextflow/src/test/groovy/nextflow/trace/TraceHelperTest.groovy

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ class TraceHelperTest extends Specification {
        TraceHelper.newFileWriter(path, false, 'Test')
        then:
        def e = thrown(AbortOperationException)
-       e.message == "Test file already exists: $path -- enable the relevant `overwrite` option in your config file to overwrite existing files"
+       e.message == "Test file already exists: $path -- enable the 'test.overwrite' option in your config file to overwrite existing files"

        cleanup:
        folder?.deleteDir()

plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java

Lines changed: 16 additions & 2 deletions

@@ -105,6 +105,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import static com.upplication.s3fs.util.S3UploadHelper.*;
+
/**
 * Client Amazon S3
 * @see com.amazonaws.services.s3.AmazonS3Client
@@ -359,10 +361,12 @@ public ObjectListing listNextBatchOfObjects(ObjectListing objectListing) {
        return client.listNextBatchOfObjects(objectListing);
    }

+
    public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, List<Tag> tags, String contentType ) {

        final String sourceBucketName = s3Source.getBucket();
        final String sourceObjectKey = s3Source.getKey();
+       final String sourceS3Path = "s3://"+sourceBucketName+'/'+sourceObjectKey;
        final String targetBucketName = s3Target.getBucket();
        final String targetObjectKey = s3Target.getKey();
        final ObjectMetadata meta = new ObjectMetadata();
@@ -394,15 +398,25 @@ public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, List<Tag> tags, String contentType )
        // Step 3: Save upload Id.
        String uploadId = initResult.getUploadId();

-       final int partSize = opts.getChunkSize(objectSize);
+       // Multipart upload and copy allows max 10_000 parts
+       // each part can be up to 5 GiB
+       // Max file size is 5 TiB
+       // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+       final int defChunkSize = opts.getChunkSize();
+       final long partSize = computePartSize(objectSize, defChunkSize);
        ExecutorService executor = S3OutputStream.getOrCreateExecutor(opts.getMaxThreads());
        List<Callable<CopyPartResult>> copyPartRequests = new ArrayList<>();
+       checkPartSize(partSize);

        // Step 4. create copy part requests
        long bytePosition = 0;
        for (int i = 1; bytePosition < objectSize; i++)
        {
-           long lastPosition = bytePosition + partSize - 1 >= objectSize ? objectSize - 1 : bytePosition + partSize - 1;
+           checkPartIndex(i, sourceS3Path, objectSize, partSize);
+
+           long lastPosition = bytePosition + partSize - 1;
+           if( lastPosition >= objectSize )
+               lastPosition = objectSize - 1;

            CopyPartRequest copyRequest = new CopyPartRequest()
                .withDestinationBucketName(targetBucketName)
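
The rewritten loop replaces the one-line ternary with an explicit clamp and validates the part index before each request. A standalone sketch of the range arithmetic with made-up sizes (in the real method, bytePosition advances by partSize after each part):

    // Illustration of the copy-part range logic (hypothetical 25-byte object, 10-byte parts)
    public class PartRanges {
        public static void main(String[] args) {
            long objectSize = 25;
            long partSize = 10;
            long bytePosition = 0;
            for (int i = 1; bytePosition < objectSize; i++) {
                long lastPosition = bytePosition + partSize - 1;
                if (lastPosition >= objectSize)
                    lastPosition = objectSize - 1;   // clamp the final, shorter part
                System.out.printf("part %d: bytes %d-%d%n", i, bytePosition, lastPosition);
                bytePosition += partSize;
            }
            // part 1: bytes 0-9, part 2: bytes 10-19, part 3: bytes 20-24
        }
    }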

plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3MultipartOptions.java

Lines changed: 2 additions & 12 deletions

@@ -30,7 +30,7 @@ public class S3MultipartOptions {

    private static final Logger log = LoggerFactory.getLogger(S3MultipartOptions.class);

-   public static final int DEFAULT_CHUNK_SIZE = 100 << 20; // 100 MB
+   public static final int DEFAULT_CHUNK_SIZE = 100 << 20; // 100 MiB

    public static final int DEFAULT_BUFFER_SIZE = 10485760;

@@ -71,7 +71,7 @@ public class S3MultipartOptions {
    private long retrySleep;


-   /**
+   /*
     * initialize default values
     */
    {
@@ -100,16 +100,6 @@ public int getChunkSize() {
        return chunkSize;
    }

-   public int getChunkSize( long objectSize ) {
-       final int MAX_PARTS = 10_000;
-       long numOfParts = objectSize / chunkSize;
-       if( numOfParts > MAX_PARTS ) {
-           chunkSize = (int) objectSize / MAX_PARTS;
-       }
-
-       return chunkSize;
-   }
-
    public int getMaxThreads() {
        return maxThreads;
    }
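
The removed getChunkSize(long) overload is where the overflow named in the commit message appears to live: the (int) cast binds tighter than the division, so objectSize is truncated to its low 32 bits before dividing. A standalone demonstration with an illustrative 5 TiB size:

    // Why the old code misbehaved: the cast happens before the division
    public class OverflowDemo {
        public static void main(String[] args) {
            final int MAX_PARTS = 10_000;
            long objectSize = 5L * 1024 * 1024 * 1024 * 1024;   // 5 TiB
            int broken = (int) objectSize / MAX_PARTS;  // parsed as ((int) objectSize) / MAX_PARTS
            long fixed = objectSize / MAX_PARTS;        // divide first, in long arithmetic
            System.out.println(broken);  // 0 -- the low 32 bits of 5 TiB are all zero
            System.out.println(fixed);   // 549755813, i.e. ~524 MiB per part
        }
    }

Note also that the old method mutated the chunkSize field as a side effect, so a single oversized copy permanently changed the options object; the replacement computePartSize is a pure function.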
plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java (new file)

Lines changed: 101 additions & 0 deletions

/*
 * Copyright 2020-2022, Seqera Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package com.upplication.s3fs.util;

/**
 *
 * @author Paolo Di Tommaso <[email protected]>
 */
public class S3UploadHelper {

    private static final long _1_KiB = 1024;
    private static final long _1_MiB = _1_KiB * _1_KiB;
    private static final long _1_GiB = _1_KiB * _1_KiB * _1_KiB;
    private static final long _1_TiB = _1_KiB * _1_KiB * _1_KiB * _1_KiB;

    /**
     * AWS S3 min part size
     * https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
     */
    public static final long MIN_PART_SIZE = 5 * _1_MiB;

    /**
     * AWS S3 max part size
     * https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
     */
    public static final long MAX_PART_SIZE = 5 * _1_GiB;

    /**
     * AWS S3 max object size
     * https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
     */
    public static final long MAX_OBJECT_SIZE = 5 * _1_TiB;

    /**
     * AWS S3 max parts in multi-part upload and copy request
     */
    public static final int MAX_PARTS_COUNT = 10_000;

    public static long computePartSize( long objectSize, long chunkSize ) {
        if( objectSize<0 ) throw new IllegalArgumentException("Argument 'objectSize' cannot be less than zero");
        if( chunkSize<MIN_PART_SIZE ) throw new IllegalArgumentException("Argument 'chunkSize' cannot be less than " + MIN_PART_SIZE);
        // Multipart upload and copy allows max 10_000 parts
        // each part can be up to 5 GiB
        // Max file size is 5 TiB
        // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
        long numOfParts = objectSize / chunkSize;
        if( numOfParts > MAX_PARTS_COUNT ) {
            final long x = ceilDiv(objectSize, MAX_PARTS_COUNT);
            return ceilDiv(x, 10 * _1_MiB) * 10 * _1_MiB;
        }
        return chunkSize;
    }

    private static long ceilDiv(long x, long y){
        return -Math.floorDiv(-x,y);
    }

    private static long ceilDiv(long x, int y){
        return -Math.floorDiv(-x,y);
    }

    public static void checkPartSize(long partSize) {
        if( partSize<MIN_PART_SIZE ) {
            String msg = String.format("The minimum part size for S3 multipart copy and upload operation cannot be less than 5 MiB -- offending value: %d", partSize);
            throw new IllegalArgumentException(msg);
        }

        if( partSize>MAX_PART_SIZE ) {
            String msg = String.format("The maximum part size for S3 multipart copy and upload operation cannot exceed 5 GiB -- offending value: %d", partSize);
            throw new IllegalArgumentException(msg);
        }
    }

    public static void checkPartIndex(int i, String path, long fileSize, long chunkSize) {
        if( i < 1 ) {
            String msg = String.format("S3 multipart copy request index cannot be less than 1 -- offending value: %d; file: '%s'; size: %d; part-size: %d", i, path, fileSize, chunkSize);
            throw new IllegalArgumentException(msg);
        }
        if( i > MAX_PARTS_COUNT ) {
            String msg = String.format("S3 multipart copy request exceeds the number of max allowed parts -- offending value: %d; file: '%s'; size: %d; part-size: %d", i, path, fileSize, chunkSize);
            throw new IllegalArgumentException(msg);
        }
    }

}
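
As a sanity check on the rounding behavior, the sketch below exercises computePartSize with sizes that mirror the test table further down; ceilDiv relies on the identity ceil(x/y) == -floor(-x/y), which holds for positive y:

    // Illustrative check of computePartSize rounding (values mirror the test table)
    import static com.upplication.s3fs.util.S3UploadHelper.computePartSize;

    public class PartSizeDemo {
        static final long _1_MiB = 1024L * 1024;
        static final long _10_MiB = 10 * _1_MiB;
        static final long _1_TiB = 1024L * 1024 * 1024 * 1024;

        public static void main(String[] args) {
            // small objects keep the requested chunk size
            System.out.println(computePartSize(_1_MiB, _10_MiB) / _1_MiB);      // 10
            // 1 TiB in 10 MiB chunks would need ~104,858 parts, so the part size
            // is raised to the next 10 MiB multiple that fits within 10,000 parts
            System.out.println(computePartSize(_1_TiB, _10_MiB) / _1_MiB);      // 110
            // 5 TiB, the S3 object-size ceiling, yields 530 MiB parts (~9,893 parts)
            System.out.println(computePartSize(5 * _1_TiB, _10_MiB) / _1_MiB);  // 530
        }
    }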

plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy

Lines changed: 7 additions & 1 deletion

@@ -17,6 +17,8 @@ import java.nio.file.attribute.BasicFileAttributes
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.Tag
import groovy.util.logging.Slf4j
+import nextflow.Global
+import nextflow.Session
import nextflow.exception.AbortOperationException
import nextflow.file.CopyMoveHelper
import nextflow.file.FileHelper
@@ -51,6 +53,10 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
        s3Client0 = fs.client.getClient()
    }

+   def setup() {
+       Global.session = Mock(Session) { getConfig() >> [:] }
+   }
+
    def 'should create a blob' () {
        given:
        def bucket = createBucket()
@@ -1332,7 +1338,7 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
        TraceHelper.newFileWriter(path, false, 'Test')
        then:
        def e = thrown(AbortOperationException)
-       e.message == "Test file already exists: ${path.toUriString()}"
+       e.message == "Test file already exists: ${path.toUriString()} -- enable the 'test.overwrite' option in your config file to overwrite existing files"

        cleanup:
        deleteBucket(bucket1)
plugins/nf-amazon/src/test/com/upplication/s3fs/util/S3UploadHelperTest.groovy (new file)

Lines changed: 128 additions & 0 deletions

/*
 * Copyright 2020-2022, Seqera Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package com.upplication.s3fs.util

import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
/**
 *
 * @author Paolo Di Tommaso <[email protected]>
 */
class S3UploadHelperTest extends Specification {

    @Shared final long _1_KiB = 1_024
    @Shared final long _1_MiB = _1_KiB ** 2
    @Shared final long _1_GiB = _1_KiB ** 3
    @Shared final long _1_TiB = _1_KiB ** 4

    @Shared final long _10_MiB = _1_MiB * 10
    @Shared final long _100_MiB = _1_MiB * 100

    @Unroll
    def 'should compute s3 file chunk size' () {

        expect:
        S3UploadHelper.computePartSize(FILE_SIZE, CHUNK_SIZE) == EXPECTED_CHUNK_SIZE
        and:
        def parts = FILE_SIZE / EXPECTED_CHUNK_SIZE
        parts <= S3UploadHelper.MAX_PARTS_COUNT
        parts > 0

        where:
        FILE_SIZE       | EXPECTED_CHUNK_SIZE   | CHUNK_SIZE
        _1_KiB          | _10_MiB               | _10_MiB
        _1_MiB          | _10_MiB               | _10_MiB
        _1_GiB          | _10_MiB               | _10_MiB
        _1_TiB          | 110 * _1_MiB          | _10_MiB
        5 * _1_TiB      | 530 * _1_MiB          | _10_MiB
        10 * _1_TiB     | 1050 * _1_MiB         | _10_MiB
        and:
        _1_KiB          | _100_MiB              | _100_MiB
        _1_MiB          | _100_MiB              | _100_MiB
        _1_GiB          | _100_MiB              | _100_MiB
        _1_TiB          | 110 * _1_MiB          | _100_MiB
        5 * _1_TiB      | 530 * _1_MiB          | _100_MiB
        10 * _1_TiB     | 1050 * _1_MiB         | _100_MiB

    }


    def 'should check s3 part size' () {
        when:
        S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE)
        then:
        noExceptionThrown()

        when:
        S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE+1)
        then:
        noExceptionThrown()

        when:
        S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE-1)
        then:
        noExceptionThrown()

        when:
        S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE)
        then:
        noExceptionThrown()

        when:
        S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE+1)
        then:
        thrown(IllegalArgumentException)

        when:
        S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE-1)
        then:
        thrown(IllegalArgumentException)
    }

    def 'should check part index' () {
        when:
        S3UploadHelper.checkPartIndex(1, 's3://foo', 1000, 100)
        then:
        noExceptionThrown()

        when:
        S3UploadHelper.checkPartIndex(S3UploadHelper.MAX_PARTS_COUNT, 's3://foo', 1000, 100)
        then:
        noExceptionThrown()

        when:
        S3UploadHelper.checkPartIndex(S3UploadHelper.MAX_PARTS_COUNT+1, 's3://foo', 1000, 100)
        then:
        def e1 = thrown(IllegalArgumentException)
        e1.message == "S3 multipart copy request exceeds the number of max allowed parts -- offending value: 10001; file: 's3://foo'; size: 1000; part-size: 100"

        when:
        S3UploadHelper.checkPartIndex(0, 's3://foo', 1000, 100)
        then:
        def e2 = thrown(IllegalArgumentException)
        e2.message == "S3 multipart copy request index cannot be less than 1 -- offending value: 0; file: 's3://foo'; size: 1000; part-size: 100"

    }
}
