diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index 21b3e1d87..80d224075 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -234,7 +234,7 @@ public void shutdown() {
 
     @Override
     public CompletableFuture append(StreamRecordBatch streamRecord) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         CompletableFuture cf = new CompletableFuture<>();
         // encoded before append to free heap ByteBuf.
         streamRecord.encoded();
@@ -317,7 +317,7 @@ private void tryDrainBackoffRecords() {
 
     @Override
     public CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         CompletableFuture cf = new CompletableFuture<>();
         mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
         cf.whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsed()));
@@ -423,7 +423,7 @@ CompletableFuture uploadDeltaWAL(LogCache.LogCacheBlock logCacheBlock) {
     }
 
     CompletableFuture uploadDeltaWAL(DeltaWALUploadTaskContext context) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         CompletableFuture cf = new CompletableFuture<>();
         context.cf = cf;
         inflightWALUploadTasks.add(cf);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index 00767e0f0..5bb28c326 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -135,7 +135,7 @@ public long nextOffset() {
     public CompletableFuture append(RecordBatch recordBatch) {
         writeLock.lock();
         try {
-            TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+            TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
             CompletableFuture cf = exec(() -> {
                 if (networkInboundLimiter != null) {
                     networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining());
@@ -181,7 +181,7 @@ private CompletableFuture append0(RecordBatch recordBatch) {
     public CompletableFuture fetch(long startOffset, long endOffset, int maxBytes) {
         readLock.lock();
         try {
-            TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+            TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
             CompletableFuture cf = exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
             pendingFetches.add(cf);
             cf.whenComplete((rs, ex) -> {
@@ -235,7 +235,7 @@ private CompletableFuture fetch0(long startOffset, long endOffset,
     public CompletableFuture trim(long newStartOffset) {
         writeLock.lock();
         try {
-            TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+            TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
             return exec(() -> {
                 CompletableFuture cf = new CompletableFuture<>();
                 lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index fa73187a7..675841364 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -82,7 +82,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
 
     @Override
     public CompletableFuture createAndOpenStream(CreateStreamOptions options) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
             OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsed());
             return openStream0(streamId, options.epoch());
@@ -136,7 +136,7 @@ private void startStreamObjectsCompactions() {
     }
 
     private CompletableFuture openStream0(long streamId, long epoch) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         return streamManager.openStream(streamId, epoch).
                 thenApply(metadata -> {
                     OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsed());
@@ -171,7 +171,7 @@ public void shutdown() {
             LOGGER.warn("await streamObjectCompactionExecutor close fail", e);
         }
 
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         Map> streamCloseFutures = new ConcurrentHashMap<>();
         openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close()));
         for (; ; ) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index 74707212e..f330d5eea 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -77,7 +77,7 @@ public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3O
 
     @Override
     public CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         CompletableFuture readCf = new CompletableFuture<>();
         // submit read task to mainExecutor to avoid read slower the caller thread.
         mainExecutor.execute(() -> FutureUtil.exec(() ->
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index b17fafe83..9bfe2cf74 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -69,7 +69,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
 
     public boolean put(StreamRecordBatch recordBatch) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         tryRealFree();
         size.addAndGet(recordBatch.size());
         boolean full = activeBlock.put(recordBatch);
@@ -102,7 +102,7 @@ public boolean put(StreamRecordBatch recordBatch) {
      * Note: the records is retained, the caller should release it.
      */
     public List get(long streamId, long startOffset, long endOffset, int maxBytes) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         List records = get0(streamId, startOffset, endOffset, maxBytes);
         records.forEach(StreamRecordBatch::retain);
         if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index b2dfe9d4d..3e0dff877 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -244,7 +244,7 @@ CompletableFuture mergedRangeRead(String path, long start, long end) {
     }
 
     void mergedRangeRead0(String path, long start, long end, CompletableFuture cf) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
         readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
                 .thenAccept(responsePublisher -> {
@@ -289,7 +289,7 @@ public CompletableFuture write(String path, ByteBuf data, ThrottleStrategy
     }
 
     private void write0(String path, ByteBuf data, CompletableFuture cf) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         int objectSize = data.readableBytes();
         PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
         AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
@@ -319,7 +319,7 @@ public Writer writer(String path, ThrottleStrategy throttleStrategy) {
 
     @Override
     public CompletableFuture delete(String path) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
         return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
             OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECT).update(timerUtil.elapsed());
@@ -333,7 +333,7 @@ public CompletableFuture delete(String path) {
 
     @Override
     public CompletableFuture> delete(List objectKeys) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         ObjectIdentifier[] toDeleteKeys = objectKeys.stream().map(key -> ObjectIdentifier.builder()
                 .key(key)
                 .build()
@@ -367,7 +367,7 @@ public CompletableFuture createMultipartUpload(String path) {
     }
 
     void createMultipartUpload0(String path, CompletableFuture cf) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
         writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
             OperationMetricsStats.getHistogram(S3Operation.CREATE_MULTI_PART_UPLOAD).update(timerUtil.elapsed());
@@ -408,7 +408,7 @@ public CompletableFuture uploadPart(String path, String uploadId,
     }
 
     private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf part, CompletableFuture cf) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
         UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
                 .partNumber(partNumber).build();
@@ -442,7 +442,7 @@ public CompletableFuture uploadPartCopy(String sourcePath, String
     }
 
     private void uploadPartCopy0(String sourcePath, String path, long start, long end, String uploadId, int partNumber, CompletableFuture cf) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         long inclusiveEnd = end - 1;
         UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
                 .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
@@ -476,7 +476,7 @@ public CompletableFuture completeMultipartUpload(String path, String uploa
     }
 
     public void completeMultipartUpload0(String path, String uploadId, List parts, CompletableFuture cf) {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(parts).build();
         CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index 9a4734d09..c3c994226 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -49,7 +49,7 @@ public class MultiPartWriter implements Writer {
      */
    private final long minPartSize;
    private ObjectPart objectPart = null;
-   private final TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+   private final TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
    private final ThrottleStrategy throttleStrategy;
    private final AtomicLong totalWriteSize = new AtomicLong(0L);
@@ -212,7 +212,7 @@ public void upload() {
         }
 
         private void upload0() {
-            TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+            TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
             FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
             partCf.whenComplete((nil, ex) -> S3ObjectMetricsStats.getHistogram(S3ObjectStage.UPLOAD_PART).update(timerUtil.elapsed()));
         }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
index 2a839e86e..e151f63df 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
@@ -421,7 +421,7 @@ public AppendResult append(ByteBuf buf, int crc) throws OverCapacityException {
     }
 
     public AppendResult append0(ByteBuf body, int crc) throws OverCapacityException {
-        TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
+        TimerUtil timerUtil = new TimerUtil(TimeUnit.NANOSECONDS);
         checkReadyToServe();
         final long recordSize = RECORD_HEADER_SIZE + body.readableBytes();
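
Every hunk above makes the same change: the per-operation TimerUtil is constructed with TimeUnit.NANOSECONDS instead of TimeUnit.MILLISECONDS, and its elapsed() value is fed to the matching operation histogram when the asynchronous call completes. The standalone sketch below illustrates that measurement pattern using only JDK types; it assumes, as the call sites suggest, that elapsed time is reported in the unit chosen at construction. System.nanoTime() and a printout stand in for TimerUtil and OperationMetricsStats, which are internal to this repository.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrative only: nanosecond-precision latency measurement around an async call,
// mirroring the TimerUtil usage in the patch. The measured "operation" is a stub.
public class NanoTimingSketch {
    public static void main(String[] args) {
        long startNanos = System.nanoTime();
        CompletableFuture<Void> operation = CompletableFuture.runAsync(() -> {
            // stand-in for an S3 request or WAL append being timed
        });
        operation.whenComplete((nil, ex) -> {
            long elapsedNanos = System.nanoTime() - startNanos;
            // the histograms in the patch now receive nanoseconds rather than milliseconds
            System.out.printf("elapsed: %d ns (%d ms)%n",
                    elapsedNanos, TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        });
        operation.join();
    }
}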