Skip to content

Commit

Permalink
fix(s3stream/wal): fix imports
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Nov 16, 2023
1 parent 109c06b commit 1587f96
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 23 deletions.
14 changes: 14 additions & 0 deletions s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerArgs>
<arg>--add-exports</arg>
<arg>java.base/sun.nio.ch=ALL-UNNAMED</arg>
<arg>--add-exports</arg>
<arg>java.base/jdk.internal.access=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@

package com.automq.stream.s3.wal.util;

import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOLib;
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils;
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile;
import io.netty.buffer.ByteBuf;
import moe.cnkirito.kdio.DirectIOLib;
import moe.cnkirito.kdio.DirectRandomAccessFile;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

public class WALBlockDeviceChannel implements WALChannel {
private static final int BLOCK_SIZE = Integer.parseInt(System.getProperty(
"automq.ebswal.blocksize",
"4096"
));
private static final DirectIOLib DIRECT_IO_LIB = DirectIOLib.getLibForPath("/");
// TODO: move these to config
private static final int PREALLOCATED_BYTE_BUFFER_SIZE = Integer.parseInt(System.getProperty(
"automq.ebswal.preallocatedByteBufferSize",
Expand All @@ -43,31 +39,34 @@ public class WALBlockDeviceChannel implements WALChannel {

final String blockDevicePath;
final long capacityWant;
final DirectIOLib directIOLib;

long capacityFact = 0;

DirectRandomAccessFile randomAccessFile;

ThreadLocal<ByteBuffer> threadLocalByteBuffer = new ThreadLocal<ByteBuffer>() {
ThreadLocal<ByteBuffer> threadLocalByteBuffer = new ThreadLocal<>() {
@Override
protected ByteBuffer initialValue() {
return ByteBuffer.allocateDirect(PREALLOCATED_BYTE_BUFFER_SIZE);
return DirectIOUtils.allocateForDirectIO(directIOLib, PREALLOCATED_BYTE_BUFFER_SIZE);
}
};

public WALBlockDeviceChannel(String blockDevicePath, long blockDeviceCapacityWant) {
this.blockDevicePath = blockDevicePath;
this.capacityWant = blockDeviceCapacityWant;
DirectIOLib lib = DirectIOLib.getLibForPath(blockDevicePath);
if (null == lib || !DirectIOLib.binit) {
throw new RuntimeException("O_DIRECT not supported");
} else {
this.directIOLib = lib;
}
}

@Override
public void open() throws IOException {
if (DirectIOLib.binit) {
randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw");
capacityFact = randomAccessFile.length();
} else {
throw new RuntimeException("your system do not support direct io");
}
randomAccessFile = new DirectRandomAccessFile(new File(blockDevicePath), "rw");
capacityFact = randomAccessFile.length();
}

@Override
Expand All @@ -88,9 +87,11 @@ public long capacity() {
private void makeThreadLocalBytebufferMatchDirectIO(int inputBufferDirectIOAlignedSize) {
ByteBuffer byteBufferWrite = threadLocalByteBuffer.get();
if (inputBufferDirectIOAlignedSize > byteBufferWrite.capacity()) {
if (inputBufferDirectIOAlignedSize <= PREALLOCATED_BYTE_BUFFER_MAX_SIZE)
if (inputBufferDirectIOAlignedSize <= PREALLOCATED_BYTE_BUFFER_MAX_SIZE) {
threadLocalByteBuffer.set(ByteBuffer.allocateDirect(inputBufferDirectIOAlignedSize));
else throw new RuntimeException("too large write size");
} else {
throw new RuntimeException("too large write size");
}
}
}

Expand All @@ -107,18 +108,18 @@ public void write(ByteBuf buf, long position) throws IOException {
byteBufferWrite.position(0).limit(bufferDirectIOAlignedSize);

int remaining = byteBufferWrite.limit();
int writen = 0;
int written = 0;
do {
ByteBuffer slice = byteBufferWrite.slice().position(writen).limit(remaining);
ByteBuffer slice = byteBufferWrite.slice().position(written).limit(remaining);
// FIXME: make sure the position is aligned
int write = randomAccessFile.write(slice, position + writen);
int write = randomAccessFile.write(slice, position + written);
if (write == -1) {
throw new IOException("write -1");
} else if (write % BLOCK_SIZE != 0) {
} else if (write % WALUtil.BLOCK_SIZE != 0) {
// Should not happen. If it happens, it means that the system does not support direct IO
write -= write % BLOCK_SIZE;
write -= write % WALUtil.BLOCK_SIZE;
}
writen += write;
written += write;
remaining -= write;
} while (remaining > 0);
}
Expand Down

0 comments on commit 1587f96

Please sign in to comment.