Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s3stream): fix empty WAL object #320

Merged
merged 4 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
}

private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(config, logCacheBlock.records(), objectManager, s3Operator, uploadWALExecutor);
WALObjectUploadTask walObjectUploadTask = WALObjectUploadTask.of(config, logCacheBlock.records(), objectManager, s3Operator, uploadWALExecutor);
WALObjectUploadTaskContext context = new WALObjectUploadTaskContext();
context.task = walObjectUploadTask;
context.cache = logCacheBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -47,7 +48,7 @@ public class WALObjectUploadTask {
private final int streamSplitSizeThreshold;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final boolean forceSplit;
final boolean forceSplit;
private final boolean s3ObjectLogEnable;
private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
private volatile CommitWALObjectRequest commitWALObjectRequest;
Expand All @@ -68,9 +69,20 @@ public WALObjectUploadTask(Config config, Map<Long, List<StreamRecordBatch>> str
this.executor = executor;
}

public WALObjectUploadTask(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
ExecutorService executor) {
this(config, streamRecordsMap, objectManager, s3Operator, executor, streamRecordsMap.size() == 1);
public static WALObjectUploadTask of(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
ExecutorService executor) {
boolean forceSplit = streamRecordsMap.size() == 1;
if (!forceSplit) {
Optional<Boolean> hasWALData = streamRecordsMap.values()
.stream()
.map(records -> records.stream().mapToLong(StreamRecordBatch::size).sum() >= config.s3StreamSplitSize())
.filter(split -> !split)
.findAny();
if (hasWALData.isEmpty()) {
forceSplit = true;
}
}
return new WALObjectUploadTask(config, streamRecordsMap, objectManager, s3Operator, executor, forceSplit);
}

public CompletableFuture<Long> prepare() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
DefaultS3Operator(S3AsyncClient s3, String bucket) {
this(s3, bucket, false);
}

// used for test only.
DefaultS3Operator(S3AsyncClient s3, String bucket, boolean manualMergeRead) {
this.s3 = s3;
Expand Down Expand Up @@ -316,9 +317,14 @@ private String range(long start, long end) {

private static boolean isUnrecoverable(Throwable ex) {
ex = cause(ex);
return ex instanceof NoSuchKeyException
|| ex instanceof NoSuchBucketException
|| (ex instanceof S3Exception && ((S3Exception) ex).statusCode() == 403);
if (ex instanceof NoSuchKeyException || ex instanceof NoSuchBucketException) {
return true;
}
if (ex instanceof S3Exception) {
S3Exception s3Ex = (S3Exception) ex;
return s3Ex.statusCode() == 403 || s3Ex.statusCode() == 404;
}
return false;
}

private void checkAvailable() {
Expand Down Expand Up @@ -487,8 +493,13 @@ private void copyWrite0(int partNumber, UploadPartCopyRequest request, Completab
}).exceptionally(ex -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationTime.update(timerUtil.elapsed());
LOGGER.warn("{} UploadPartCopy for object {}-{} fail, retry later", logIdent, path, partNumber, ex);
scheduler.schedule(() -> copyWrite0(partNumber, request, partCf), 100, TimeUnit.MILLISECONDS);
if (isUnrecoverable(ex)) {
LOGGER.warn("{} UploadPartCopy for object {}-{} fail", logIdent, path, partNumber, ex);
partCf.completeExceptionally(ex);
} else {
LOGGER.warn("{} UploadPartCopy for object {}-{} fail, retry later", logIdent, path, partNumber, ex);
scheduler.schedule(() -> copyWrite0(partNumber, request, partCf), 100, TimeUnit.MILLISECONDS);
}
return null;
});
}
Expand Down Expand Up @@ -670,7 +681,7 @@ void handleReadCompleted(ByteBuf rst, Throwable ex) {
if (ex != null) {
readTasks.forEach(readTask -> readTask.cf.completeExceptionally(ex));
} else {
for (ReadTask readTask: readTasks) {
for (ReadTask readTask : readTasks) {
readTask.cf.complete(rst.retainedSlice((int) (readTask.start - start), (int) (readTask.end - readTask.start)));
}
rst.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

package com.automq.stream.s3;

import com.automq.stream.utils.CloseableIterator;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.CommitWALObjectRequest;
import com.automq.stream.s3.objects.CommitWALObjectResponse;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.objects.StreamObject;
import com.automq.stream.s3.operator.MemoryS3Operator;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.utils.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand All @@ -36,11 +36,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import static com.automq.stream.s3.TestUtils.random;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -83,7 +86,7 @@ public void testUpload() throws Exception {
.s3ObjectBlockSize(16 * 1024 * 1024)
.s3ObjectPartSize(16 * 1024 * 1024)
.s3StreamSplitSize(1000);
walObjectUploadTask = new WALObjectUploadTask(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());
walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());

walObjectUploadTask.prepare().get();
walObjectUploadTask.upload().get();
Expand Down Expand Up @@ -160,7 +163,7 @@ public void testUpload_oneStream() throws Exception {
.s3ObjectBlockSize(16 * 1024 * 1024)
.s3ObjectPartSize(16 * 1024 * 1024)
.s3StreamSplitSize(16 * 1024 * 1024);
walObjectUploadTask = new WALObjectUploadTask(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());
walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());

walObjectUploadTask.prepare().get();
walObjectUploadTask.upload().get();
Expand All @@ -177,4 +180,26 @@ public void testUpload_oneStream() throws Exception {
assertEquals(0, request.getStreamRanges().size());
assertEquals(1, request.getStreamObjects().size());
}

@Test
public void test_emptyWALData() throws ExecutionException, InterruptedException, TimeoutException {
AtomicLong objectIdAlloc = new AtomicLong(10);
doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong());
when(objectManager.commitWALObject(any())).thenReturn(CompletableFuture.completedFuture(new CommitWALObjectResponse()));

Map<Long, List<StreamRecordBatch>> map = new HashMap<>();
map.put(233L, List.of(
new StreamRecordBatch(233, 0, 10, 2, random(512))
));
map.put(234L, List.of(
new StreamRecordBatch(234, 0, 20, 2, random(128))
));

Config config = new Config()
.s3ObjectBlockSize(16 * 1024 * 1024)
.s3ObjectPartSize(16 * 1024 * 1024)
.s3StreamSplitSize(64);
walObjectUploadTask = WALObjectUploadTask.of(config, map, objectManager, s3Operator, ForkJoinPool.commonPool());
assertTrue(walObjectUploadTask.forceSplit);
}
}