diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java index 091e40b08..471ab9c3e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java @@ -56,6 +56,9 @@ protected ByteBuffer initialValue() { public WALBlockDeviceChannel(String blockDevicePath, long blockDeviceCapacityWant) { this.blockDevicePath = blockDevicePath; this.capacityWant = blockDeviceCapacityWant; + if (blockDeviceCapacityWant != WALUtil.alignSmallByBlockSize(blockDeviceCapacityWant)) { + throw new RuntimeException("wal capacity must be aligned by block size when using block device"); + } DirectIOLib lib = DirectIOLib.getLibForPath(blockDevicePath); if (null == lib || !DirectIOLib.binit) { throw new RuntimeException("O_DIRECT not supported"); diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java index aa677fcba..90cbdabcb 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java @@ -78,21 +78,6 @@ public void testSingleThreadAppendBasicDirectIO(boolean mergeWrite) throws IOExc testSingleThreadAppendBasic0(mergeWrite, true); } - /** - * Write "zero"s to the block device to reset it. - */ - private static void resetBlockDevice(String path, long capacity) throws IOException { - WALChannel channel = WALChannel.builder(path) - .capacity(capacity) - .direct(true) - .build(); - channel.open(); - ByteBuf buf = Unpooled.buffer((int) capacity); - buf.writeZero((int) capacity); - channel.write(buf, 0); - channel.close(); - } - private static void testSingleThreadAppendBasic0(boolean mergeWrite, boolean directIO) throws IOException, OverCapacityException { final int recordSize = 4096 + 1; final int recordCount = 100; @@ -174,6 +159,7 @@ private static void testSingleThreadAppendWhenOverCapacity0(boolean mergeWrite, String path = TestUtils.tempFilePath(); if (directIO && TEST_BLOCK_DEVICE != null) { path = TEST_BLOCK_DEVICE; + blockDeviceCapacity = WALUtil.alignLargeByBlockSize(blockDeviceCapacity); resetBlockDevice(path, blockDeviceCapacity); } @@ -393,6 +379,7 @@ private void testSingleThreadRecover0(boolean shutdown, boolean overCapacity, in if (directIO && TEST_BLOCK_DEVICE != null) { path = TEST_BLOCK_DEVICE; + blockDeviceCapacity = WALUtil.alignLargeByBlockSize(blockDeviceCapacity); resetBlockDevice(path, blockDeviceCapacity); } @@ -465,6 +452,7 @@ private static void testRecoverAfterMergeWrite0(boolean shutdown, boolean overCa if (directIO && TEST_BLOCK_DEVICE != null) { path = TEST_BLOCK_DEVICE; + blockDeviceCapacity = WALUtil.alignLargeByBlockSize(blockDeviceCapacity); resetBlockDevice(path, blockDeviceCapacity); } @@ -900,6 +888,7 @@ private void testRecoverFromDisaster0( String path = TestUtils.tempFilePath(); if (directIO && TEST_BLOCK_DEVICE != null) { path = TEST_BLOCK_DEVICE; + capacity = WALUtil.alignLargeByBlockSize(capacity); resetBlockDevice(path, capacity); } @@ -913,6 +902,10 @@ private void testRecoverFromDisaster0( walChannel.open(); writeWALHeader(walChannel, trimOffset, startOffset, nextOffset, maxLength); for (long writeOffset : writeOffsets) { + if (directIO && TEST_BLOCK_DEVICE != null && writeOffset % WALUtil.BLOCK_SIZE != 0) { + // skip the test as we can't write to un-aligned position on block device + return; + } write(walChannel, writeOffset, recordSize); } walChannel.close(); @@ -1033,4 +1026,19 @@ public void testWindowGreaterThanCapacity() throws IOException, OverCapacityExce wal.shutdownGracefully(); } } + + /** + * Write "0"s to the block device to reset it. + */ + private static void resetBlockDevice(String path, long capacity) throws IOException { + WALChannel channel = WALChannel.builder(path) + .capacity(capacity) + .direct(true) + .build(); + channel.open(); + ByteBuf buf = Unpooled.buffer((int) capacity); + buf.writeZero((int) capacity); + channel.write(buf, 0); + channel.close(); + } }