Skip to content

Commit

Permalink
[ISSUE apache#7757] Use CompositeByteBuf to prevent memory copy. (a…
Browse files Browse the repository at this point in the history
…pache#7694)

* Use CompositeByteBuf to prevent mem_copy.

* Fix code

* Add tests

* Remove useless UTs

* Remove unused imports.

---------

Co-authored-by: RongtongJin <[email protected]>
  • Loading branch information
dao-jun and RongtongJin authored Jan 17, 2024
1 parent 1242a58 commit 7a36d4d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.rocketmq.remoting.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToByteEncoder;
Expand Down Expand Up @@ -51,9 +54,12 @@ protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf o
WritableByteChannel writableByteChannel = new WritableByteChannel() {
@Override
public int write(ByteBuffer src) {
int prev = out.writerIndex();
out.writeBytes(src);
return out.writerIndex() - prev;
// To prevent mem_copy.
CompositeByteBuf b = (CompositeByteBuf) out;
// Have to increase writerIndex manually.
ByteBuf unpooled = Unpooled.wrappedBuffer(src);
b.addComponent(true, unpooled);
return unpooled.readableBytes();
}

@Override
Expand All @@ -76,4 +82,10 @@ public void close() throws IOException {
msg.transferTo(writableByteChannel, transferred);
}
}
}

@Override
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, FileRegion msg, boolean preferDirect) throws Exception {
ByteBufAllocator allocator = ctx.alloc();
return preferDirect ? allocator.compositeDirectBuffer() : allocator.compositeHeapBuffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;

public class FileRegionEncoderTest {

Expand Down

0 comments on commit 7a36d4d

Please sign in to comment.