feat(s3stream): use put object when data size is less than 32MB (#431)
Signed-off-by: Robin Han <[email protected]>
superhx authored Oct 23, 2023
1 parent 0f0cc4a commit 0a0f1ec
Showing 13 changed files with 654 additions and 330 deletions.
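The writer-selection logic named in the commit title lives in a file whose diff is not rendered below ("Large diffs are not rendered by default"), so here is a minimal, hypothetical sketch of the idea, built only from APIs visible in this commit: buffer incoming writes and finish with a single PutObject for objects under 32MB, handing off to the new MultiPartWriter once the threshold is crossed. ThresholdWriter, PUT_OBJECT_THRESHOLD, and the handoff details are illustrative assumptions, not code from the commit.

import com.automq.stream.s3.operator.MultiPartWriter;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.operator.Writer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the commit's actual implementation.
class ThresholdWriter {
    // 32MB: objects smaller than this are written with a single PutObject call.
    private static final long PUT_OBJECT_THRESHOLD = 32L * 1024 * 1024;

    private final S3Operator operator;
    private final String path;
    private final CompositeByteBuf buffered = Unpooled.compositeBuffer();
    private MultiPartWriter multiPartWriter; // created only when the threshold is crossed

    ThresholdWriter(S3Operator operator, String path) {
        this.operator = operator;
        this.path = path;
    }

    CompletableFuture<Void> write(ByteBuf data) {
        if (multiPartWriter == null) {
            buffered.addComponent(true, data);
            if (buffered.readableBytes() < PUT_OBJECT_THRESHOLD) {
                return CompletableFuture.completedFuture(null);
            }
            // Crossed 32MB: start a multipart upload and flush the
            // buffered bytes as its first part.
            multiPartWriter = new MultiPartWriter(operator, path, Writer.MIN_PART_SIZE, null);
            return multiPartWriter.write(buffered);
        }
        return multiPartWriter.write(data);
    }

    CompletableFuture<Void> close() {
        if (multiPartWriter != null) {
            return multiPartWriter.close();
        }
        // Small object: one PutObject request, no multipart bookkeeping.
        return operator.write(path, buffered);
    }
}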
2 changes: 1 addition & 1 deletion pom.xml
@@ -47,7 +47,7 @@
         <guava.version>32.0.1-jre</guava.version>
         <slf4j.version>2.0.9</slf4j.version>
         <snakeyaml.version>2.2</snakeyaml.version>
-        <s3stream.version>0.1.14-SNAPSHOT</s3stream.version>
+        <s3stream.version>0.1.15-SNAPSHOT</s3stream.version>
 
         <!-- Flat buffers related -->
         <flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.automq.elasticstream</groupId>
     <artifactId>s3stream</artifactId>
-    <version>0.1.14-SNAPSHOT</version>
+    <version>0.1.15-SNAPSHOT</version>
     <properties>
         <mockito-core.version>5.5.0</mockito-core.version>
         <junit-jupiter.version>5.10.0</junit-jupiter.version>
@@ -82,13 +82,12 @@ class DefaultObjectWriter implements ObjectWriter {
      */
     public DefaultObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
         this.objectId = objectId;
-        // TODO: use a better clusterName
         String objectKey = ObjectUtils.genKey(0, objectId);
         this.blockSizeThreshold = blockSizeThreshold;
         this.partSizeThreshold = Math.max(Writer.MIN_PART_SIZE, partSizeThreshold);
         waitingUploadBlocks = new LinkedList<>();
         completedBlocks = new LinkedList<>();
-        writer = s3Operator.writer(objectKey, "[DefaultObjectWriter objId=" + objectId + "]");
+        writer = s3Operator.writer(objectKey);
     }
 
     public void write(long streamId, List<StreamRecordBatch> records) {
@@ -46,7 +46,7 @@ public class StreamObjectCopier {
     public StreamObjectCopier(long objectId, S3Operator s3Operator, AsyncTokenBucketThrottle readThrottle) {
         this.s3Operator = s3Operator;
         // TODO: use a better clusterName
-        this.writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), "[StreamObjectCopier objId=" + objectId + "]", readThrottle);
+        this.writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), readThrottle);
         this.completedObjects = new LinkedList<>();
         this.nextObjectDataStartPosition = 0;
         this.blockCount = 0;
@@ -50,7 +50,7 @@ public DataBlockWriter(long objectId, S3Operator s3Operator, int partSizeThresho
         waitingUploadBlocks = new LinkedList<>();
         waitingUploadBlockCfs = new ConcurrentHashMap<>();
         completedBlocks = new LinkedList<>();
-        writer = s3Operator.writer(objectKey, "[DataBlockWriter objId=" + objectId + "]");
+        writer = s3Operator.writer(objectKey);
     }
 
     public long getObjectId() {
@@ -104,7 +104,7 @@ private void uploadWaitingList() {
                 waitingUploadBlockCfs.remove(block);
             }
         });
-        if (writer.hashBatchingPart()) {
+        if (writer.hasBatchingPart()) {
             // prevent blocking on part that's waiting for batch when force upload waiting list
             for (StreamDataBlock block : blocks) {
                 waitingUploadBlockCfs.remove(block);

Large diffs are not rendered by default.

@@ -20,6 +20,7 @@
 import com.automq.stream.utils.FutureUtil;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 
 import java.util.List;
 import java.util.Map;
@@ -52,7 +53,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data) {
     }
 
     @Override
-    public Writer writer(String path, String logIdent, AsyncTokenBucketThrottle readThrottle) {
+    public Writer writer(String path, AsyncTokenBucketThrottle readThrottle) {
         ByteBuf buf = Unpooled.buffer();
         storage.put(path, buf);
         return new Writer() {
@@ -66,7 +67,7 @@ public CompletableFuture<Void> write(ByteBuf part) {
             }
 
             @Override
-            public boolean hashBatchingPart() {
+            public boolean hasBatchingPart() {
                 return false;
             }
 
@@ -97,4 +98,24 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
         objectKeys.forEach(storage::remove);
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public CompletableFuture<String> createMultipartUpload(String path) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
+    }
+
+    @Override
+    public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId, int partNumber, ByteBuf data) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
+    }
+
+    @Override
+    public CompletableFuture<CompletedPart> uploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
+    }
+
+    @Override
+    public CompletableFuture<Void> completeMultipartUpload(String path, String uploadId, List<CompletedPart> parts) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
+    }
 }
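Because MemoryS3Operator backs the unit tests, the four new multipart methods fail the returned future rather than throwing synchronously, so test code composing on the future observes the error asynchronously. A hypothetical check (the no-arg constructor is assumed for illustration):

MemoryS3Operator operator = new MemoryS3Operator(); // constructor assumed for illustration
operator.createMultipartUpload("foo").whenComplete((uploadId, ex) -> {
    // ex is (or wraps) the UnsupportedOperationException; uploadId is null.
    System.out.println("multipart unsupported: " + ex);
});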
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.operator;

import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.compact.AsyncTokenBucketThrottle;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.stats.S3ObjectMetricsStats;
import com.automq.stream.utils.FutureUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import software.amazon.awssdk.services.s3.model.CompletedPart;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class MultiPartWriter implements Writer {
    private static final long MAX_MERGE_WRITE_SIZE = 16L * 1024 * 1024;
    private final S3Operator operator;
    private final String path;
    private final CompletableFuture<String> uploadIdCf = new CompletableFuture<>();
    private String uploadId;
    private final List<CompletableFuture<CompletedPart>> parts = new LinkedList<>();
    private final AtomicInteger nextPartNumber = new AtomicInteger(1);
    private CompletableFuture<Void> closeCf;
    /**
     * The minPartSize represents the minimum size of a part for a multipart object.
     */
    private final long minPartSize;
    private ObjectPart objectPart = null;
    private final TimerUtil timerUtil = new TimerUtil();
    private final AsyncTokenBucketThrottle readThrottle;
    private final AtomicLong totalWriteSize = new AtomicLong(0L);

    public MultiPartWriter(S3Operator operator, String path, long minPartSize, AsyncTokenBucketThrottle readThrottle) {
        this.operator = operator;
        this.path = path;
        this.minPartSize = minPartSize;
        this.readThrottle = readThrottle;
        init();
    }

    private void init() {
        if (readThrottle != null && readThrottle.getTokenSize() < minPartSize) {
            throw new IllegalArgumentException("Read throttle token size should be larger than minPartSize");
        }
        FutureUtil.propagate(
                operator.createMultipartUpload(path).thenApply(uploadId -> {
                    this.uploadId = uploadId;
                    return uploadId;
                }),
                uploadIdCf
        );
    }

    @Override
    public CompletableFuture<Void> write(ByteBuf data) {
        totalWriteSize.addAndGet(data.readableBytes());

        if (objectPart == null) {
            objectPart = new ObjectPart(readThrottle);
        }
        ObjectPart objectPart = this.objectPart;

        objectPart.write(data);
        if (objectPart.size() > minPartSize) {
            objectPart.upload();
            // finish current part.
            this.objectPart = null;
        }
        return objectPart.getFuture();
    }

    @Override
    public boolean hasBatchingPart() {
        return objectPart != null;
    }

    @Override
    public void copyWrite(String sourcePath, long start, long end) {
        long targetSize = end - start;
        if (objectPart == null) {
            if (targetSize < minPartSize) {
                this.objectPart = new ObjectPart(readThrottle);
                objectPart.readAndWrite(sourcePath, start, end);
            } else {
                new CopyObjectPart(sourcePath, start, end);
            }
        } else {
            if (objectPart.size() + targetSize > MAX_MERGE_WRITE_SIZE) {
                long readAndWriteCopyEnd = start + minPartSize - objectPart.size();
                objectPart.readAndWrite(sourcePath, start, readAndWriteCopyEnd);
                objectPart.upload();
                this.objectPart = null;
                new CopyObjectPart(sourcePath, readAndWriteCopyEnd, end);
            } else {
                objectPart.readAndWrite(sourcePath, start, end);
                if (objectPart.size() > minPartSize) {
                    objectPart.upload();
                    this.objectPart = null;
                }
            }
        }
    }

    @Override
    public CompletableFuture<Void> close() {
        if (closeCf != null) {
            return closeCf;
        }

        if (objectPart != null) {
            // force upload the last part which can be smaller than minPartSize.
            objectPart.upload();
            objectPart = null;
        }

        S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.READY_CLOSE).update(timerUtil.elapsed());
        closeCf = new CompletableFuture<>();
        CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
        FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
        closeCf.whenComplete((nil, ex) -> {
            S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.TOTAL).update(timerUtil.elapsed());
            S3ObjectMetricsStats.S3_OBJECT_COUNT.inc();
            S3ObjectMetricsStats.S3_OBJECT_SIZE.update(totalWriteSize.get());
        });
        return closeCf;
    }

    private List<CompletedPart> genCompleteParts() {
        return this.parts.stream().map(cf -> {
            try {
                return cf.get();
            } catch (Throwable e) {
                // won't happen.
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    class ObjectPart {
        private final int partNumber = nextPartNumber.getAndIncrement();
        private final CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
        private CompletableFuture<Void> lastRangeReadCf = CompletableFuture.completedFuture(null);
        private final CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
        private long size;
        private final AsyncTokenBucketThrottle readThrottle;

        public ObjectPart(AsyncTokenBucketThrottle readThrottle) {
            this.readThrottle = readThrottle;
            parts.add(partCf);
        }

        public void write(ByteBuf data) {
            size += data.readableBytes();
            // ensure addComponent happen before following write or copyWrite.
            this.lastRangeReadCf = lastRangeReadCf.thenAccept(nil -> partBuf.addComponent(true, data));
        }

        public void readAndWrite(String sourcePath, long start, long end) {
            size += end - start;
            // TODO: parallel read and sequence add.
            this.lastRangeReadCf = lastRangeReadCf
                    .thenCompose(nil -> readThrottle == null ?
                            CompletableFuture.completedFuture(null) : readThrottle.throttle(end - start))
                    .thenCompose(nil -> operator.rangeRead(sourcePath, start, end))
                    .thenAccept(buf -> partBuf.addComponent(true, buf));
        }

        public void upload() {
            this.lastRangeReadCf.whenComplete((nil, ex) -> {
                if (ex != null) {
                    partCf.completeExceptionally(ex);
                } else {
                    upload0();
                }
            });
        }

        private void upload0() {
            TimerUtil timerUtil = new TimerUtil();
            FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf)), partCf);
            partCf.whenComplete((nil, ex) -> S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.UPLOAD_PART).update(timerUtil.elapsed()));
        }

        public long size() {
            return size;
        }

        public CompletableFuture<Void> getFuture() {
            return partCf.thenApply(nil -> null);
        }
    }

    class CopyObjectPart {
        private final CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();

        public CopyObjectPart(String sourcePath, long start, long end) {
            int partNumber = nextPartNumber.getAndIncrement();
            parts.add(partCf);
            FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPartCopy(sourcePath, path, start, end, uploadId, partNumber)), partCf);
        }

        public CompletableFuture<Void> getFuture() {
            return partCf.thenApply(nil -> null);
        }
    }
}
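A hypothetical usage sketch of the new writer; operator, the object keys, and sizes below are made up, and in the codebase instances normally come from S3Operator#writer rather than direct construction:

ByteBuf part = Unpooled.wrappedBuffer(new byte[1024]);
MultiPartWriter writer = new MultiPartWriter(operator, "0/12345", Writer.MIN_PART_SIZE, null);
writer.write(part);                                 // batched until minPartSize accumulates
writer.copyWrite("0/67890", 0L, 64L * 1024 * 1024); // server-side UploadPartCopy for large ranges
writer.close().thenRun(() -> System.out.println("multipart upload completed"));

Note that write() returns the enclosing part's future, so a batched write only completes once the whole part it landed in has been uploaded; hasBatchingPart() lets callers such as DataBlockWriter detect that state and avoid blocking on it.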