diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index dbcaf9f6d..a884fbd52 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -343,7 +343,7 @@ CompletableFuture uploadWALObject(LogCache.LogCacheBlock logCacheBlock) { } private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture cf) { - WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(config, logCacheBlock.records(), objectManager, s3Operator, uploadWALExecutor); + WALObjectUploadTask walObjectUploadTask = WALObjectUploadTask.of(config, logCacheBlock.records(), objectManager, s3Operator, uploadWALExecutor); WALObjectUploadTaskContext context = new WALObjectUploadTaskContext(); context.task = walObjectUploadTask; context.cache = logCacheBlock; diff --git a/s3stream/src/main/java/com/automq/stream/s3/WALObjectUploadTask.java b/s3stream/src/main/java/com/automq/stream/s3/WALObjectUploadTask.java index b214d3faa..26716d159 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/WALObjectUploadTask.java +++ b/s3stream/src/main/java/com/automq/stream/s3/WALObjectUploadTask.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -47,7 +48,7 @@ public class WALObjectUploadTask { private final int streamSplitSizeThreshold; private final ObjectManager objectManager; private final S3Operator s3Operator; - private final boolean forceSplit; + final boolean forceSplit; private final boolean s3ObjectLogEnable; private final CompletableFuture prepareCf = new CompletableFuture<>(); private volatile CommitWALObjectRequest commitWALObjectRequest; @@ -68,9 +69,20 @@ public WALObjectUploadTask(Config config, Map> str this.executor = executor; } - public WALObjectUploadTask(Config config, Map> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator, - ExecutorService executor) { - this(config, streamRecordsMap, objectManager, s3Operator, executor, streamRecordsMap.size() == 1); + public static WALObjectUploadTask of(Config config, Map> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator, + ExecutorService executor) { + boolean forceSplit = streamRecordsMap.size() == 1; + if (!forceSplit) { + Optional hasWALData = streamRecordsMap.values() + .stream() + .map(records -> records.stream().mapToLong(StreamRecordBatch::size).sum() >= config.s3StreamSplitSize()) + .filter(split -> !split) + .findAny(); + if (hasWALData.isEmpty()) { + forceSplit = true; + } + } + return new WALObjectUploadTask(config, streamRecordsMap, objectManager, s3Operator, executor, forceSplit); } public CompletableFuture prepare() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 8824f6208..4848eaf75 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -114,6 +114,7 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean DefaultS3Operator(S3AsyncClient s3, String bucket) { this(s3, bucket, false); } + // used for test only. DefaultS3Operator(S3AsyncClient s3, String bucket, boolean manualMergeRead) { this.s3 = s3; @@ -316,9 +317,14 @@ private String range(long start, long end) { private static boolean isUnrecoverable(Throwable ex) { ex = cause(ex); - return ex instanceof NoSuchKeyException - || ex instanceof NoSuchBucketException - || (ex instanceof S3Exception && ((S3Exception) ex).statusCode() == 403); + if (ex instanceof NoSuchKeyException || ex instanceof NoSuchBucketException) { + return true; + } + if (ex instanceof S3Exception) { + S3Exception s3Ex = (S3Exception) ex; + return s3Ex.statusCode() == 403 || s3Ex.statusCode() == 404; + } + return false; } private void checkAvailable() { @@ -487,8 +493,13 @@ private void copyWrite0(int partNumber, UploadPartCopyRequest request, Completab }).exceptionally(ex -> { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationTime.update(timerUtil.elapsed()); - LOGGER.warn("{} UploadPartCopy for object {}-{} fail, retry later", logIdent, path, partNumber, ex); - scheduler.schedule(() -> copyWrite0(partNumber, request, partCf), 100, TimeUnit.MILLISECONDS); + if (isUnrecoverable(ex)) { + LOGGER.warn("{} UploadPartCopy for object {}-{} fail", logIdent, path, partNumber, ex); + partCf.completeExceptionally(ex); + } else { + LOGGER.warn("{} UploadPartCopy for object {}-{} fail, retry later", logIdent, path, partNumber, ex); + scheduler.schedule(() -> copyWrite0(partNumber, request, partCf), 100, TimeUnit.MILLISECONDS); + } return null; }); } @@ -670,7 +681,7 @@ void handleReadCompleted(ByteBuf rst, Throwable ex) { if (ex != null) { readTasks.forEach(readTask -> readTask.cf.completeExceptionally(ex)); } else { - for (ReadTask readTask: readTasks) { + for (ReadTask readTask : readTasks) { readTask.cf.complete(rst.retainedSlice((int) (readTask.start - start), (int) (readTask.end - readTask.start))); } rst.release(); diff --git a/s3stream/src/test/java/com/automq/stream/s3/WALObjectUploadTaskTest.java b/s3stream/src/test/java/com/automq/stream/s3/WALObjectUploadTaskTest.java index fc41ea3a2..e4f053ffa 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/WALObjectUploadTaskTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/WALObjectUploadTaskTest.java @@ -17,7 +17,8 @@ package com.automq.stream.s3; -import com.automq.stream.utils.CloseableIterator; +import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.model.StreamRecordBatch; import com.automq.stream.s3.objects.CommitWALObjectRequest; import com.automq.stream.s3.objects.CommitWALObjectResponse; @@ -25,8 +26,7 @@ import com.automq.stream.s3.objects.StreamObject; import com.automq.stream.s3.operator.MemoryS3Operator; import com.automq.stream.s3.operator.S3Operator; -import com.automq.stream.s3.metadata.S3ObjectMetadata; -import com.automq.stream.s3.metadata.S3ObjectType; +import com.automq.stream.utils.CloseableIterator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -36,11 +36,14 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import static com.automq.stream.s3.TestUtils.random; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -83,7 +86,7 @@ public void testUpload() throws Exception { .s3ObjectBlockSize(16 * 1024 * 1024) .s3ObjectPartSize(16 * 1024 * 1024) .s3StreamSplitSize(1000); - walObjectUploadTask = new WALObjectUploadTask(config, map, objectManager, s3Operator, ForkJoinPool.commonPool()); + walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool()); walObjectUploadTask.prepare().get(); walObjectUploadTask.upload().get(); @@ -160,7 +163,7 @@ public void testUpload_oneStream() throws Exception { .s3ObjectBlockSize(16 * 1024 * 1024) .s3ObjectPartSize(16 * 1024 * 1024) .s3StreamSplitSize(16 * 1024 * 1024); - walObjectUploadTask = new WALObjectUploadTask(config, map, objectManager, s3Operator, ForkJoinPool.commonPool()); + walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool()); walObjectUploadTask.prepare().get(); walObjectUploadTask.upload().get(); @@ -177,4 +180,26 @@ public void testUpload_oneStream() throws Exception { assertEquals(0, request.getStreamRanges().size()); assertEquals(1, request.getStreamObjects().size()); } + + @Test + public void test_emptyWALData() throws ExecutionException, InterruptedException, TimeoutException { + AtomicLong objectIdAlloc = new AtomicLong(10); + doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong()); + when(objectManager.commitWALObject(any())).thenReturn(CompletableFuture.completedFuture(new CommitWALObjectResponse())); + + Map> map = new HashMap<>(); + map.put(233L, List.of( + new StreamRecordBatch(233, 0, 10, 2, random(512)) + )); + map.put(234L, List.of( + new StreamRecordBatch(234, 0, 20, 2, random(128)) + )); + + Config config = new Config() + .s3ObjectBlockSize(16 * 1024 * 1024) + .s3ObjectPartSize(16 * 1024 * 1024) + .s3StreamSplitSize(64); + walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool()); + assertTrue(walObjectUploadTask.forceSplit); + } }