Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in new tests for ByteBuf based Encoder/Decoder instances #18

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion protostream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ THE POSSIBILITY OF SUCH DAMAGE.
<artifactId>protostream-processor</artifactId>
<version>${protostream.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.92.Final</version>
</dependency>

</dependencies>

Expand All @@ -81,7 +86,7 @@ THE POSSIBILITY OF SUCH DAMAGE.
<!--
Java source/target to use for compilation.
-->
<javac.target>11</javac.target>
<javac.target>17</javac.target>

<!--
Name of the benchmark Uber-JAR to generate.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package org.infinispan.protostream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.infinispan.protostream.impl.TagReaderImpl.Decoder;

import io.netty.buffer.ByteBuf;

public class ByteBufDecoder extends Decoder {
private final ByteBuf buf;
private final int initialOffset;

public ByteBufDecoder(ByteBuf buf) {
this.buf = buf;
this.initialOffset = buf.readerIndex();
}

MalformedProtobufException truncated() {
// TODO: add this from log eventually
return new MalformedProtobufException("Input data ended unexpectedly in the middle of a field. The message is corrupt.");
}

MalformedProtobufException malformedVarint() {
// TODO: add this from log eventually
return new MalformedProtobufException("Encountered a malformed varint.");
}

@Override
protected int getEnd() {
return buf.writerIndex() - initialOffset;
}

@Override
protected int getPos() {
return buf.readableBytes() - initialOffset;
}

@Override
protected byte[] getBufferArray() {
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
return bytes;
}

@Override
protected boolean isAtEnd() {
return buf.readableBytes() == 0;
}

@Override
protected void skipVarint() throws IOException {
for (int i = 0; i < 5; ++i) {
if (readRawByte() >= 0) {
return;
}
}
throw malformedVarint();
}

@Override
protected void skipRawBytes(int length) throws IOException {
try {
buf.skipBytes(length);
} catch (IndexOutOfBoundsException e) {
throw truncated();
}
}

@Override
protected String readString() throws IOException {
int length = readVarint32();
if (length > buf.readableBytes()) {
throw truncated();
}
String string = buf.toString(buf.readerIndex(), length, StandardCharsets.UTF_8);
buf.skipBytes(length);
return string;
}

@Override
protected byte readRawByte() throws IOException {
try {
return buf.readByte();
} catch (IndexOutOfBoundsException e) {
throw truncated();
}
}

@Override
protected byte[] readRawByteArray(int length) throws IOException {
if (length > buf.readableBytes()) {
throw truncated();
}
byte[] bytes = new byte[length];
buf.readBytes(bytes);
return bytes;
}

@Override
protected ByteBuffer readRawByteBuffer(int length) throws IOException {
if (length > buf.readableBytes()) {
throw truncated();
}
return buf.nioBuffer(buf.readerIndex(), length);
}

@Override
protected long readVarint64() throws IOException {
long value = 0;
for (int i = 0; i < 64; i += 7) {
byte b = readRawByte();
value |= (long) (b & 0x7F) << i;
if (b >= 0) {
return value;
}
}
throw malformedVarint();
}

@Override
protected int readFixed32() throws IOException {
try {
return buf.readInt();
} catch (IndexOutOfBoundsException e) {
throw truncated();
}
}

@Override
protected long readFixed64() throws IOException {
try {
return buf.readLong();
} catch (IndexOutOfBoundsException e) {
throw truncated();
}
}

@Override
protected int pushLimit(int newLimit) throws IOException {
int prev = buf.writerIndex() - initialOffset;
if (prev < newLimit) {
throw truncated();
}
buf.writerIndex(buf.readerIndex() + newLimit);
return prev;
}

@Override
protected void popLimit(int oldLimit) {
buf.writerIndex(oldLimit + initialOffset);
}

@Override
protected Decoder decoderFromLength(int length) {
ByteBuf slicedBuf = buf.readSlice(length);
return new ByteBufDecoder(slicedBuf);
}

@Override
protected int setGlobalLimit(int globalLimit) {
return Integer.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package org.infinispan.protostream;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.infinispan.protostream.impl.TagWriterImpl;

import io.netty.buffer.ByteBuf;

public class ByteBufEncoder extends TagWriterImpl.FixedVarintEncoder {
private final ByteBuf buf;

public ByteBufEncoder(ByteBuf buf) {
this.buf = buf;
}

private IOException outOfWriteBufferSpace(Throwable t) {
return new IOException("Ran out of buffer space", t);
}

@Override
protected void writeVarint32(int value) throws IOException {
try {
while (true) {
if ((value & 0xFFFFFF80) == 0) {
buf.writeByte((byte) value);
break;
} else {
buf.writeByte((byte) (value & 0x7F | 0x80));
value >>>= 7;
}
}
} catch (IndexOutOfBoundsException e) {
throw outOfWriteBufferSpace(e);
}
}

@Override
protected void writeVarint64(long value) throws IOException {
try {
while (true) {
if ((value & 0xFFFFFFFFFFFFFF80L) == 0) {
buf.writeByte((byte) value);
break;
} else {
buf.writeByte((byte) ((int) value & 0x7F | 0x80));
value >>>= 7;
}
}
} catch (IndexOutOfBoundsException e) {
throw outOfWriteBufferSpace(e);
}
}

@Override
protected void writeFixed32(int value) throws IOException {
try {
buf.writeInt(value);
} catch (IndexOutOfBoundsException e) {
throw outOfWriteBufferSpace(e);
}
}

@Override
protected void writeFixed64(long value) throws IOException {
try {
buf.writeLong(value);
} catch (IndexOutOfBoundsException e) {
throw outOfWriteBufferSpace(e);
}
}

@Override
protected void writeByte(byte value) throws IOException {
try {
buf.writeByte(value);
} catch (IndexOutOfBoundsException e) {
throw outOfWriteBufferSpace(e);
}
}

@Override
protected void writeBytes(byte[] value, int offset, int length) throws IOException {
try {
buf.writeBytes(value, offset, length);
} catch (IndexOutOfBoundsException e) {
throw outOfWriteBufferSpace(e);
}
}

@Override
protected void writeBytes(ByteBuffer value) throws IOException {
try {
buf.writeBytes(value);
} catch (IndexOutOfBoundsException e) {
throw outOfWriteBufferSpace(e);
}
}

@Override
protected int skipFixedVarint() {
int prev = buf.writerIndex();
buf.skipBytes(5);
return prev;
}

@Override
protected void writePositiveFixedVarint(int pos) {
int length = buf.writerIndex() - pos - 5;
buf.setByte(pos++, (byte) (length & 0x7F | 0x80));
buf.setByte(pos++, (byte) ((length >>> 7) & 0x7F | 0x80));
buf.setByte(pos++, (byte) ((length >>> 14) & 0x7F | 0x80));
buf.setByte(pos++, (byte) ((length >>> 21) & 0x7F | 0x80));
buf.setByte(pos, (byte) ((length >>> 28) & 0x7F));
}
}
Loading