fix(s3stream): fix empty WAL object (#320)
Signed-off-by: Robin Han <[email protected]>
superhx authored Oct 17, 2023
1 parent db6a9ec commit 71d33ec
Showing 4 changed files with 64 additions and 16 deletions.
s3stream/src/main/java/com/automq/stream/s3/S3Storage.java (2 changes: 1 addition & 1 deletion)
@@ -343,7 +343,7 @@ CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
}

private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
- WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(config, logCacheBlock.records(), objectManager, s3Operator, uploadWALExecutor);
+ WALObjectUploadTask walObjectUploadTask = WALObjectUploadTask.of(config, logCacheBlock.records(), objectManager, s3Operator, uploadWALExecutor);
WALObjectUploadTaskContext context = new WALObjectUploadTaskContext();
context.task = walObjectUploadTask;
context.cache = logCacheBlock;
s3stream/src/main/java/com/automq/stream/s3/WALObjectUploadTask.java
@@ -32,6 +32,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -47,7 +48,7 @@ public class WALObjectUploadTask {
private final int streamSplitSizeThreshold;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
- private final boolean forceSplit;
+ final boolean forceSplit;
private final boolean s3ObjectLogEnable;
private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
private volatile CommitWALObjectRequest commitWALObjectRequest;
@@ -68,9 +69,20 @@ public WALObjectUploadTask(Config config, Map<Long, List<StreamRecordBatch>> str
this.executor = executor;
}

- public WALObjectUploadTask(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
- ExecutorService executor) {
- this(config, streamRecordsMap, objectManager, s3Operator, executor, streamRecordsMap.size() == 1);
+ public static WALObjectUploadTask of(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
+ ExecutorService executor) {
+ boolean forceSplit = streamRecordsMap.size() == 1;
+ if (!forceSplit) {
+ Optional<Boolean> hasWALData = streamRecordsMap.values()
+ .stream()
+ .map(records -> records.stream().mapToLong(StreamRecordBatch::size).sum() >= config.s3StreamSplitSize())
+ .filter(split -> !split)
+ .findAny();
+ if (hasWALData.isEmpty()) {
+ forceSplit = true;
+ }
+ }
+ return new WALObjectUploadTask(config, streamRecordsMap, objectManager, s3Operator, executor, forceSplit);
}

public CompletableFuture<Long> prepare() {
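Note (not part of the commit): a minimal, self-contained sketch of the force-split rule that WALObjectUploadTask.of introduces above, using plain longs instead of StreamRecordBatch and a splitSize parameter standing in for config.s3StreamSplitSize(). The idea is that a WAL object is only worth committing when at least one stream stays below the split threshold; if every stream is large enough to be split into its own stream object, the task is forced into split-only mode so an empty WAL object is never committed.

import java.util.List;
import java.util.Map;

public class ForceSplitSketch {
    // Same decision as WALObjectUploadTask.of, over simplified types.
    static boolean forceSplit(Map<Long, List<Long>> streamRecordSizes, long splitSize) {
        if (streamRecordSizes.size() == 1) {
            return true; // a single stream always goes straight to a stream object
        }
        // Keep a WAL object only if some stream's total size stays below the split threshold;
        // otherwise every stream is split out and the WAL object would contain no data.
        boolean someStreamStaysInWAL = streamRecordSizes.values().stream()
                .anyMatch(sizes -> sizes.stream().mapToLong(Long::longValue).sum() < splitSize);
        return !someStreamStaysInWAL;
    }

    public static void main(String[] args) {
        // Both streams exceed a 64-byte threshold: the WAL object would be empty, so force split.
        System.out.println(forceSplit(Map.of(233L, List.of(512L), 234L, List.of(128L)), 64)); // true
        // One stream stays below the threshold: a normal WAL object is still produced.
        System.out.println(forceSplit(Map.of(233L, List.of(512L), 234L, List.of(16L)), 64));  // false
    }
}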
s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -114,6 +114,7 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
DefaultS3Operator(S3AsyncClient s3, String bucket) {
this(s3, bucket, false);
}

// used for test only.
DefaultS3Operator(S3AsyncClient s3, String bucket, boolean manualMergeRead) {
this.s3 = s3;
@@ -316,9 +317,14 @@ private String range(long start, long end) {

private static boolean isUnrecoverable(Throwable ex) {
ex = cause(ex);
- return ex instanceof NoSuchKeyException
- || ex instanceof NoSuchBucketException
- || (ex instanceof S3Exception && ((S3Exception) ex).statusCode() == 403);
+ if (ex instanceof NoSuchKeyException || ex instanceof NoSuchBucketException) {
+ return true;
+ }
+ if (ex instanceof S3Exception) {
+ S3Exception s3Ex = (S3Exception) ex;
+ return s3Ex.statusCode() == 403 || s3Ex.statusCode() == 404;
+ }
+ return false;
}

private void checkAvailable() {
@@ -487,8 +493,13 @@ private void copyWrite0(int partNumber, UploadPartCopyRequest request, Completab
}).exceptionally(ex -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationTime.update(timerUtil.elapsed());
- LOGGER.warn("{} UploadPartCopy for object {}-{} fail, retry later", logIdent, path, partNumber, ex);
- scheduler.schedule(() -> copyWrite0(partNumber, request, partCf), 100, TimeUnit.MILLISECONDS);
+ if (isUnrecoverable(ex)) {
+ LOGGER.warn("{} UploadPartCopy for object {}-{} fail", logIdent, path, partNumber, ex);
+ partCf.completeExceptionally(ex);
+ } else {
+ LOGGER.warn("{} UploadPartCopy for object {}-{} fail, retry later", logIdent, path, partNumber, ex);
+ scheduler.schedule(() -> copyWrite0(partNumber, request, partCf), 100, TimeUnit.MILLISECONDS);
+ }
return null;
});
}
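Note (not part of the commit): copyWrite0 now consults isUnrecoverable before scheduling another attempt, so 403/404-class failures (including the 404 case added above) fail the part future immediately instead of being retried forever. Below is a generic sketch of that retry-or-fail shape with the action, predicate, and scheduler abstracted out; the names are illustrative, not the production API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RetryOrFailSketch {
    // Retry the async action every 100 ms unless the failure is classified as unrecoverable,
    // in which case the result future is completed exceptionally right away.
    static <T> void attempt(Supplier<CompletableFuture<T>> action,
                            Predicate<Throwable> unrecoverable,
                            ScheduledExecutorService scheduler,
                            CompletableFuture<T> result) {
        action.get().whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
            } else if (unrecoverable.test(ex)) {
                result.completeExceptionally(ex); // e.g. NoSuchKey/NoSuchBucket or HTTP 403/404
            } else {
                scheduler.schedule(() -> attempt(action, unrecoverable, scheduler, result),
                        100, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> result = new CompletableFuture<>();
        attempt(() -> CompletableFuture.completedFuture("part copied"), ex -> false, scheduler, result);
        System.out.println(result.get());
        scheduler.shutdown();
    }
}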
@@ -670,7 +681,7 @@ void handleReadCompleted(ByteBuf rst, Throwable ex) {
if (ex != null) {
readTasks.forEach(readTask -> readTask.cf.completeExceptionally(ex));
} else {
- for (ReadTask readTask: readTasks) {
+ for (ReadTask readTask : readTasks) {
readTask.cf.complete(rst.retainedSlice((int) (readTask.start - start), (int) (readTask.end - readTask.start)));
}
rst.release();
s3stream/src/test/java/com/automq/stream/s3/WALObjectUploadTaskTest.java
@@ -17,16 +17,16 @@

package com.automq.stream.s3;

- import com.automq.stream.utils.CloseableIterator;
+ import com.automq.stream.s3.metadata.S3ObjectMetadata;
+ import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.CommitWALObjectRequest;
import com.automq.stream.s3.objects.CommitWALObjectResponse;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.objects.StreamObject;
import com.automq.stream.s3.operator.MemoryS3Operator;
import com.automq.stream.s3.operator.S3Operator;
- import com.automq.stream.s3.metadata.S3ObjectMetadata;
- import com.automq.stream.s3.metadata.S3ObjectType;
+ import com.automq.stream.utils.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -36,11 +36,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
+ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import static com.automq.stream.s3.TestUtils.random;
import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -83,7 +86,7 @@ public void testUpload() throws Exception {
.s3ObjectBlockSize(16 * 1024 * 1024)
.s3ObjectPartSize(16 * 1024 * 1024)
.s3StreamSplitSize(1000);
- walObjectUploadTask = new WALObjectUploadTask(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());
+ walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());

walObjectUploadTask.prepare().get();
walObjectUploadTask.upload().get();
@@ -160,7 +163,7 @@ public void testUpload_oneStream() throws Exception {
.s3ObjectBlockSize(16 * 1024 * 1024)
.s3ObjectPartSize(16 * 1024 * 1024)
.s3StreamSplitSize(16 * 1024 * 1024);
- walObjectUploadTask = new WALObjectUploadTask(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());
+ walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());

walObjectUploadTask.prepare().get();
walObjectUploadTask.upload().get();
@@ -177,4 +180,26 @@ public void testUpload_oneStream() throws Exception {
assertEquals(0, request.getStreamRanges().size());
assertEquals(1, request.getStreamObjects().size());
}

+ @Test
+ public void test_emptyWALData() throws ExecutionException, InterruptedException, TimeoutException {
+ AtomicLong objectIdAlloc = new AtomicLong(10);
+ doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong());
+ when(objectManager.commitWALObject(any())).thenReturn(CompletableFuture.completedFuture(new CommitWALObjectResponse()));
+
+ Map<Long, List<StreamRecordBatch>> map = new HashMap<>();
+ map.put(233L, List.of(
+ new StreamRecordBatch(233, 0, 10, 2, random(512))
+ ));
+ map.put(234L, List.of(
+ new StreamRecordBatch(234, 0, 20, 2, random(128))
+ ));
+
+ Config config = new Config()
+ .s3ObjectBlockSize(16 * 1024 * 1024)
+ .s3ObjectPartSize(16 * 1024 * 1024)
+ .s3StreamSplitSize(64);
+ walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());
+ assertTrue(walObjectUploadTask.forceSplit);
+ }
}
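Note (not part of the commit): in test_emptyWALData the two streams carry 512-byte and 128-byte payloads while s3StreamSplitSize is 64, so assuming StreamRecordBatch.size() is at least the payload size, every stream exceeds the split threshold and the factory is expected to set forceSplit, which is what the assertion checks. A tiny standalone reading of the test data:

public class EmptyWALDataSketch {
    public static void main(String[] args) {
        long splitSize = 64;   // config.s3StreamSplitSize() in the test
        long stream233 = 512;  // payload bytes of the single batch on stream 233
        long stream234 = 128;  // payload bytes of the single batch on stream 234
        boolean everyStreamSplits = stream233 >= splitSize && stream234 >= splitSize;
        System.out.println(everyStreamSplits); // true, so WALObjectUploadTask.of sets forceSplit
    }
}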
