feat(s3stream): change s3stream metrics time unit to nanoseconds (#650)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Nov 16, 2023
1 parent 63c3e38 commit bc9a53d
Showing 8 changed files with 23 additions and 23 deletions.
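
Every hunk below makes the same one-line change: the TimerUtil used to time an operation is now constructed with TimeUnit.NANOSECONDS instead of TimeUnit.MILLISECONDS, so the latency reported to the operation histograms carries nanosecond precision. As a rough, JDK-only illustration of why the unit matters for sub-millisecond operations such as cache puts (the class name ElapsedUnitDemo and the simulated work are made up for this sketch and are not part of the commit):

import java.util.concurrent.TimeUnit;

public class ElapsedUnitDemo {
    public static void main(String[] args) {
        long startNanos = System.nanoTime();

        // Simulated fast operation, e.g. an in-memory cache put that finishes
        // well under one millisecond.
        long acc = 0;
        for (int i = 0; i < 10_000; i++) {
            acc += i;
        }

        long elapsedNanos = System.nanoTime() - startNanos;
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);

        // Recorded in milliseconds the value collapses to 0 and the latency
        // histogram loses all resolution; nanoseconds preserve the signal.
        System.out.println("acc=" + acc + ", elapsed=" + elapsedNanos + " ns (" + elapsedMillis + " ms)");
    }
}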
6 changes: 3 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -234,7 +234,7 @@ public void shutdown() {

@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CompletableFuture<Void> cf = new CompletableFuture<>();
// encoded before append to free heap ByteBuf.
streamRecord.encoded();
@@ -317,7 +317,7 @@ private void tryDrainBackoffRecords() {

@Override
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
cf.whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsed()));
@@ -423,7 +423,7 @@ CompletableFuture<Void> uploadDeltaWAL(LogCache.LogCacheBlock logCacheBlock) {
}

CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CompletableFuture<Void> cf = new CompletableFuture<>();
context.cf = cf;
inflightWALUploadTasks.add(cf);
6 changes: 3 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -135,7 +135,7 @@ public long nextOffset() {
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
writeLock.lock();
try {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CompletableFuture<AppendResult> cf = exec(() -> {
if (networkInboundLimiter != null) {
networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining());
@@ -181,7 +181,7 @@ private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
readLock.lock();
try {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
@@ -235,7 +235,7 @@ private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset,
public CompletableFuture<Void> trim(long newStartOffset) {
writeLock.lock();
try {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
return exec(() -> {
CompletableFuture<Void> cf = new CompletableFuture<>();
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
Changed file: S3StreamClient.java
@@ -82,7 +82,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage

@Override
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsed());
return openStream0(streamId, options.epoch());
@@ -136,7 +136,7 @@ private void startStreamObjectsCompactions() {
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsed());
@@ -171,7 +171,7 @@ public void shutdown() {
LOGGER.warn("await streamObjectCompactionExecutor close fail", e);
}

- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
Map<Long, CompletableFuture<Void>> streamCloseFutures = new ConcurrentHashMap<>();
openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close()));
for (; ; ) {
Changed file: DefaultS3BlockCache.java
@@ -77,7 +77,7 @@ public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3O

@Override
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
// submit read task to mainExecutor to avoid read slower the caller thread.
mainExecutor.execute(() -> FutureUtil.exec(() ->
Changed file: LogCache.java
@@ -69,7 +69,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
}

public boolean put(StreamRecordBatch recordBatch) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
tryRealFree();
size.addAndGet(recordBatch.size());
boolean full = activeBlock.put(recordBatch);
@@ -102,7 +102,7 @@ public boolean put(StreamRecordBatch recordBatch) {
* Note: the records is retained, the caller should release it.
*/
public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
records.forEach(StreamRecordBatch::retain);
if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
Changed file (name not shown in this view)
@@ -244,7 +244,7 @@ CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
}

void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenAccept(responsePublisher -> {
@@ -289,7 +289,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
}

private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
int objectSize = data.readableBytes();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
@@ -319,7 +319,7 @@ public Writer writer(String path, ThrottleStrategy throttleStrategy) {

@Override
public CompletableFuture<Void> delete(String path) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECT).update(timerUtil.elapsed());
@@ -333,7 +333,7 @@ public CompletableFuture<Void> delete(String path) {

@Override
public CompletableFuture<List<String>> delete(List<String> objectKeys) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
ObjectIdentifier[] toDeleteKeys = objectKeys.stream().map(key ->
ObjectIdentifier.builder()
.key(key)
@@ -367,7 +367,7 @@ public CompletableFuture<String> createMultipartUpload(String path) {
}

void createMultipartUpload0(String path, CompletableFuture<String> cf) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
OperationMetricsStats.getHistogram(S3Operation.CREATE_MULTI_PART_UPLOAD).update(timerUtil.elapsed());
@@ -408,7 +408,7 @@ public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId,
}

private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf part, CompletableFuture<CompletedPart> cf) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
.partNumber(partNumber).build();
@@ -442,7 +442,7 @@ public CompletableFuture<CompletedPart> uploadPartCopy(String sourcePath, String
}

private void uploadPartCopy0(String sourcePath, String path, long start, long end, String uploadId, int partNumber, CompletableFuture<CompletedPart> cf) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
long inclusiveEnd = end - 1;
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
@@ -476,7 +476,7 @@ public CompletableFuture<Void> completeMultipartUpload(String path, String uploa
}

public void completeMultipartUpload0(String path, String uploadId, List<CompletedPart> parts, CompletableFuture<Void> cf) {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(parts).build();
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

Changed file: MultiPartWriter.java
@@ -49,7 +49,7 @@ public class MultiPartWriter implements Writer {
*/
private final long minPartSize;
private ObjectPart objectPart = null;
- private final TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ private final TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
private final ThrottleStrategy throttleStrategy;
private final AtomicLong totalWriteSize = new AtomicLong(0L);

@@ -212,7 +212,7 @@ public void upload() {
}

private void upload0() {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
partCf.whenComplete((nil, ex) -> S3ObjectMetricsStats.getHistogram(S3ObjectStage.UPLOAD_PART).update(timerUtil.elapsed()));
}
Changed file (name not shown in this view)
@@ -421,7 +421,7 @@ public AppendResult append(ByteBuf buf, int crc) throws OverCapacityException {
}

public AppendResult append0(ByteBuf body, int crc) throws OverCapacityException {
- TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+ TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
checkReadyToServe();

final long recordSize = RECORD_HEADER_SIZE + body.readableBytes();
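
The pattern repeated in every file above is the same: create a timer when an operation starts, then report timerUtil.elapsed() to a histogram (OperationMetricsStats or S3ObjectMetricsStats) when the operation's future completes. A minimal sketch of what such a timer could look like if it simply wraps System.nanoTime(); this is an illustrative stand-in named SimpleTimer, not the actual TimerUtil class from s3stream:

import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the TimerUtil referenced in the hunks above; the
// real class in s3stream may differ. It captures a start timestamp and reports
// the elapsed time converted to the unit chosen at construction.
public class SimpleTimer {
    private final TimeUnit unit;
    private final long startNanos;

    public SimpleTimer(TimeUnit unit) {
        this.unit = unit;
        this.startNanos = System.nanoTime();
    }

    public long elapsed() {
        return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
    }
}

With TimeUnit.NANOSECONDS the conversion in elapsed() is a no-op, whereas TimeUnit.MILLISECONDS truncates anything under a millisecond to zero, which is presumably the motivation for this commit; the histograms consuming these values now need to interpret them as nanoseconds.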
