Skip to content

Commit

Permalink
feat(s3stream): support network inbound/outbound bandwidth throttle (#438)
Browse files Browse the repository at this point in the history

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Oct 24, 2023
1 parent 38eb023 commit bd9441b
Show file tree
Hide file tree
Showing 29 changed files with 647 additions and 147 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.15-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.16-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.15-SNAPSHOT</version>
<version>0.1.16-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
54 changes: 54 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public class Config {
private int s3ObjectMaxStreamObjectNumPerCommit = 10000;
private boolean s3MockEnable = false;
private boolean s3ObjectLogEnable = false;
// Baseline network inbound bandwidth, default 100 MB/s (bytes per second)
private long networkInboundBaselineBandwidth = 100 * 1024 * 1024;
// Inbound burst bandwidth: 10x the baseline default. NOTE(review): computed once at
// field initialization — changing the baseline via its setter does not update this value.
private long networkInboundBurstBandwidth = 10 * networkInboundBaselineBandwidth;
// Baseline network outbound bandwidth, default 100 MB/s (bytes per second)
private long networkOutboundBaselineBandwidth = 100 * 1024 * 1024;
// Outbound burst bandwidth: 10x the baseline default (same caveat as the inbound burst field)
private long networkOutboundBurstBandwidth = 10 * networkOutboundBaselineBandwidth;
// Refill period in milliseconds — presumably the token-refill interval of the
// network bandwidth limiters; TODO confirm against AsyncNetworkBandwidthLimiter
private int refillPeriodMs = 1000;

public int brokerId() {
return brokerId;
Expand Down Expand Up @@ -215,6 +224,26 @@ public String s3SecretKey() {
return s3SecretKey;
}

/**
 * Returns the baseline network inbound bandwidth (bytes per second; default 100 MB/s).
 *
 * @return the configured inbound baseline bandwidth
 */
public long networkInboundBaselineBandwidth() {
    return this.networkInboundBaselineBandwidth;
}

/**
 * Returns the network inbound burst bandwidth (bytes per second; default 10x baseline).
 *
 * @return the configured inbound burst bandwidth
 */
public long networkInboundBurstBandwidth() {
    return this.networkInboundBurstBandwidth;
}

/**
 * Returns the baseline network outbound bandwidth (bytes per second; default 100 MB/s).
 *
 * @return the configured outbound baseline bandwidth
 */
public long networkOutboundBaselineBandwidth() {
    return this.networkOutboundBaselineBandwidth;
}

/**
 * Returns the network outbound burst bandwidth (bytes per second; default 10x baseline).
 *
 * @return the configured outbound burst bandwidth
 */
public long networkOutboundBurstBandwidth() {
    return this.networkOutboundBurstBandwidth;
}

/**
 * Returns the refill period in milliseconds (default 1000) — presumably the
 * token-refill interval used by the network bandwidth limiters; TODO confirm.
 *
 * @return the configured refill period in ms
 */
public int refillPeriodMs() {
    return this.refillPeriodMs;
}

public Config brokerId(int brokerId) {
this.brokerId = brokerId;
return this;
Expand Down Expand Up @@ -409,4 +438,29 @@ public Config s3StreamObjectsCompactionNWInBandwidth(long s3StreamObjectsCompact
this.s3StreamObjectsCompactionNWInBandwidth = s3StreamObjectsCompactionNWInBandwidth;
return this;
}

/**
 * Sets the baseline network inbound bandwidth in bytes per second.
 * NOTE: the inbound burst default (10x baseline) is fixed at field initialization
 * and is NOT recomputed by this setter — set it explicitly if needed.
 *
 * @param networkInboundBaselineBandwidth inbound baseline bandwidth (bytes/s)
 * @return this {@code Config}, for fluent chaining
 */
public Config networkInboundBaselineBandwidth(final long networkInboundBaselineBandwidth) {
    this.networkInboundBaselineBandwidth = networkInboundBaselineBandwidth;
    return this;
}

/**
 * Sets the network inbound burst bandwidth in bytes per second.
 *
 * @param networkInboundBurstBandwidth inbound burst bandwidth (bytes/s)
 * @return this {@code Config}, for fluent chaining
 */
public Config networkInboundBurstBandwidth(final long networkInboundBurstBandwidth) {
    this.networkInboundBurstBandwidth = networkInboundBurstBandwidth;
    return this;
}

/**
 * Sets the baseline network outbound bandwidth in bytes per second.
 * NOTE: the outbound burst default (10x baseline) is fixed at field initialization
 * and is NOT recomputed by this setter — set it explicitly if needed.
 *
 * @param networkOutboundBaselineBandwidth outbound baseline bandwidth (bytes/s)
 * @return this {@code Config}, for fluent chaining
 */
public Config networkOutboundBaselineBandwidth(final long networkOutboundBaselineBandwidth) {
    this.networkOutboundBaselineBandwidth = networkOutboundBaselineBandwidth;
    return this;
}

/**
 * Sets the network outbound burst bandwidth in bytes per second.
 *
 * @param networkOutboundBurstBandwidth outbound burst bandwidth (bytes/s)
 * @return this {@code Config}, for fluent chaining
 */
public Config networkOutboundBurstBandwidth(final long networkOutboundBurstBandwidth) {
    this.networkOutboundBurstBandwidth = networkOutboundBurstBandwidth;
    return this;
}

/**
 * Sets the refill period in milliseconds — presumably the token-refill interval
 * of the network bandwidth limiters; TODO confirm against the limiter implementation.
 *
 * @param refillPeriodMs refill period in ms
 * @return this {@code Config}, for fluent chaining
 */
public Config refillPeriodMs(final int refillPeriodMs) {
    this.refillPeriodMs = refillPeriodMs;
    return this;
}
}
28 changes: 23 additions & 5 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.FutureUtil;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -70,11 +71,18 @@ public class S3Stream implements Stream {

private final Set<CompletableFuture<?>> pendingAppends = ConcurrentHashMap.newKeySet();
private final Set<CompletableFuture<?>> pendingFetches = ConcurrentHashMap.newKeySet();
private final AsyncNetworkBandwidthLimiter networkInboundLimiter;
private final AsyncNetworkBandwidthLimiter networkOutboundLimiter;
private CompletableFuture<Void> lastPendingTrim = CompletableFuture.completedFuture(null);

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
StreamManager streamManager, StreamObjectsCompactionTask.Builder compactionTaskBuilder,
Function<Long, Void> closeHook) {
/**
 * Creates an {@code S3Stream} with network bandwidth throttling disabled:
 * delegates to the full constructor with {@code null} inbound/outbound limiters
 * (the limiters are null-checked before use in append/fetch).
 */
public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage, StreamManager streamManager,
                StreamObjectsCompactionTask.Builder compactionTaskBuilder, Function<Long, Void> closeHook) {
    this(streamId, epoch, startOffset, nextOffset, storage, streamManager, compactionTaskBuilder, closeHook, null, null);
}

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage, StreamManager streamManager,
StreamObjectsCompactionTask.Builder compactionTaskBuilder, Function<Long, Void> closeHook,
AsyncNetworkBandwidthLimiter networkInboundLimiter, AsyncNetworkBandwidthLimiter networkOutboundLimiter) {
this.streamId = streamId;
this.epoch = epoch;
this.startOffset = startOffset;
Expand All @@ -86,6 +94,8 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
this.streamManager = streamManager;
this.streamObjectsCompactionTask = compactionTaskBuilder.withStream(this).build();
this.closeHook = closeHook;
this.networkInboundLimiter = networkInboundLimiter;
this.networkOutboundLimiter = networkOutboundLimiter;
}

public StreamObjectsCompactionTask.CompactionSummary triggerCompactionTask() throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -123,7 +133,12 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
writeLock.lock();
try {
long start = System.currentTimeMillis();
CompletableFuture<AppendResult> cf = exec(() -> append0(recordBatch), LOGGER, "append");
CompletableFuture<AppendResult> cf = exec(() -> {
if (networkInboundLimiter != null) {
networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining());
}
return append0(recordBatch);
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STREAM).operationCount.inc();
Expand Down Expand Up @@ -167,11 +182,14 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
long start = System.currentTimeMillis();
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((nil, ex) -> {
cf.whenComplete((rs, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationTime.update(System.currentTimeMillis() - start);
if (ex != null) {
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
} else if (networkOutboundLimiter != null) {
long totalSize = rs.recordBatchList().stream().mapToLong(record -> record.rawPayload().remaining()).sum();
networkOutboundLimiter.forceConsume(totalSize);
}
pendingFetches.remove(cf);
});
Expand Down
52 changes: 27 additions & 25 deletions s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClient;
import com.automq.stream.s3.compact.AsyncTokenBucketThrottle;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.FutureUtil;
Expand Down Expand Up @@ -56,17 +56,23 @@ public class S3StreamClient implements StreamClient {
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final Config config;
private final AsyncTokenBucketThrottle readThrottle;
private final AsyncNetworkBandwidthLimiter networkInboundBucket;
private final AsyncNetworkBandwidthLimiter networkOutboundBucket;

public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManager objectManager,
S3Operator s3Operator, Config config) {
/**
 * Creates an {@code S3StreamClient} with network bandwidth throttling disabled:
 * delegates to the full constructor with {@code null} inbound/outbound buckets.
 */
public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManager objectManager, S3Operator s3Operator, Config config) {
    this(streamManager, storage, objectManager, s3Operator, config, null, null);
}

/**
 * Creates an {@code S3StreamClient}.
 *
 * @param streamManager         stream metadata manager
 * @param storage               backing storage implementation
 * @param objectManager         S3 object manager
 * @param s3Operator            low-level S3 operator
 * @param config                client configuration
 * @param networkInboundBucket  inbound bandwidth limiter; may be {@code null} (no inbound throttling)
 * @param networkOutboundBucket outbound bandwidth limiter; may be {@code null} (no outbound throttling)
 */
public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManager objectManager, S3Operator s3Operator, Config config,
                      AsyncNetworkBandwidthLimiter networkInboundBucket, AsyncNetworkBandwidthLimiter networkOutboundBucket) {
    this.streamManager = streamManager;
    this.storage = storage;
    this.openedStreams = new ConcurrentHashMap<>();
    this.objectManager = objectManager;
    this.s3Operator = s3Operator;
    this.config = config;
    this.networkInboundBucket = networkInboundBucket;
    this.networkOutboundBucket = networkOutboundBucket;
    // Kicked off last, after all fields are assigned — presumably schedules the
    // periodic stream-object compaction task; TODO confirm it relies on these fields.
    startStreamObjectsCompactions();
}

Expand Down Expand Up @@ -129,24 +135,23 @@ private void startStreamObjectsCompactions() {
private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationTime.update(timerUtil.elapsed());
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.withCompactedStreamObjectMaxSizeInBytes(config.s3StreamObjectCompactionMaxSizeBytes())
.withEligibleStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeMinutes() * 60L * 1000)
.withS3ObjectLogEnabled(config.s3ObjectLogEnable())
.withReadThrottle(readThrottle);
S3Stream stream = new S3Stream(
metadata.getStreamId(), metadata.getEpoch(),
metadata.getStartOffset(), metadata.getEndOffset(),
storage, streamManager, builder, id -> {
openedStreams.remove(id);
return null;
thenApply(metadata -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationTime.update(timerUtil.elapsed());
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.withCompactedStreamObjectMaxSizeInBytes(config.s3StreamObjectCompactionMaxSizeBytes())
.withEligibleStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeMinutes() * 60L * 1000)
.withS3ObjectLogEnabled(config.s3ObjectLogEnable());
S3Stream stream = new S3Stream(
metadata.getStreamId(), metadata.getEpoch(),
metadata.getStartOffset(), metadata.getEndOffset(),
storage, streamManager, builder, id -> {
openedStreams.remove(id);
return null;
}, networkInboundBucket, networkOutboundBucket);
openedStreams.put(streamId, stream);
return stream;
});
openedStreams.put(streamId, stream);
return stream;
});
}

@Override
Expand All @@ -163,9 +168,6 @@ public void shutdown() {
} catch (InterruptedException e) {
LOGGER.warn("await streamObjectCompactionExecutor close fail", e);
}
if (readThrottle != null) {
readThrottle.stop();
}

TimerUtil timerUtil = new TimerUtil();
Map<Long, CompletableFuture<Void>> streamCloseFutures = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.stream.s3;

import com.automq.stream.s3.compact.AsyncTokenBucketThrottle;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.operator.Writer;
import io.netty.buffer.ByteBuf;
Expand All @@ -43,10 +43,10 @@ public class StreamObjectCopier {

private long size;

public StreamObjectCopier(long objectId, S3Operator s3Operator, AsyncTokenBucketThrottle readThrottle) {
public StreamObjectCopier(long objectId, S3Operator s3Operator) {
this.s3Operator = s3Operator;
// TODO: use a better clusterName
this.writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), readThrottle);
this.writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE);
this.completedObjects = new LinkedList<>();
this.nextObjectDataStartPosition = 0;
this.blockCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.automq.stream.s3;

import com.automq.stream.s3.compact.AsyncTokenBucketThrottle;
import com.automq.stream.s3.objects.CommitStreamObjectRequest;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
Expand Down Expand Up @@ -61,7 +60,6 @@ public class StreamObjectsCompactionTask {
private final S3Operator s3Operator;
private List<CompactionResult> compactionResults;
private final String logIdent;
private final AsyncTokenBucketThrottle readThrottle;

/**
* Constructor of StreamObjectsCompactionTask.
Expand All @@ -74,9 +72,8 @@ public class StreamObjectsCompactionTask {
* @param eligibleStreamObjectLivingTimeInMs eligible stream object living time in ms.
*/
public StreamObjectsCompactionTask(ObjectManager objectManager, S3Operator s3Operator, S3Stream stream,
long compactedStreamObjectMaxSizeInBytes, long eligibleStreamObjectLivingTimeInMs,
AsyncTokenBucketThrottle readThrottle) {
this(objectManager, s3Operator, stream, compactedStreamObjectMaxSizeInBytes, eligibleStreamObjectLivingTimeInMs, readThrottle, false);
long compactedStreamObjectMaxSizeInBytes, long eligibleStreamObjectLivingTimeInMs) {
this(objectManager, s3Operator, stream, compactedStreamObjectMaxSizeInBytes, eligibleStreamObjectLivingTimeInMs, false);
}

/**
Expand All @@ -88,12 +85,11 @@ public StreamObjectsCompactionTask(ObjectManager objectManager, S3Operator s3Ope
* If it is bigger than {@link Writer#MAX_OBJECT_SIZE},
* it will be set to {@link Writer#MAX_OBJECT_SIZE}.
* @param eligibleStreamObjectLivingTimeInMs eligible stream object living time in ms.
* @param readThrottle read throttle in compaction task.
* @param s3ObjectLogEnabled is s3 object log enabled.
*/
public StreamObjectsCompactionTask(ObjectManager objectManager, S3Operator s3Operator, S3Stream stream,
long compactedStreamObjectMaxSizeInBytes, long eligibleStreamObjectLivingTimeInMs,
AsyncTokenBucketThrottle readThrottle, boolean s3ObjectLogEnabled) {
boolean s3ObjectLogEnabled) {
this.objectManager = objectManager;
this.s3Operator = s3Operator;
this.stream = stream;
Expand All @@ -104,7 +100,6 @@ public StreamObjectsCompactionTask(ObjectManager objectManager, S3Operator s3Ope
this.compactionResults = Collections.emptyList();
this.logIdent = "[StreamObjectsCompactionTask streamId=" + stream.streamId() + "] ";
this.s3ObjectLogger = S3ObjectLogger.logger(logIdent);
this.readThrottle = readThrottle;
}

private CompletableFuture<CompactionResult> doCompaction(List<S3StreamObjectMetadataSplitWrapper> streamObjectMetadataList) {
Expand All @@ -126,7 +121,7 @@ private CompletableFuture<CompactionResult> doCompaction(List<S3StreamObjectMeta
AtomicInteger smallSizeCopyWriteCount = new AtomicInteger(0);
return objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30))
.thenCompose(objId -> {
StreamObjectCopier objectCopier = new StreamObjectCopier(objId, s3Operator, readThrottle);
StreamObjectCopier objectCopier = new StreamObjectCopier(objId, s3Operator);
streamObjectMetadataList.forEach(metadataWrapper -> {
S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(metadataWrapper.s3StreamObjectMetadata().objectId(),
metadataWrapper.s3StreamObjectMetadata().objectSize(),
Expand Down Expand Up @@ -592,7 +587,6 @@ public static class Builder {
private long compactedStreamObjectMaxSizeInBytes;
private long eligibleStreamObjectLivingTimeInMs;
private boolean s3ObjectLogEnabled;
private AsyncTokenBucketThrottle readThrottle;

public Builder(ObjectManager objectManager, S3Operator s3Operator) {
this.objectManager = objectManager;
Expand Down Expand Up @@ -623,14 +617,9 @@ public Builder withS3ObjectLogEnabled(boolean s3ObjectLogEnabled) {
return this;
}

public Builder withReadThrottle(AsyncTokenBucketThrottle readThrottle) {
this.readThrottle = readThrottle;
return this;
}

public StreamObjectsCompactionTask build() {
return new StreamObjectsCompactionTask(objectManager, s3Operator, stream, compactedStreamObjectMaxSizeInBytes,
eligibleStreamObjectLivingTimeInMs, readThrottle, s3ObjectLogEnabled);
eligibleStreamObjectLivingTimeInMs, s3ObjectLogEnabled);
}
}
}
Loading

0 comments on commit bd9441b

Please sign in to comment.