diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 697e249b4..edbe4450b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -126,6 +126,20 @@ public class BlockWALService implements WriteAheadLog { private BlockWALService() { } + /** + * A protected constructor for testing purpose. + */ + protected BlockWALService(BlockWALServiceBuilder builder) { + BlockWALService that = builder.build(); + this.initialWindowSize = that.initialWindowSize; + this.walChannel = that.walChannel; + this.slidingWindowService = that.slidingWindowService; + this.walHeader = that.walHeader; + this.recoveryMode = that.recoveryMode; + this.nodeId = that.nodeId; + this.epoch = that.epoch; + } + public static BlockWALServiceBuilder builder(String path, long capacity) { return new BlockWALServiceBuilder(path, capacity); } @@ -293,6 +307,13 @@ private long getCurrentStartOffset() { } } + /** + * Protected method for testing purpose. + */ + protected WALHeader tryReadWALHeader() { + return tryReadWALHeader(walChannel); + } + /** * Try to read the header from WAL, return the latest one. */ @@ -792,7 +813,10 @@ public long getJumpNextRecoverOffset() { } } - class RecoverIterator implements Iterator { + /** + * Protected for testing purpose. + */ + protected class RecoverIterator implements Iterator { private final long windowLength; private final long skipRecordAtOffset; private long nextRecoverOffset; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java b/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java index 02bf18d43..d006de683 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java @@ -44,7 +44,7 @@ * 8 - [4B] {@link WALHeader#crc8} CRC of the rest of the WAL header, used to verify the correctness of the * WAL header */ -class WALHeader { +public class WALHeader { public static final int WAL_HEADER_MAGIC_CODE = 0x12345678; public static final int WAL_HEADER_SIZE = 4 // magic code + 8 // capacity diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/RecoverTool.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/RecoverTool.java new file mode 100644 index 000000000..d306410ee --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/RecoverTool.java @@ -0,0 +1,133 @@ +/* + * Copyright 2024, AutoMQ CO.,LTD. + * + * Use of this software is governed by the Business Source License + * included in the file BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.wal.benchmark; + +import com.automq.stream.s3.StreamRecordBatchCodec; +import com.automq.stream.s3.model.StreamRecordBatch; +import com.automq.stream.s3.wal.BlockWALService; +import com.automq.stream.s3.wal.WALHeader; +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.Iterator; +import java.util.function.Function; +import java.util.stream.StreamSupport; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; + +import static com.automq.stream.s3.wal.benchmark.BenchTool.parseArgs; + +/** + * RecoverTool is a tool to recover records in a WAL manually. + * It extends {@link BlockWALService} to use tools provided by {@link BlockWALService} + */ +public class RecoverTool extends BlockWALService implements AutoCloseable { + + public RecoverTool(Config config) throws IOException { + super(BlockWALService.recoveryBuilder(config.path)); + super.start(); + } + + public static void main(String[] args) throws IOException { + Namespace ns = parseArgs(Config.parser(), args); + Config config = new Config(ns); + + try (RecoverTool tool = new RecoverTool(config)) { + tool.run(config); + } + } + + private void run(Config config) { + WALHeader header = super.tryReadWALHeader(); + System.out.println(header); + + Iterable recordsSupplier = () -> recover(header, config); + Function decoder = StreamRecordBatchCodec::decode; + Function stringer = decoder.andThen(StreamRecordBatch::toString); + StreamSupport.stream(recordsSupplier.spliterator(), false) + .map(it -> new RecoverResultWrapper(it, stringer)) + .peek(System.out::println) + .forEach(RecoverResultWrapper::release); + } + + private Iterator recover(WALHeader header, Config config) { + long recoverOffset = config.offset != null ? config.offset : header.getTrimOffset(); + long windowLength = header.getSlidingWindowMaxLength(); + long skipRecordAtOffset = config.skipTrimmed ? header.getTrimOffset() : -1; + return new RecoverIterator(recoverOffset, windowLength, skipRecordAtOffset); + } + + @Override + public void close() { + super.shutdownGracefully(); + } + + /** + * A wrapper for {@link RecoverResult} to provide a function to convert {@link RecoverResult#record} to string + */ + public static class RecoverResultWrapper { + private final RecoverResult inner; + /** + * A function to convert {@link RecoverResult#record} to string + */ + private final Function stringer; + + public RecoverResultWrapper(RecoverResult inner, Function stringer) { + this.inner = inner; + this.stringer = stringer; + } + + public void release() { + inner.record().release(); + } + + @Override + public String toString() { + return String.format("%s{", inner.getClass().getSimpleName()) + + String.format("record=(%d)", inner.record().readableBytes()) + stringer.apply(inner.record()) + + ", offset=" + inner.recordOffset() + + '}'; + } + } + + public static class Config { + final String path; + final Long offset; + final Boolean skipTrimmed; + + Config(Namespace ns) { + this.path = ns.getString("path"); + this.offset = ns.getLong("offset"); + this.skipTrimmed = ns.getBoolean("skipTrimmed"); + } + + static ArgumentParser parser() { + ArgumentParser parser = ArgumentParsers + .newFor("RecoverTool") + .build() + .defaultHelp(true) + .description("Recover records in a WAL file"); + parser.addArgument("-p", "--path") + .required(true) + .help("Path of the WAL file"); + parser.addArgument("--offset") + .type(Long.class) + .help("Offset to start recovering, default to the trimmed offset in the WAL header"); + parser.addArgument("--skip-trimmed") + .dest("skipTrimmed") + .type(Boolean.class) + .setDefault(true) + .help("Whether to skip the record at the trimmed offset"); + return parser; + } + } +}