Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): sequential allocate memory for record write #949

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream;

import com.automq.stream.s3.DirectByteBufAlloc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.concurrent.atomic.AtomicReference;

public class DirectByteBufSeqAlloc {
public static final int HUGE_BUF_SIZE = 8 * 1024 * 1024;
// why not use ThreadLocal? the partition open has too much threads
final AtomicReference<HugeBuf>[] hugeBufArray = new AtomicReference[8];
private final int allocType;

public DirectByteBufSeqAlloc(int allocType) {
this.allocType = allocType;
for (int i = 0; i < hugeBufArray.length; i++) {
hugeBufArray[i] = new AtomicReference<>(new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
}
}

public ByteBuf byteBuffer(int capacity) {
if (capacity >= HUGE_BUF_SIZE) {
// if the request capacity is larger than HUGE_BUF_SIZE, just allocate a new ByteBuf
return DirectByteBufAlloc.byteBuffer(capacity, allocType);
}
int bufIndex = Math.abs(Thread.currentThread().hashCode() % hugeBufArray.length);

AtomicReference<HugeBuf> bufRef = hugeBufArray[bufIndex];
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (bufRef) {
HugeBuf hugeBuf = bufRef.get();

if (hugeBuf.nextIndex + capacity <= hugeBuf.buf.capacity()) {
// if the request capacity can be satisfied by the current hugeBuf, return a slice of it
int nextIndex = hugeBuf.nextIndex;
hugeBuf.nextIndex += capacity;
ByteBuf slice = hugeBuf.buf.retainedSlice(nextIndex, capacity);
return slice.writerIndex(slice.readerIndex());
}

// if the request capacity cannot be satisfied by the current hugeBuf
// 1. slice the remaining of the current hugeBuf and release the hugeBuf
// 2. create a new hugeBuf and slice the remaining of the required capacity
// 3. return the composite ByteBuf of the two slices
CompositeByteBuf cbf = DirectByteBufAlloc.compositeByteBuffer();
int readLength = hugeBuf.buf.capacity() - hugeBuf.nextIndex;
cbf.addComponent(false, hugeBuf.buf.retainedSlice(hugeBuf.nextIndex, readLength));
capacity -= readLength;
hugeBuf.buf.release();

HugeBuf newHugeBuf = new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
bufRef.set(newHugeBuf);

cbf.addComponent(false, newHugeBuf.buf.retainedSlice(0, capacity));
newHugeBuf.nextIndex = capacity;

return cbf;
}
}

static class HugeBuf {
final ByteBuf buf;
int nextIndex;

HugeBuf(ByteBuf buf) {
this.buf = buf;
this.nextIndex = 0;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package com.automq.stream.s3;

import com.automq.stream.DirectByteBufSeqAlloc;
import com.automq.stream.s3.model.StreamRecordBatch;
import io.netty.buffer.ByteBuf;

Expand All @@ -25,10 +26,12 @@ public class StreamRecordBatchCodec {
+ 8 // baseOffset
+ 4 // lastOffsetDelta
+ 4; // payload length
private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD);

public static ByteBuf encode(StreamRecordBatch streamRecord) {
int totalLength = HEADER_SIZE + streamRecord.size(); // payload
ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength, ENCODE_RECORD);
// use sequential allocator to avoid memory fragmentation
ByteBuf buf = ENCODE_ALLOC.byteBuffer(totalLength);
buf.writeByte(MAGIC_V0);
buf.writeLong(streamRecord.getStreamId());
buf.writeLong(streamRecord.getEpoch());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
package com.automq.stream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class DirectByteBufSeqAllocTest {

@Test
public void testAlloc() {
DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0);

AtomicReference<DirectByteBufSeqAlloc.HugeBuf> bufRef = alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)];

ByteBuf buf1 = alloc.byteBuffer(12);
buf1.writeLong(1);
buf1.writeInt(2);

ByteBuf buf2 = alloc.byteBuffer(20);
buf2.writeLong(3);
buf2.writeInt(4);
buf2.writeLong(5);

ByteBuf buf3 = alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12 - 20 - 4);

ByteBuf oldHugeBuf = bufRef.get().buf;

ByteBuf buf4 = alloc.byteBuffer(16);
buf4.writeLong(6);
buf4.writeLong(7);

assertTrue(oldHugeBuf != bufRef.get().buf);

assertEquals(1, buf1.readLong());
assertEquals(2, buf1.readInt());
assertEquals(3, buf2.readLong());
assertEquals(4, buf2.readInt());
assertEquals(5, buf2.readLong());
assertInstanceOf(CompositeByteBuf.class, buf4);
assertEquals(6, buf4.readLong());
assertEquals(7, buf4.readLong());

buf1.release();
buf2.release();
buf3.release();
buf4.release();
assertEquals(0, oldHugeBuf.refCnt());
assertEquals(1, bufRef.get().buf.refCnt());

ByteBuf oldHugeBuf2 = bufRef.get().buf;

alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12).release();
alloc.byteBuffer(12).release();
assertEquals(0, oldHugeBuf2.refCnt());
}

}
Loading