Skip to content

Commit

Permalink
feat(s3stream/wal/benchmark): block device support (#669)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 20, 2023
1 parent 0acc27b commit 0049b3d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package com.automq.stream.s3.wal.benchmark;

import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.wal.BlockWALService;
import com.automq.stream.s3.wal.WriteAheadLog;
import com.automq.stream.s3.wal.util.WALChannel;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -74,24 +76,25 @@ public static void main(String[] args) throws IOException {
}
Config config = new Config(ns);

resetWALHeader(config.path);
try (WriteBench bench = new WriteBench(config)) {
bench.run(config);
}
}

private static void logIt(Config config, Stat stat) {
ScheduledExecutorService statExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stat-thread-%d", true), null);
statExecutor.scheduleAtFixedRate(() -> {
Stat.Result result = stat.reset();
if (0 != result.count()) {
System.out.printf("Append task | Append Rate %d msg/s %d KB/s | Avg Latency %.3f ms | Max Latency %.3f ms\n",
TimeUnit.SECONDS.toNanos(1) * result.count() / result.elapsedTimeNanos(),
TimeUnit.SECONDS.toNanos(1) * (result.count() * config.recordSizeBytes) / result.elapsedTimeNanos() / 1024,
(double) result.costNanos() / TimeUnit.MILLISECONDS.toNanos(1) / result.count(),
(double) result.maxCostNanos() / TimeUnit.MILLISECONDS.toNanos(1));
}
}, LOG_INTERVAL_SECONDS, LOG_INTERVAL_SECONDS, TimeUnit.SECONDS);
private static void resetWALHeader(String path) throws IOException {
if (!path.startsWith(WALChannel.WALChannelBuilder.DEVICE_PREFIX)) {
return;
}
System.out.println("Resetting WAL header");
int capacity = BlockWALService.WAL_HEADER_TOTAL_CAPACITY;
WALChannel channel = WALChannel.builder(path).capacity(capacity).build();
channel.open();
ByteBuf buf = DirectByteBufAlloc.byteBuffer(capacity);
buf.writeZero(capacity);
channel.write(buf, 0);
buf.release();
channel.close();
}

private void run(Config config) {
Expand Down Expand Up @@ -190,6 +193,21 @@ private void runAppendTask(int index, AppendTaskConfig config, Stat stat) throws
System.out.printf("Append task %d finished\n", index);
}

private static void logIt(Config config, Stat stat) {
ScheduledExecutorService statExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stat-thread-%d", true), null);
statExecutor.scheduleAtFixedRate(() -> {
Stat.Result result = stat.reset();
if (0 != result.count()) {
System.out.printf("Append task | Append Rate %d msg/s %d KB/s | Avg Latency %.3f ms | Max Latency %.3f ms\n",
TimeUnit.SECONDS.toNanos(1) * result.count() / result.elapsedTimeNanos(),
TimeUnit.SECONDS.toNanos(1) * (result.count() * config.recordSizeBytes) / result.elapsedTimeNanos() / 1024,
(double) result.costNanos() / TimeUnit.MILLISECONDS.toNanos(1) / result.count(),
(double) result.maxCostNanos() / TimeUnit.MILLISECONDS.toNanos(1));
}
}, LOG_INTERVAL_SECONDS, LOG_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

@Override
public void close() {
log.shutdownGracefully();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ default void writeAndFlush(ByteBuf src, long position) throws IOException {
int read(ByteBuf dst, long position) throws IOException;

class WALChannelBuilder {
static final String DEVICE_PREFIX = "/dev/";
public static final String DEVICE_PREFIX = "/dev/";
private final String path;
private boolean direct;
private long capacity;
Expand Down

0 comments on commit 0049b3d

Please sign in to comment.