feat(s3stream): block cache merge the same block inflight read
Signed-off-by: Robin Han <[email protected]>
superhx committed Oct 18, 2023
1 parent 8ed9239 commit 5146627
Showing 9 changed files with 313 additions and 32 deletions.
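In short: when multiple readers request the same S3 data block while a fetch for that block is already in flight, the block cache now attaches them to the one in-flight read instead of issuing duplicate S3 requests. A minimal sketch of the pattern (illustrative names only, not the patch's code; the real implementation is the DataBlockReadAccumulator added below):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: callers for the same key share one in-flight future.
class InflightReadMerger {
    private final Map<String, CompletableFuture<byte[]>> inflight = new ConcurrentHashMap<>();

    CompletableFuture<byte[]> read(String blockKey) {
        CompletableFuture<byte[]> cf;
        boolean isFirst = false;
        synchronized (inflight) {
            cf = inflight.get(blockKey);
            if (cf == null) {
                cf = new CompletableFuture<>();
                inflight.put(blockKey, cf);
                isFirst = true;
            }
        }
        if (isFirst) {
            CompletableFuture<byte[]> finalCf = cf;
            fetch(blockKey).whenComplete((rst, ex) -> {
                // Deregister before completing so a later request starts a fresh read.
                synchronized (inflight) {
                    inflight.remove(blockKey, finalCf);
                }
                if (ex != null) {
                    finalCf.completeExceptionally(ex);
                } else {
                    finalCf.complete(rst);
                }
            });
        }
        return cf;
    }

    // Stand-in for the real S3 data block read (ObjectReader#read in the patch).
    private CompletableFuture<byte[]> fetch(String blockKey) {
        return CompletableFuture.supplyAsync(() -> new byte[0]);
    }
}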
2 changes: 1 addition & 1 deletion pom.xml
@@ -47,7 +47,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.7-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.8-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.7-SNAPSHOT</version>
<version>0.1.8-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
8 changes: 8 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -56,6 +56,10 @@ public ObjectReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
        asyncGetBasicObjectInfo();
    }

    public String objectKey() {
        return objectKey;
    }

    public CompletableFuture<BasicObjectInfo> basicObjectInfo() {
        return basicObjectInfoCf;
    }
@@ -342,6 +346,10 @@ public void close() {
            };
        }

        public int recordCount() {
            return recordCount;
        }

        @Override
        public void close() {
            buf.release();
9 changes: 5 additions & 4 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -17,13 +17,13 @@

package com.automq.stream.s3;

import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.cache.LogCache;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -90,7 +90,8 @@ public S3Storage(Config config, WriteAheadLog log, StreamManager streamManager,
        this.maxWALCacheSize = config.s3WALCacheSize();
        this.log = log;
        this.blockCache = blockCache;
        this.logCache = new LogCache(config.s3WALObjectSize(), block -> blockCache.put(block.records()));
        this.logCache = new LogCache(config.s3WALObjectSize(), block -> {
        });
        this.streamManager = streamManager;
        this.objectManager = objectManager;
        this.s3Operator = s3Operator;
71 changes: 71 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.cache;

import com.automq.stream.s3.ObjectReader;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * Accumulate inflight data block read requests to one real read request.
 */
public class DataBlockReadAccumulator {
    private final Map<Pair<String, Integer>, DataBlockRecords> inflightDataBlockReads = new ConcurrentHashMap<>();
    private final Consumer<DataBlockRecords> dataBlockConsumer;

    public DataBlockReadAccumulator(Consumer<DataBlockRecords> dataBlockConsumer) {
        this.dataBlockConsumer = dataBlockConsumer;
    }

    public CompletableFuture<DataBlockRecords> readDataBlock(ObjectReader reader, ObjectReader.DataBlockIndex blockIndex) {
        CompletableFuture<DataBlockRecords> cf = new CompletableFuture<>();
        BiConsumer<DataBlockRecords, Throwable> listener = (rst, ex) -> {
            if (ex != null) {
                cf.completeExceptionally(ex);
            } else {
                cf.complete(rst);
            }
        };
        Pair<String, Integer> key = Pair.of(reader.objectKey(), blockIndex.blockId());
        synchronized (inflightDataBlockReads) {
            DataBlockRecords records = inflightDataBlockReads.get(key);
            if (records == null) {
                records = new DataBlockRecords();
                records.registerListener(listener);
                inflightDataBlockReads.put(key, records);
                DataBlockRecords finalRecords = records;
                reader.read(blockIndex).whenComplete((rst, ex) -> {
                    synchronized (inflightDataBlockReads) {
                        inflightDataBlockReads.remove(key, finalRecords);
                    }
                    finalRecords.complete(rst, ex);
                    dataBlockConsumer.accept(finalRecords);
                    finalRecords.release();
                });
            } else {
                records.registerListener(listener);
            }
        }
        return cf;
    }
}
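Hypothetical usage of the accumulator: two concurrent requests keyed by the same (objectKey, blockId) pair are served by a single ObjectReader#read call. The consumer and the wrapper method below are assumptions for illustration; only readDataBlock itself comes from the patch.

// Hypothetical usage; reader and blockIndex come from the surrounding read path.
void example(ObjectReader reader, ObjectReader.DataBlockIndex blockIndex) {
    DataBlockReadAccumulator accumulator = new DataBlockReadAccumulator(records -> {
        // e.g. retain the decoded records and publish them to the block cache
    });
    CompletableFuture<DataBlockRecords> cf1 = accumulator.readDataBlock(reader, blockIndex);
    CompletableFuture<DataBlockRecords> cf2 = accumulator.readDataBlock(reader, blockIndex);
    // cf1 and cf2 are distinct futures backed by one ObjectReader#read call,
    // and both complete with the same DataBlockRecords instance.
}

Note the completion path removes its entry with the two-argument remove(key, value), so a stale completion can never evict a different DataBlockRecords that later occupies the same key.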
80 changes: 80 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockRecords.java
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.cache;

import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.utils.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class DataBlockRecords {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockRecords.class);
    final AtomicInteger refCount = new AtomicInteger(1);
    private final List<BiConsumer<DataBlockRecords, Throwable>> listeners = new LinkedList<>();
    private List<StreamRecordBatch> records = Collections.emptyList();

    public void registerListener(BiConsumer<DataBlockRecords, Throwable> listener) {
        retain();
        listeners.add(listener);
    }

    public void complete(ObjectReader.DataBlock dataBlock, Throwable ex) {
        if (ex == null) {
            records = new ArrayList<>(dataBlock.recordCount());
            try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
                while (it.hasNext()) {
                    records.add(it.next());
                }
            } catch (Throwable e) {
                LOGGER.error("parse data block fail", e);
                ex = e;
            }
        }
        Throwable finalEx = ex;
        listeners.forEach(listener -> {
            try {
                listener.accept(this, finalEx);
            } catch (Throwable e) {
                release();
                LOGGER.error("DataBlockRecords fail to notify listener {}", listener, e);
            }
        });
    }

    public List<StreamRecordBatch> records() {
        return Collections.unmodifiableList(records);
    }

    void retain() {
        refCount.incrementAndGet();
    }

    void release() {
        if (refCount.decrementAndGet() == 0) {
            records.forEach(StreamRecordBatch::release);
        }
    }
}
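DataBlockRecords is reference-counted: refCount starts at 1 for the creator, and every registerListener call takes an additional reference. A lifecycle sketch (the wrapper method and the completed dataBlock argument are assumptions for illustration):

// Lifecycle sketch; dataBlock stands for a completed ObjectReader.DataBlock.
void lifecycle(ObjectReader.DataBlock dataBlock) {
    DataBlockRecords records = new DataBlockRecords(); // refCount = 1 (creator)
    records.registerListener((rst, ex) -> {            // retain(): refCount = 2
        // consume rst.records(), then rst.release() once done with them
    });
    records.complete(dataBlock, null);                 // decode the block, notify listeners
    records.release();                                 // creator drops its reference
    // When refCount reaches 0, every held StreamRecordBatch is released as well.
}

This is exactly the order DataBlockReadAccumulator follows: complete the records, hand them to the consumer, then release its own reference.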
s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -25,10 +25,12 @@
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.utils.CloseableIterator;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -41,8 +43,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;

@@ -54,6 +54,7 @@ public class DefaultS3BlockCache implements S3BlockCache {
    private final ExecutorService mainExecutor;
    private final ObjectManager objectManager;
    private final S3Operator s3Operator;
    private final DataBlockReadAccumulator dataBlockReadAccumulator;

    public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3Operator s3Operator) {
        this.cache = new BlockCache(cacheBytesSize);
@@ -63,6 +64,14 @@ public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3Operator s3Operator) {
                LOGGER);
        this.objectManager = objectManager;
        this.s3Operator = s3Operator;
        dataBlockReadAccumulator = new DataBlockReadAccumulator(dataBlockRecords -> {
            List<StreamRecordBatch> records = dataBlockRecords.records();
            if (!records.isEmpty()) {
                long streamId = records.get(0).getStreamId();
                records.forEach(StreamRecordBatch::retain);
                cache.put(streamId, records);
            }
        });
    }

    @Override
@@ -168,11 +177,13 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffse
            context.reader = reader;
            return reader.find(streamId, context.nextStartOffset, endOffset, context.nextMaxBytes);
        });
        CompletableFuture<List<CompletableFuture<ObjectReader.DataBlock>>> getDataCf = findIndexCf.thenCompose(blockIndexes -> {
        CompletableFuture<List<CompletableFuture<DataBlockRecords>>> getDataCf = findIndexCf.thenCompose(blockIndexes -> {
            if (blockIndexes.isEmpty()) {
                return CompletableFuture.completedFuture(Collections.emptyList());
            }
            List<CompletableFuture<ObjectReader.DataBlock>> blockCfList = blockIndexes.stream().map(context.reader::read).collect(Collectors.toList());
            List<CompletableFuture<DataBlockRecords>> blockCfList = blockIndexes.stream()
                    .map(i -> dataBlockReadAccumulator.readDataBlock(context.reader, i))
                    .collect(Collectors.toList());
            CompletableFuture<Void> allBlockCf = CompletableFuture.allOf(blockCfList.toArray(new CompletableFuture[0]));
            return allBlockCf.thenApply(nil -> blockCfList);
        });
@@ -190,32 +201,27 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffse
            long nextStartOffset = context.nextStartOffset;
            int nextMaxBytes = context.nextMaxBytes;
            boolean fulfill = false;
            List<StreamRecordBatch> remaining = new ArrayList<>();
            for (CompletableFuture<ObjectReader.DataBlock> blockCf : blockCfList) {
                ObjectReader.DataBlock dataBlock = blockCf.get();
                try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
                    while (it.hasNext()) {
                        StreamRecordBatch recordBatch = it.next();
            for (CompletableFuture<DataBlockRecords> blockCf : blockCfList) {
                DataBlockRecords dataBlock = blockCf.get();
                // TODO: add #getRecords to DataBlockRecords, use binary search to get the records we need.
                if (!fulfill) {
                    for (StreamRecordBatch recordBatch : dataBlock.records()) {
                        if (fulfill) {
                            break;
                        }
                        if (recordBatch.getLastOffset() <= nextStartOffset) {
                            recordBatch.release();
                            continue;
                        }
                        if (fulfill) {
                            remaining.add(recordBatch);
                        } else {
                            context.records.add(recordBatch);
                            nextStartOffset = recordBatch.getLastOffset();
                            nextMaxBytes -= Math.min(nextMaxBytes, recordBatch.size());
                            if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) {
                                fulfill = true;
                            }
                        recordBatch.retain();
                        context.records.add(recordBatch);
                        nextStartOffset = recordBatch.getLastOffset();
                        nextMaxBytes -= Math.min(nextMaxBytes, recordBatch.size());
                        if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) {
                            fulfill = true;
                        }
                    }
                }
                dataBlock.close();
            }
            if (!remaining.isEmpty()) {
                cache.put(streamId, remaining);
                dataBlock.release();
            }
            context.nextStartOffset = nextStartOffset;
            context.nextMaxBytes = nextMaxBytes;
@@ -242,7 +248,6 @@ private void backgroundReadahead(long streamId, BlockCache.Readahead readahead)
        ReadingTaskKey readingTaskKey = new ReadingTaskKey(streamId, readahead.getStartOffset());
        readaheadTasks.put(readingTaskKey, readaheadCf);
        readaheadCf
                .thenAccept(readDataBlock -> cache.put(streamId, readDataBlock.getRecords()))
                .whenComplete((nil, ex) -> {
                    if (ex != null) {
                        LOGGER.error("background readahead {} fail", readahead, ex);
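The TODO in the readFromS3 hunk above suggests a follow-up: give DataBlockRecords a #getRecords lookup that binary-searches the offset-sorted batch list instead of scanning it. A hedged sketch of such a lookup (hypothetical helper, not part of this patch):

import com.automq.stream.s3.model.StreamRecordBatch;

import java.util.List;

// Hypothetical helper sketched from the TODO: binary-search the block's
// offset-sorted batches for the first one that ends past startOffset.
final class RecordSearch {
    static int firstRelevantIndex(List<StreamRecordBatch> records, long startOffset) {
        int lo = 0, hi = records.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (records.get(mid).getLastOffset() <= startOffset) {
                lo = mid + 1; // this batch ends at or before startOffset, skip it
            } else {
                hi = mid;
            }
        }
        return lo; // index of the first batch worth returning, or records.size()
    }
}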
s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -186,6 +186,7 @@ private void tryRealFree() {
            if (b.free) {
                size.addAndGet(-b.size);
                blockFreeListener.accept(b);
                b.free();
            }
            return b.free;
        });
@@ -277,6 +278,11 @@ public void confirmOffset(long confirmOffset) {
        public long size() {
            return size;
        }

        public void free() {
            map.forEach((streamId, records) -> records.forEach(StreamRecordBatch::release));
            map.clear();
        }
    }

    static class StreamRange {