From 2aa149b583110a3fe0214d7ce0b117955ed0b5d3 Mon Sep 17 00:00:00 2001 From: minux Date: Mon, 5 Dec 2022 17:30:37 +0900 Subject: [PATCH] Add Bytes that represents binary data (#4505) Motivation: We create an `HttpData` by wrapping `ByteBuf` or `byte[]` that represent binary data. `ByteArrayHttpData` or `ByteBufHttpData` is internally used to operate on the binary data. Because it's an `HttpData` we cannot share the logic if we add other classes that are binary data such as WebSocket frames. Modifications: - Add `Bytes` and its implementations: `ByteArrayBytes`, `ByteBufBytes` - `HttpData` uses them internally. - Add more methods to `StreamDecoderInput` that will be used for WebSocket. Result: - Ready to work on #3904 --- .../armeria/core/HttpServerBenchmark.java | 29 +- .../armeria/common/ByteArrayHttpData.java | 193 +++-------- .../armeria/common/ByteBufHttpData.java | 299 ++++++------------ .../com/linecorp/armeria/common/Bytes.java | 197 ++++++++++++ .../com/linecorp/armeria/common/HttpData.java | 175 +--------- .../common/stream/StreamDecoderInput.java | 25 ++ .../internal/common/ByteArrayBytes.java | 183 +++++++++++ .../armeria/internal/common/ByteBufBytes.java | 268 ++++++++++++++++ .../common/stream/ByteBufsDecoderInput.java | 95 ++++++ .../armeria/unsafe/PooledObjects.java | 46 +-- .../linecorp/armeria/common/HttpDataTest.java | 77 +++++ .../PublisherBasedHttpResponseTest.java | 2 +- .../common/stream/StreamMessageTest.java | 4 +- .../common/ByteArrayBytesTest.java} | 88 ++---- .../common/ByteBufBytesTest.java} | 78 +++-- .../stream/ByteBufsDecoderInputTest.java | 70 ++++ .../server/grpc/UnaryDecoderInput.java | 10 + 17 files changed, 1163 insertions(+), 676 deletions(-) create mode 100644 core/src/main/java/com/linecorp/armeria/common/Bytes.java create mode 100644 core/src/main/java/com/linecorp/armeria/internal/common/ByteArrayBytes.java create mode 100644 core/src/main/java/com/linecorp/armeria/internal/common/ByteBufBytes.java rename core/src/test/java/com/linecorp/armeria/{common/ByteArrayHttpDataTest.java => internal/common/ByteArrayBytesTest.java} (50%) rename core/src/test/java/com/linecorp/armeria/{common/ByteBufHttpDataTest.java => internal/common/ByteBufBytesTest.java} (69%) diff --git a/benchmarks/jmh/src/jmh/java/com/linecorp/armeria/core/HttpServerBenchmark.java b/benchmarks/jmh/src/jmh/java/com/linecorp/armeria/core/HttpServerBenchmark.java index ff6e5a4d82f..b258f9fb12b 100644 --- a/benchmarks/jmh/src/jmh/java/com/linecorp/armeria/core/HttpServerBenchmark.java +++ b/benchmarks/jmh/src/jmh/java/com/linecorp/armeria/core/HttpServerBenchmark.java @@ -30,9 +30,12 @@ import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpResponseWriter; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.ResponseHeaders; import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.metric.NoopMeterRegistry; import com.linecorp.armeria.server.Server; @@ -67,13 +70,30 @@ String uriText() { @Param private Protocol protocol; + @Param("100") + private int chunkCount; + @Setup public void startServer() throws Exception { - final byte[] PLAINTEXT = "Hello, World!".getBytes(StandardCharsets.UTF_8); + final byte[] plaintext = "Hello, World!".getBytes(StandardCharsets.UTF_8); + server = Server.builder() .service("/empty", (ctx, req) -> HttpResponse.of(HttpStatus.OK)) .service("/plaintext", (ctx, req) -> HttpResponse - .of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, PLAINTEXT)) + .of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, plaintext)) + .service("/streaming", (ctx, req) -> { + final HttpResponseWriter writer = HttpResponse.streaming(); + writer.write(ResponseHeaders.of(200)); + for (int i = 0; i < chunkCount; i++) { + if (i == chunkCount - 1) { + writer.write(HttpData.wrap(plaintext).withEndOfStream()); + writer.close(); + } else { + writer.write(HttpData.wrap(plaintext)); + } + } + return writer; + }) .requestTimeout(Duration.ZERO) .meterRegistry(NoopMeterRegistry.get()) .build(); @@ -108,6 +128,11 @@ public void empty(Blackhole bh, AsyncCounters counters) throws Exception { })); } + @Benchmark + public void streaming(Blackhole bh) throws Exception { + bh.consume(webClient.get("/streaming").aggregate().join()); + } + /** * A benchmark for a test designed to demonstrate the capacity about {@code MediaType.PLAIN_TEXT_UTF_8}. * diff --git a/core/src/main/java/com/linecorp/armeria/common/ByteArrayHttpData.java b/core/src/main/java/com/linecorp/armeria/common/ByteArrayHttpData.java index f9c19ec6453..5e71d2b4620 100644 --- a/core/src/main/java/com/linecorp/armeria/common/ByteArrayHttpData.java +++ b/core/src/main/java/com/linecorp/armeria/common/ByteArrayHttpData.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 LINE Corporation + * Copyright 2022 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,201 +13,82 @@ * License for the specific language governing permissions and limitations * under the License. */ -package com.linecorp.armeria.common; -import static java.util.Objects.requireNonNull; +package com.linecorp.armeria.common; -import java.io.InputStream; -import java.nio.charset.Charset; import java.util.Arrays; -import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals; +import com.linecorp.armeria.internal.common.ByteArrayBytes; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.util.internal.EmptyArrays; -import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; +final class ByteArrayHttpData extends ByteArrayBytes implements HttpData { -/** - * A {@code byte[]}-based {@link HttpData}. - */ -final class ByteArrayHttpData implements HttpData { + private static final byte[] EMPTY_BYTES = {}; - static final ByteArrayHttpData EMPTY = new ByteArrayHttpData(EmptyArrays.EMPTY_BYTES, false); - static final ByteArrayHttpData EMPTY_EOS = new ByteArrayHttpData(EmptyArrays.EMPTY_BYTES, true); + static final ByteArrayHttpData EMPTY = new ByteArrayHttpData(EMPTY_BYTES); + static final ByteArrayHttpData EMPTY_EOS = new ByteArrayHttpData(EMPTY_BYTES, true); - private static final byte[] SAFE_OCTETS = new byte[256]; - - static { - final String safeOctetStr = "`~!@#$%^&*()-_=+\t[{]}\\|;:'\",<.>/?" + - "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; - for (int i = 0; i < safeOctetStr.length(); i++) { - SAFE_OCTETS[safeOctetStr.charAt(i)] = -1; - } - } - - private final byte[] array; private final boolean endOfStream; + /** + * Creates a new instance. + */ ByteArrayHttpData(byte[] array) { - this(array, false); + super(array); + endOfStream = false; } private ByteArrayHttpData(byte[] array, boolean endOfStream) { - this.array = array; + super(array); this.endOfStream = endOfStream; } @Override - public byte[] array() { - return array; - } - - @Override - public int length() { - return array.length; - } - - @Override - public String toString(Charset charset) { - requireNonNull(charset, "charset"); - return new String(array, charset); - } - - @Override - public String toString() { - if (array.length == 0) { - return isEndOfStream() ? "{0B, EOS}" : "{0B}"; - } - - try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) { - final StringBuilder buf = tempThreadLocals.stringBuilder(); - buf.append('{').append(array.length); - - if (isEndOfStream()) { - buf.append("B, EOS, "); - } else { - buf.append("B, "); - } - - return appendPreviews(buf, array, 0, Math.min(16, array.length)) - .append('}').toString(); - } - } - - static StringBuilder appendPreviews(StringBuilder buf, byte[] array, int offset, int previewLength) { - // Append the hex preview if contains non-ASCII chars. - final int endOffset = offset + previewLength; - for (int i = offset; i < endOffset; i++) { - if (SAFE_OCTETS[array[i] & 0xFF] == 0) { - return buf.append("hex=").append(ByteBufUtil.hexDump(array, offset, previewLength)); - } - } - - // Append the text preview otherwise. - return buf.append("text=").append(new String(array, 0, offset, previewLength)); - } - - @Override - public InputStream toInputStream() { - return new FastByteArrayInputStream(array); - } - - @Override - public boolean isEndOfStream() { - return endOfStream; - } - - @Override - public ByteArrayHttpData withEndOfStream(boolean endOfStream) { - if (isEndOfStream() == endOfStream) { + public HttpData withEndOfStream(boolean endOfStream) { + if (this.endOfStream == endOfStream) { return this; } - if (isEmpty()) { return endOfStream ? EMPTY_EOS : EMPTY; } - return new ByteArrayHttpData(array, endOfStream); - } - - @Override - public boolean isPooled() { - return false; - } - - @Override - public ByteBuf byteBuf(ByteBufAccessMode mode) { - requireNonNull(mode, "mode"); - if (isEmpty()) { - return Unpooled.EMPTY_BUFFER; - } - - if (mode != ByteBufAccessMode.FOR_IO) { - return Unpooled.wrappedBuffer(array); - } else { - final ByteBuf copy = newDirectByteBuf(); - copy.writeBytes(array); - return copy; - } + return new ByteArrayHttpData(array(), endOfStream); } @Override - public ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode) { - requireNonNull(mode, "mode"); - if (length == 0) { - return Unpooled.EMPTY_BUFFER; - } - - if (mode != ByteBufAccessMode.FOR_IO) { - return Unpooled.wrappedBuffer(array, offset, length); - } else { - final ByteBuf copy = newDirectByteBuf(length); - copy.writeBytes(array, offset, length); - return copy; - } - } - - private ByteBuf newDirectByteBuf() { - return newDirectByteBuf(length()); - } - - private static ByteBuf newDirectByteBuf(int length) { - return PooledByteBufAllocator.DEFAULT.directBuffer(length); + public boolean isEndOfStream() { + return endOfStream; } - @Override - public void close() {} - + @SuppressWarnings("RedundantMethodOverride") @Override public int hashCode() { - // Calculate the hash code from the first 32 bytes. - final int end = Math.min(32, length()); - - int hash = 0; - for (int i = 0; i < end; i++) { - hash = hash * 31 + array[i]; - } - return hash; + // Use hashcode in ByteBufBytes because we don't use endOfStream in equals. + return super.hashCode(); } @Override - public boolean equals(Object obj) { - if (!(obj instanceof HttpData)) { - return false; - } - - if (obj == this) { + public boolean equals(Object o) { + if (this == o) { return true; } + if (!(o instanceof HttpData)) { + return false; + } - final HttpData that = (HttpData) obj; + final HttpData that = (HttpData) o; if (length() != that.length()) { return false; } - return Arrays.equals(array, that.array()); + return Arrays.equals(array(), that.array()); + } + + @Override + public String toString() { + final String toString = super.toString(); + if (!isEndOfStream()) { + return toString; + } + return toString + ", {EOS}"; } } diff --git a/core/src/main/java/com/linecorp/armeria/common/ByteBufHttpData.java b/core/src/main/java/com/linecorp/armeria/common/ByteBufHttpData.java index c7024647454..bdb6240d1fe 100644 --- a/core/src/main/java/com/linecorp/armeria/common/ByteBufHttpData.java +++ b/core/src/main/java/com/linecorp/armeria/common/ByteBufHttpData.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 LINE Corporation + * Copyright 2022 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,276 +13,153 @@ * License for the specific language governing permissions and limitations * under the License. */ + package com.linecorp.armeria.common; -import static com.google.common.base.MoreObjects.firstNonNull; -import static java.util.Objects.requireNonNull; +import static com.linecorp.armeria.common.ByteArrayHttpData.EMPTY; +import static com.linecorp.armeria.common.ByteArrayHttpData.EMPTY_EOS; import java.io.InputStream; import java.nio.charset.Charset; -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals; +import com.linecorp.armeria.internal.common.ByteBufBytes; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.util.IllegalReferenceCountException; import io.netty.util.ResourceLeakHint; -import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; - -/** - * A {@link ByteBuf}-based {@link HttpData}. - */ -final class ByteBufHttpData implements HttpData, ResourceLeakHint { - private static final int FLAG_POOLED = 1; - private static final int FLAG_END_OF_STREAM = 2; - private static final int FLAG_CLOSED = 4; - - private final ByteBuf buf; - @Nullable - private byte[] array; - private int flags; +final class ByteBufHttpData extends ByteBufBytes implements HttpData { ByteBufHttpData(ByteBuf buf, boolean pooled) { - this(buf, pooled ? FLAG_POOLED : 0, null); - } - - private ByteBufHttpData(ByteBuf buf, int flags, @Nullable byte[] array) { - this.buf = buf; - this.array = array; - this.flags = flags; + super(buf, pooled); } @Override - public byte[] array() { - if (array != null) { - return array; + public HttpData withEndOfStream(boolean endOfStream) { + if (!endOfStream) { + return this; } - final int length = buf.readableBytes(); - if (isPooled()) { - buf.touch(this); - // We don't use the pooled buffer's underlying array here, - // because it will be in use by others when 'buf' is released. - } else if (buf.hasArray() && buf.arrayOffset() == 0 && buf.readerIndex() == 0) { - final byte[] bufArray = buf.array(); - if (bufArray.length == length) { - return array = bufArray; - } + if (isEmpty()) { + return EMPTY_EOS; } - return array = ByteBufUtil.getBytes(buf, buf.readerIndex(), length); + return new EndOfStreamByteBufHttpData(this); } @Override - public int length() { - return buf.readableBytes(); + public boolean isEndOfStream() { + return false; } + @SuppressWarnings("RedundantMethodOverride") @Override - public String toString(Charset charset) { - requireNonNull(charset, "charset"); - if (array != null) { - return new String(array, charset); - } else { - return buf.toString(charset); - } + public int hashCode() { + // Use hashcode in ByteBufBytes because we don't use endOfStream in equals. + return super.hashCode(); } @Override - public String toString() { - return toString(false); - } - - private String toString(boolean hint) { - final int length = buf.readableBytes(); - - try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) { - final StringBuilder strBuf = tempThreadLocals.stringBuilder(); - strBuf.append('{').append(length).append("B, "); - - if (isEndOfStream()) { - strBuf.append("EOS, "); - } - if (isPooled()) { - strBuf.append("pooled, "); - } - if ((flags & FLAG_CLOSED) != 0) { - if (buf.refCnt() == 0) { - return strBuf.append("closed}").toString(); - } else { - strBuf.append("closed, "); - } - } + public boolean equals(Object obj) { + if (!(obj instanceof HttpData)) { + return false; + } - // Generate the preview array. - final int previewLength = Math.min(16, length); - byte[] array = this.array; - final int offset; - if (array == null) { - try { - if (buf.hasArray()) { - array = buf.array(); - offset = buf.arrayOffset() + buf.readerIndex(); - } else if (!hint) { - array = ByteBufUtil.getBytes(buf, buf.readerIndex(), previewLength); - offset = 0; - if (previewLength == length) { - this.array = array; - } - } else { - // Can't call getBytes() when generating the hint string - // because it will also create a leak record. - return strBuf.append("}").toString(); - } - } catch (IllegalReferenceCountException e) { - // Shouldn't really happen when used ByteBuf correctly, - // but we just don't make toString() fail because of this. - return strBuf.append("badRefCnt}").toString(); - } - } else { - offset = 0; - } + if (obj == this) { + return true; + } - return ByteArrayHttpData.appendPreviews(strBuf, array, offset, previewLength) - .append('}').toString(); + final HttpData that = (HttpData) obj; + if (length() != that.length()) { + return false; } - } - @Override - public String toHintString() { - return toString(true); + return ByteBufUtil.equals(buf(), that.byteBuf()); } - @Override - public InputStream toInputStream() { - if (array != null) { - return new FastByteArrayInputStream(array); - } else { - return new ByteBufInputStream(buf.duplicate(), false); - } - } + private static class EndOfStreamByteBufHttpData implements ResourceLeakHint, HttpData { - @Override - public boolean isEndOfStream() { - return (flags & FLAG_END_OF_STREAM) != 0; - } + private final ByteBufHttpData delegate; - @Override - public ByteBufHttpData withEndOfStream(boolean endOfStream) { - if (isEndOfStream() == endOfStream) { - return this; + EndOfStreamByteBufHttpData(ByteBufHttpData delegate) { + this.delegate = delegate; } - int newFlags = flags & ~FLAG_END_OF_STREAM; - if (endOfStream) { - newFlags |= FLAG_END_OF_STREAM; + @Override + public byte[] array() { + return delegate.array(); } - return new ByteBufHttpData(buf, newFlags, array); - } - - @Override - public boolean isPooled() { - return (flags & FLAG_POOLED) != 0; - } - - @Override - public ByteBuf byteBuf(ByteBufAccessMode mode) { - switch (requireNonNull(mode, "mode")) { - case DUPLICATE: - return buf.duplicate(); - case RETAINED_DUPLICATE: - return buf.retainedDuplicate(); - case FOR_IO: - if (buf.isDirect()) { - return buf.retainedDuplicate(); - } - - final ByteBuf copy = newDirectByteBuf(); - copy.writeBytes(buf, buf.readerIndex(), buf.readableBytes()); - return copy; + @Override + public int length() { + return delegate.length(); } - throw new Error(); // Never reaches here. - } - - @Override - public ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode) { - final int startIndex = buf.readerIndex() + offset; - switch (requireNonNull(mode, "mode")) { - case DUPLICATE: - return buf.slice(startIndex, length); - case RETAINED_DUPLICATE: - return buf.retainedSlice(startIndex, length); - case FOR_IO: - if (buf.isDirect()) { - return buf.retainedSlice(startIndex, length); - } - - final ByteBuf copy = newDirectByteBuf(length); - copy.writeBytes(buf, startIndex, length); - return copy; + @Override + public String toString(Charset charset) { + return delegate.toString(charset); } - throw new Error(); // Never reaches here. - } + @Override + public String toString() { + return delegate + ", {EOS}"; + } - private ByteBuf newDirectByteBuf() { - return newDirectByteBuf(buf.readableBytes()); - } + @Override + public InputStream toInputStream() { + return delegate.toInputStream(); + } - private static ByteBuf newDirectByteBuf(int length) { - return PooledByteBufAllocator.DEFAULT.directBuffer(length); - } + @Override + public boolean isPooled() { + return delegate.isPooled(); + } - @Override - public void touch(@Nullable Object hint) { - if (isPooled()) { - buf.touch(firstNonNull(hint, this)); + @Override + public ByteBuf byteBuf(ByteBufAccessMode mode) { + return delegate.byteBuf(mode); } - } - @Override - public void close() { - // This is not thread safe, but an attempt to close one instance from multiple threads would fail - // with an IllegalReferenceCountException anyway. - if ((flags & (FLAG_POOLED | FLAG_CLOSED)) == FLAG_POOLED) { - flags |= FLAG_CLOSED; - buf.release(); + @Override + public ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode) { + return delegate.byteBuf(offset, length, mode); } - } - @Override - public int hashCode() { - // Calculate the hash code from the first 32 bytes. - int hash = 0; - final int bufStart = buf.readerIndex(); - final int bufEnd = bufStart + Math.min(32, buf.readableBytes()); - for (int i = bufStart; i < bufEnd; i++) { - hash = hash * 31 + buf.getByte(i); + @Override + public void close() { + delegate.close(); } - return hash; - } - @Override - public boolean equals(Object obj) { - if (!(obj instanceof HttpData)) { - return false; + @Override + public HttpData withEndOfStream(boolean endOfStream) { + if (endOfStream) { + return this; + } + if (isEmpty()) { + return EMPTY; + } + return delegate; } - if (obj == this) { + @Override + public boolean isEndOfStream() { return true; } - final HttpData that = (HttpData) obj; - if (buf.readableBytes() != that.length()) { - return false; + @Override + public String toHintString() { + return delegate.toHintString(); + } + + @Override + public int hashCode() { + return delegate.hashCode(); } - return ByteBufUtil.equals(buf, that.byteBuf()); + @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") + @Override + public boolean equals(Object obj) { + return delegate.equals(obj); + } } } diff --git a/core/src/main/java/com/linecorp/armeria/common/Bytes.java b/core/src/main/java/com/linecorp/armeria/common/Bytes.java new file mode 100644 index 00000000000..810c5abd917 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/Bytes.java @@ -0,0 +1,197 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.common; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executor; + +import org.reactivestreams.Subscriber; + +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.stream.StreamMessage; +import com.linecorp.armeria.common.stream.SubscriptionOption; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.server.file.HttpFile; +import com.linecorp.armeria.unsafe.PooledObjects; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +/** + * Represents binary data. + */ +@UnstableApi +public interface Bytes extends SafeCloseable { + + /** + * Returns the underlying byte array of this data. Do not modify the content of the returned byte array. + * Any changes made in the returned array may or may not affect the content of this data. + */ + byte[] array(); + + /** + * Returns the length of this data. + */ + int length(); + + /** + * Returns whether the {@link #length()} of this data is 0. + */ + default boolean isEmpty() { + return length() == 0; + } + + /** + * Decodes this data into a {@link String}. + * + * @param charset the {@link Charset} to use for decoding this data + * + * @return the decoded {@link String} + */ + String toString(Charset charset); + + /** + * Decodes this data into a {@link String} using UTF-8 encoding. + * + * @return the decoded {@link String} + */ + default String toStringUtf8() { + return toString(StandardCharsets.UTF_8); + } + + /** + * Decodes this data into a {@link String} using US-ASCII encoding. + * + * @return the decoded {@link String} + */ + default String toStringAscii() { + return toString(StandardCharsets.US_ASCII); + } + + /** + * Returns a new {@link InputStream} that is sourced from this data. + */ + InputStream toInputStream(); + + /** + * Returns a new {@link Reader} that is sourced from this data and decoded using the specified + * {@link Charset}. + */ + default Reader toReader(Charset charset) { + requireNonNull(charset, "charset"); + return new InputStreamReader(toInputStream(), charset); + } + + /** + * Returns a new {@link Reader} that is sourced from this data and decoded using + * {@link StandardCharsets#UTF_8}. + */ + default Reader toReaderUtf8() { + return toReader(StandardCharsets.UTF_8); + } + + /** + * Returns a new {@link Reader} that is sourced from this data and decoded using + * {@link StandardCharsets#US_ASCII}. + */ + default Reader toReaderAscii() { + return toReader(StandardCharsets.US_ASCII); + } + + /** + * (Advanced users only) Returns whether this data is pooled. Note, if this method returns {@code true}, + * you must call {@link #close()} once you no longer need this data, because its underlying {@link ByteBuf} + * will not be released automatically. + * + * @see PooledObjects + */ + boolean isPooled(); + + /** + * (Advanced users only) Returns a new duplicate of the underlying {@link ByteBuf} of this data. + * This method does not transfer the ownership of the underlying {@link ByteBuf}, i.e. the reference + * count of the {@link ByteBuf} does not change. If this data is not pooled, the returned {@link ByteBuf} + * is not pooled, either, which means you need to worry about releasing it only when you created this data + * with pooled objects. Any changes made in the content of the returned {@link ByteBuf} affects + * the content of this data. + * + * @see PooledObjects + */ + default ByteBuf byteBuf() { + return byteBuf(ByteBufAccessMode.DUPLICATE); + } + + /** + * (Advanced users only) Returns a new duplicate, retained duplicate or direct copy of the underlying + * {@link ByteBuf} of this data based on the specified {@link ByteBufAccessMode}. + * This method does not transfer the ownership of the underlying {@link ByteBuf}, i.e. the reference + * count of the {@link ByteBuf} does not change. If this data is not pooled, the returned {@link ByteBuf} + * is not pooled, either, which means you need to worry about releasing it only when you created this data + * with pooled objects. Any changes made in the content of the returned {@link ByteBuf} affects + * the content of this data. + * + * @see PooledObjects + */ + ByteBuf byteBuf(ByteBufAccessMode mode); + + /** + * (Advanced users only) Returns a new slice, retained slice or direct copy of the underlying + * {@link ByteBuf} of this data based on the specified {@link ByteBufAccessMode}. + * This method does not transfer the ownership of the underlying {@link ByteBuf}, i.e. the reference + * count of the {@link ByteBuf} does not change. If this data is not pooled, the returned {@link ByteBuf} + * is not pooled, either, which means you need to worry about releasing it only when you created this data + * with pooled objects. Any changes made in the content of the returned {@link ByteBuf} affects + * the content of this data. + * + * @see PooledObjects + */ + ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode); + + /** + * (Advanced users only) Records the current access location of this data for debugging purposes. + * If this data is determined to be leaked, the information recorded by this operation will be provided to + * you via {@link ResourceLeakDetector}. + */ + default void touch(@Nullable Object hint) {} + + /** + * Releases the underlying {@link ByteBuf} if this data was created with pooled objects. + * Otherwise, this method does nothing. You may want to call this method to reclaim the underlying + * {@link ByteBuf} when using operations that return pooled objects, such as: + * + * If you don't use such operations, you don't need to call this method. + * + * @see PooledObjects + */ + @Override + void close(); +} diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpData.java b/core/src/main/java/com/linecorp/armeria/common/HttpData.java index baa7f6529e3..4fe43ff9bed 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpData.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpData.java @@ -17,9 +17,6 @@ import static java.util.Objects.requireNonNull; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; @@ -27,31 +24,21 @@ import java.util.Arrays; import java.util.Formatter; import java.util.Locale; -import java.util.concurrent.Executor; - -import org.reactivestreams.Subscriber; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; -import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.common.stream.StreamMessage; -import com.linecorp.armeria.common.stream.SubscriptionOption; -import com.linecorp.armeria.common.util.SafeCloseable; -import com.linecorp.armeria.server.file.HttpFile; import com.linecorp.armeria.unsafe.PooledObjects; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import io.netty.util.ResourceLeakDetector; /** * HTTP/2 data that contains a chunk of bytes. */ -public interface HttpData extends HttpObject, SafeCloseable { +public interface HttpData extends HttpObject, Bytes { /** * Returns an empty {@link HttpData}. @@ -116,10 +103,10 @@ static HttpData wrap(ByteBuf buf) { final int length = buf.readableBytes(); if (length == 0) { buf.release(); - return ByteArrayHttpData.EMPTY; + return empty(); } - final ByteBufHttpData data = new ByteBufHttpData(buf, true); + final HttpData data = new ByteBufHttpData(buf, true); buf.touch(data); return data; } @@ -319,81 +306,6 @@ static HttpData ofAscii(@FormatString String format, Object... args) { return of(StandardCharsets.US_ASCII, format, args); } - /** - * Returns the underlying byte array of this data. Any changes made in the returned array affects - * the content of this data. - */ - byte[] array(); - - /** - * Returns the length of this data. - */ - int length(); - - /** - * Returns whether the {@link #length()} of this data is 0. - */ - default boolean isEmpty() { - return length() == 0; - } - - /** - * Decodes this data into a {@link String}. - * - * @param charset the {@link Charset} to use for decoding this data - * - * @return the decoded {@link String} - */ - String toString(Charset charset); - - /** - * Decodes this data into a {@link String} using UTF-8 encoding. - * - * @return the decoded {@link String} - */ - default String toStringUtf8() { - return toString(StandardCharsets.UTF_8); - } - - /** - * Decodes this data into a {@link String} using US-ASCII encoding. - * - * @return the decoded {@link String} - */ - default String toStringAscii() { - return toString(StandardCharsets.US_ASCII); - } - - /** - * Returns a new {@link InputStream} that is sourced from this data. - */ - InputStream toInputStream(); - - /** - * Returns a new {@link Reader} that is sourced from this data and decoded using the specified - * {@link Charset}. - */ - default Reader toReader(Charset charset) { - requireNonNull(charset, "charset"); - return new InputStreamReader(toInputStream(), charset); - } - - /** - * Returns a new {@link Reader} that is sourced from this data and decoded using - * {@link StandardCharsets#UTF_8}. - */ - default Reader toReaderUtf8() { - return toReader(StandardCharsets.UTF_8); - } - - /** - * Returns a new {@link Reader} that is sourced from this data and decoded using - * {@link StandardCharsets#US_ASCII}. - */ - default Reader toReaderAscii() { - return toReader(StandardCharsets.US_ASCII); - } - /** * Returns the {@link HttpData} that has the same content with this data and its HTTP/2 {@code endOfStream} * flag set. If this data already has {@code endOfStream} set, {@code this} will be returned. @@ -408,85 +320,4 @@ default HttpData withEndOfStream() { * {@code this} will be returned. */ HttpData withEndOfStream(boolean endOfStream); - - /** - * (Advanced users only) Returns whether this data is pooled. Note, if this method returns {@code true}, - * you must call {@link #close()} once you no longer need this data, because its underlying {@link ByteBuf} - * will not be released automatically. - * - * @see PooledObjects - */ - @UnstableApi - boolean isPooled(); - - /** - * (Advanced users only) Returns a new duplicate of the underlying {@link ByteBuf} of this data. - * This method does not transfer the ownership of the underlying {@link ByteBuf}, i.e. the reference - * count of the {@link ByteBuf} does not change. If this data is not pooled, the returned {@link ByteBuf} - * is not pooled, either, which means you need to worry about releasing it only when you created this data - * with {@link #wrap(ByteBuf)}. Any changes made in the content of the returned {@link ByteBuf} affects - * the content of this data. - * - * @see PooledObjects - */ - @UnstableApi - default ByteBuf byteBuf() { - return byteBuf(ByteBufAccessMode.DUPLICATE); - } - - /** - * (Advanced users only) Returns a new duplicate, retained duplicate or direct copy of the underlying - * {@link ByteBuf} of this data based on the specified {@link ByteBufAccessMode}. - * This method does not transfer the ownership of the underlying {@link ByteBuf}, i.e. the reference - * count of the {@link ByteBuf} does not change. If this data is not pooled, the returned {@link ByteBuf} - * is not pooled, either, which means you need to worry about releasing it only when you created this data - * with {@link #wrap(ByteBuf)}. Any changes made in the content of the returned {@link ByteBuf} affects - * the content of this data. - * - * @see PooledObjects - */ - @UnstableApi - ByteBuf byteBuf(ByteBufAccessMode mode); - - /** - * (Advanced users only) Returns a new slice, retained slice or direct copy of the underlying - * {@link ByteBuf} of this data based on the specified {@link ByteBufAccessMode}. - * This method does not transfer the ownership of the underlying {@link ByteBuf}, i.e. the reference - * count of the {@link ByteBuf} does not change. If this data is not pooled, the returned {@link ByteBuf} - * is not pooled, either, which means you need to worry about releasing it only when you created this data - * with {@link #wrap(ByteBuf)}. Any changes made in the content of the returned {@link ByteBuf} affects - * the content of this data. - * - * @see PooledObjects - */ - @UnstableApi - ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode); - - /** - * (Advanced users only) Records the current access location of this data for debugging purposes. - * If this data is determined to be leaked, the information recorded by this operation will be provided to - * you via {@link ResourceLeakDetector}. - */ - @UnstableApi - default void touch(@Nullable Object hint) {} - - /** - * Releases the underlying {@link ByteBuf} if this data was created via {@link #wrap(ByteBuf)}. - * Otherwise, this method does nothing. You may want to call this method to reclaim the underlying - * {@link ByteBuf} when using operations that return pooled objects, such as: - * - * If you don't use such operations, you don't need to call this method. - * - * @see PooledObjects - */ - @Override - void close(); } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamDecoderInput.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamDecoderInput.java index c0552d0e0f1..e52b42158d5 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamDecoderInput.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamDecoderInput.java @@ -51,11 +51,29 @@ default short readUnsignedByte() { return (short) (readByte() & 0xFF); } + /** + * Reads an unsigned short from the readable bytes. + * + * @throws IllegalStateException if the {@link #readableBytes()} is less than {@code 2} bytes. + */ + default int readUnsignedShort() { + return (readByte() & 0xFF) << 8 | readByte(); + } + /** * Reads a 32-bit integer from the readable bytes. + * + * @throws IllegalStateException if the {@link #readableBytes()} is less than {@code 4} bytes. */ int readInt(); + /** + * Reads a 64-bit long from the readable bytes. + * + * @throws IllegalStateException if the {@link #readableBytes()} is less than {@code 8} bytes. + */ + long readLong(); + /** * Reads a newly retained slice of this {@link ByteBuf} from the readable bytes. * @@ -63,6 +81,13 @@ default short readUnsignedByte() { */ ByteBuf readBytes(int length); + /** + * Reads data to the specified {@code dst}. + * + * @throws IllegalStateException if the length of the {@code dst} is greater than {@link #readableBytes()} + */ + void readBytes(byte[] dst); + /** * Returns a byte at the specified absolute {@code index} in this {@link StreamDecoderInput}. * diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/ByteArrayBytes.java b/core/src/main/java/com/linecorp/armeria/internal/common/ByteArrayBytes.java new file mode 100644 index 00000000000..25db6b21930 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/internal/common/ByteArrayBytes.java @@ -0,0 +1,183 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.internal.common; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; + +import com.linecorp.armeria.common.ByteBufAccessMode; +import com.linecorp.armeria.common.Bytes; +import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; + +/** + * A {@code byte[]}-based {@link Bytes}. + */ +public class ByteArrayBytes implements Bytes { + + private static final byte[] SAFE_OCTETS = new byte[256]; + + static { + final String safeOctetStr = "`~!@#$%^&*()-_=+\t[{]}\\|;:'\",<.>/?" + + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + for (int i = 0; i < safeOctetStr.length(); i++) { + SAFE_OCTETS[safeOctetStr.charAt(i)] = -1; + } + } + + private final byte[] array; + + /** + * Creates a new instance. + */ + protected ByteArrayBytes(byte[] array) { + this.array = requireNonNull(array, "array"); + } + + @Override + public byte[] array() { + return array; + } + + @Override + public int length() { + return array.length; + } + + @Override + public String toString(Charset charset) { + requireNonNull(charset, "charset"); + return new String(array, charset); + } + + @Override + public String toString() { + if (array.length == 0) { + return "{0B}"; + } + try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) { + final StringBuilder buf = tempThreadLocals.stringBuilder(); + buf.append('{').append(array.length).append("B, "); + + return appendPreviews(buf, array, 0, Math.min(16, array.length)) + .append('}').toString(); + } + } + + static StringBuilder appendPreviews(StringBuilder buf, byte[] array, int offset, int previewLength) { + // Append the hex preview if contains non-ASCII chars. + final int endOffset = offset + previewLength; + for (int i = offset; i < endOffset; i++) { + if (SAFE_OCTETS[array[i] & 0xFF] == 0) { + return buf.append("hex=").append(ByteBufUtil.hexDump(array, offset, previewLength)); + } + } + + // Append the text preview otherwise. + return buf.append("text=").append(new String(array, 0, offset, previewLength)); + } + + @Override + public InputStream toInputStream() { + return new FastByteArrayInputStream(array); + } + + @Override + public boolean isPooled() { + return false; + } + + @Override + public ByteBuf byteBuf(ByteBufAccessMode mode) { + requireNonNull(mode, "mode"); + if (isEmpty()) { + return Unpooled.EMPTY_BUFFER; + } + + if (mode != ByteBufAccessMode.FOR_IO) { + return Unpooled.wrappedBuffer(array); + } else { + final ByteBuf copy = newDirectByteBuf(); + copy.writeBytes(array); + return copy; + } + } + + @Override + public ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode) { + requireNonNull(mode, "mode"); + if (length == 0) { + return Unpooled.EMPTY_BUFFER; + } + + if (mode != ByteBufAccessMode.FOR_IO) { + return Unpooled.wrappedBuffer(array, offset, length); + } else { + final ByteBuf copy = newDirectByteBuf(length); + copy.writeBytes(array, offset, length); + return copy; + } + } + + private ByteBuf newDirectByteBuf() { + return newDirectByteBuf(length()); + } + + private static ByteBuf newDirectByteBuf(int length) { + return PooledByteBufAllocator.DEFAULT.directBuffer(length); + } + + @Override + public void close() {} + + @Override + public int hashCode() { + // Calculate the hash code from the first 32 bytes. + final int end = Math.min(32, length()); + + int hash = 0; + for (int i = 0; i < end; i++) { + hash = hash * 31 + array[i]; + } + return hash; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Bytes)) { + return false; + } + + if (obj == this) { + return true; + } + + final Bytes that = (Bytes) obj; + if (length() != that.length()) { + return false; + } + + return Arrays.equals(array, that.array()); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/ByteBufBytes.java b/core/src/main/java/com/linecorp/armeria/internal/common/ByteBufBytes.java new file mode 100644 index 00000000000..2a39d5eee13 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/internal/common/ByteBufBytes.java @@ -0,0 +1,268 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.internal.common; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.charset.Charset; + +import com.linecorp.armeria.common.ByteBufAccessMode; +import com.linecorp.armeria.common.Bytes; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.IllegalReferenceCountException; +import io.netty.util.ResourceLeakHint; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; + +/** + * A {@link ByteBuf}-based {@link Bytes}. + */ +public class ByteBufBytes implements Bytes, ResourceLeakHint { + + private final ByteBuf buf; + private final boolean pooled; + private boolean closed; + @Nullable + private byte[] array; + + /** + * Creates a new instance. + */ + protected ByteBufBytes(ByteBuf buf, boolean pooled) { + this.buf = requireNonNull(buf, "buf"); + this.pooled = pooled; + } + + protected ByteBuf buf() { + return buf; + } + + @Override + public byte[] array() { + if (array != null) { + return array; + } + + final int length = buf.readableBytes(); + if (pooled) { + buf.touch(this); + // We don't use the pooled buffer's underlying array here, + // because it will be in use by others when 'buf' is released. + } else if (buf.hasArray() && buf.arrayOffset() == 0 && buf.readerIndex() == 0) { + final byte[] bufArray = buf.array(); + if (bufArray.length == length) { + return array = bufArray; + } + } + + return array = ByteBufUtil.getBytes(buf, buf.readerIndex(), length); + } + + @Override + public int length() { + return buf.readableBytes(); + } + + @Override + public String toString(Charset charset) { + requireNonNull(charset, "charset"); + if (array != null) { + return new String(array, charset); + } else { + return buf.toString(charset); + } + } + + @Override + public String toString() { + return toString(false); + } + + private String toString(boolean hint) { + final int length = buf.readableBytes(); + + try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) { + final StringBuilder strBuf = tempThreadLocals.stringBuilder(); + strBuf.append('{').append(length).append("B, "); + if (isPooled()) { + strBuf.append("pooled, "); + } + if (closed) { + if (buf.refCnt() == 0) { + return strBuf.append("closed}").toString(); + } else { + strBuf.append("closed, "); + } + } + + // Generate the preview array. + final int previewLength = Math.min(16, length); + byte[] array = this.array; + final int offset; + if (array == null) { + try { + if (buf.hasArray()) { + array = buf.array(); + offset = buf.arrayOffset() + buf.readerIndex(); + } else if (!hint) { + array = ByteBufUtil.getBytes(buf, buf.readerIndex(), previewLength); + offset = 0; + if (previewLength == length) { + this.array = array; + } + } else { + // Can't call getBytes() when generating the hint string + // because it will also create a leak record. + return strBuf.append("}").toString(); + } + } catch (IllegalReferenceCountException e) { + // Shouldn't really happen when used ByteBuf correctly, + // but we just don't make toString() fail because of this. + return strBuf.append("badRefCnt}").toString(); + } + } else { + offset = 0; + } + + return ByteArrayBytes.appendPreviews(strBuf, array, offset, previewLength) + .append('}').toString(); + } + } + + @Override + public String toHintString() { + return toString(true); + } + + @Override + public InputStream toInputStream() { + if (array != null) { + return new FastByteArrayInputStream(array); + } else { + return new ByteBufInputStream(buf.duplicate(), false); + } + } + + @Override + public boolean isPooled() { + return pooled; + } + + @Override + public ByteBuf byteBuf(ByteBufAccessMode mode) { + switch (requireNonNull(mode, "mode")) { + case DUPLICATE: + return buf.duplicate(); + case RETAINED_DUPLICATE: + return buf.retainedDuplicate(); + case FOR_IO: + if (buf.isDirect()) { + return buf.retainedDuplicate(); + } + + final ByteBuf copy = newDirectByteBuf(); + copy.writeBytes(buf, buf.readerIndex(), buf.readableBytes()); + return copy; + } + + throw new Error(); // Never reaches here. + } + + @Override + public ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode) { + final int startIndex = buf.readerIndex() + offset; + switch (requireNonNull(mode, "mode")) { + case DUPLICATE: + return buf.slice(startIndex, length); + case RETAINED_DUPLICATE: + return buf.retainedSlice(startIndex, length); + case FOR_IO: + if (buf.isDirect()) { + return buf.retainedSlice(startIndex, length); + } + + final ByteBuf copy = newDirectByteBuf(length); + copy.writeBytes(buf, startIndex, length); + return copy; + } + + throw new Error(); // Never reaches here. + } + + private ByteBuf newDirectByteBuf() { + return newDirectByteBuf(buf.readableBytes()); + } + + private static ByteBuf newDirectByteBuf(int length) { + return PooledByteBufAllocator.DEFAULT.directBuffer(length); + } + + @Override + public void touch(@Nullable Object hint) { + if (isPooled()) { + buf.touch(firstNonNull(hint, this)); + } + } + + @Override + public void close() { + // This is not thread safe, but an attempt to close one instance from multiple threads would fail + // with an IllegalReferenceCountException anyway. + if (!closed) { + closed = true; + if (pooled) { + buf.release(); + } + } + } + + @Override + public int hashCode() { + // Calculate the hash code from the first 32 bytes. + int hash = 0; + final int bufStart = buf.readerIndex(); + final int bufEnd = bufStart + Math.min(32, buf.readableBytes()); + for (int i = bufStart; i < bufEnd; i++) { + hash = hash * 31 + buf.getByte(i); + } + return hash; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Bytes)) { + return false; + } + + if (obj == this) { + return true; + } + + final Bytes that = (Bytes) obj; + if (buf.readableBytes() != that.length()) { + return false; + } + + return ByteBufUtil.equals(buf, that.byteBuf()); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInput.java b/core/src/main/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInput.java index d5110749231..313e9e0b656 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInput.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInput.java @@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; public final class ByteBufsDecoderInput implements StreamDecoderInput { @@ -76,6 +77,26 @@ public byte readByte() { return value; } + @Override + public int readUnsignedShort() { + final ByteBuf firstBuf = queue.peek(); + if (firstBuf == null) { + throw newEndOfInputException(); + } + + final int readableBytes = firstBuf.readableBytes(); + if (readableBytes >= 2) { + final int value = firstBuf.readUnsignedShort(); + if (readableBytes == 2) { + queue.remove(); + firstBuf.release(); + } + this.readableBytes -= 2; + return value; + } + return StreamDecoderInput.super.readUnsignedShort(); + } + @Override public int readInt() { final ByteBuf firstBuf = queue.peek(); @@ -134,8 +155,33 @@ private int readIntSlow() { throw newEndOfInputException(); } + @Override + public long readLong() { + final ByteBuf firstBuf = queue.peek(); + if (firstBuf == null) { + throw newEndOfInputException(); + } + + final int readableBytes = firstBuf.readableBytes(); + if (readableBytes >= 8) { + final long value = firstBuf.readLong(); + if (readableBytes == 8) { + queue.remove(); + firstBuf.release(); + } + this.readableBytes -= 8; + return value; + } + + // readableBytes is decreased in readInt. + return (long) readInt() << 32 | readInt(); + } + @Override public ByteBuf readBytes(int length) { + if (length == 0) { + return Unpooled.EMPTY_BUFFER; + } checkArgument(length > 0, "length %s (expected: length > 0)", length); final ByteBuf firstBuf = queue.peek(); if (firstBuf == null) { @@ -156,6 +202,29 @@ public ByteBuf readBytes(int length) { return byteBuf; } + @Override + public void readBytes(byte[] dst) { + final int length = dst.length; + if (length == 0) { + return; + } + final ByteBuf firstBuf = queue.peek(); + if (firstBuf == null) { + throw newEndOfInputException(); + } + + final int readableBytes = firstBuf.readableBytes(); + if (readableBytes == length) { + queue.remove().readBytes(dst).release(); + } else if (readableBytes > length) { + firstBuf.readBytes(dst, 0, length); + } else { + readBytesSlow(dst); + } + + this.readableBytes -= length; + } + private ByteBuf readBytesSlow(int length) { final ByteBuf value = alloc.buffer(length); int remaining = length; @@ -181,6 +250,29 @@ private ByteBuf readBytesSlow(int length) { throw newEndOfInputException(); } + private void readBytesSlow(byte[] dst) { + int remaining = dst.length; + for (final Iterator it = queue.iterator(); it.hasNext();) { + final ByteBuf buf = it.next(); + final int readableBytes = buf.readableBytes(); + assert readableBytes > 0 : buf; + + final int readSize = Math.min(remaining, readableBytes); + buf.readBytes(dst, dst.length - remaining, readSize); + if (readableBytes == readSize) { + it.remove(); + buf.release(); + } + + remaining -= readSize; + if (remaining == 0) { + return; + } + } + + throw newEndOfInputException(); + } + @Override public byte getByte(int index) { final ByteBuf firstBuf = queue.peek(); @@ -216,6 +308,9 @@ private byte getByteSlow(int remaining) { @Override public void skipBytes(int length) { + if (length == 0) { + return; + } final ByteBuf firstBuf = queue.peek(); if (firstBuf == null) { throw newEndOfInputException(); diff --git a/core/src/main/java/com/linecorp/armeria/unsafe/PooledObjects.java b/core/src/main/java/com/linecorp/armeria/unsafe/PooledObjects.java index 838a0f15315..c34d664e39e 100644 --- a/core/src/main/java/com/linecorp/armeria/unsafe/PooledObjects.java +++ b/core/src/main/java/com/linecorp/armeria/unsafe/PooledObjects.java @@ -20,6 +20,7 @@ import org.reactivestreams.Subscriber; import com.linecorp.armeria.common.AggregationOptions; +import com.linecorp.armeria.common.Bytes; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; @@ -33,19 +34,20 @@ import io.netty.buffer.ByteBufAllocator; /** - * Utility class that provides ways to create a pooled {@link HttpData} and manage its life cycle. + * Utility class that provides ways to create a pooled {@link Bytes} and manage its life cycle. * - *

Warning: Using a pooled {@link HttpData} is very advanced and can open up much more complicated - * management of a reference counted {@link ByteBuf}. You should only ever do this if you are very comfortable + *

Warning: Using a pooled {@link Bytes} is very advanced and can open up much more complicated + * management of a reference counted {@link Bytes}. You should only ever do this if you are very comfortable * with Netty. It is recommended to also read through * Reference counted objects * for more information on pooled objects.

* - *

What is a pooled {@link HttpData}?

+ *

What is a pooled {@link Bytes}?

* - *

A pooled {@link HttpData} is a special variant of {@link HttpData} whose {@link HttpData#isPooled()} - * returns {@code true}. It's usually created via {@link HttpData#wrap(ByteBuf)} by wrapping an existing - * {@link ByteBuf}. It can appear when you consume data using the operations such as: + *

A pooled {@link Bytes} is a special variant of {@link Bytes} whose {@link Bytes#isPooled()} + * returns {@code true}. Currently, {@link HttpData} is a {@link Bytes} and it's usually created via + * {@link HttpData#wrap(ByteBuf)} by wrapping an existing {@link ByteBuf}. + * It can appear when you consume data using the operations such as: *

    *
  • {@link StreamMessage#subscribe(Subscriber, SubscriptionOption...)} with * {@link SubscriptionOption#WITH_POOLED_OBJECTS}
  • @@ -56,16 +58,16 @@ *
  • {@link HttpFile#aggregateWithPooledObjects(Executor, ByteBufAllocator)}
  • *
* - *

To put it another way, you'll never see a pooled {@link HttpData} if you did not use such + *

To put it another way, you'll never see a pooled {@link Bytes} if you did not use such * operations. You can ignore the rest of this section if that's the case.

* - *

Any time you receive a pooled {@link HttpData}, it will have an underlying {@link ByteBuf} that must be + *

Any time you receive a pooled {@link Bytes}, it will have an underlying {@link ByteBuf} that must be * released - failure to release the {@link ByteBuf} will result in a memory leak and poor performance. * You must make sure to do this by calling {@link HttpData#close()}, usually in a try-with-resources structure * to avoid side effects, e.g. *

{@code
  * HttpResponse res = client.get("/");
- * res.aggregateWithPooledObjects(ctx.alloc(), ctx.executor())
+ * res.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.executor()))
  *    .thenApply(aggResp -> {
  *        // try-with-resources here ensures the content is released
  *        // if it is a pooled HttpData, or otherwise it's no-op.
@@ -91,36 +93,36 @@
 public final class PooledObjects {
 
     /**
-     * Closes the given pooled {@link HttpData}. Does nothing if it's not a pooled {@link HttpData}.
+     * Closes the given pooled {@link Bytes}. Does nothing if it's not a pooled {@link Bytes}.
      *
-     * @param obj maybe an {@link HttpData} to close
+     * @param obj maybe an {@link Bytes} to close
      */
     public static void close(Object obj) {
-        if (obj instanceof HttpData) {
-            ((HttpData) obj).close();
+        if (obj instanceof Bytes) {
+            ((Bytes) obj).close();
         }
     }
 
     /**
-     * Calls {@link ByteBuf#touch(Object)} on the specified {@link HttpData}'s underlying {@link ByteBuf}.
-     * Uses the specified {@link HttpData} as a hint. Does nothing if it's not a pooled {@link HttpData}.
+     * Calls {@link ByteBuf#touch(Object)} on the specified {@link Bytes}' underlying {@link ByteBuf}.
+     * Uses the specified {@link Bytes} as a hint. Does nothing if it's not a pooled {@link Bytes}.
      *
-     * @param obj maybe a pooled {@link HttpData} to touch its underlying {@link ByteBuf}
+     * @param obj maybe a pooled {@link Bytes} to touch its underlying {@link ByteBuf}
      */
     public static  T touch(T obj) {
         return touch(obj, obj);
     }
 
     /**
-     * Calls {@link ByteBuf#touch(Object)} on the specified {@link HttpData}'s underlying {@link ByteBuf}.
-     * Does nothing if it's not a pooled {@link HttpData}.
+     * Calls {@link ByteBuf#touch(Object)} on the specified {@link Bytes}'s underlying {@link ByteBuf}.
+     * Does nothing if it's not a pooled {@link Bytes}.
      *
-     * @param obj maybe a pooled {@link HttpData} to touch its underlying {@link ByteBuf}
+     * @param obj maybe a pooled {@link Bytes} to touch its underlying {@link ByteBuf}
      * @param hint the hint to specify when calling {@link ByteBuf#touch(Object)}
      */
     public static  T touch(T obj, @Nullable Object hint) {
-        if (obj instanceof HttpData) {
-            ((HttpData) obj).touch(hint);
+        if (obj instanceof Bytes) {
+            ((Bytes) obj).touch(hint);
         }
         return obj;
     }
diff --git a/core/src/test/java/com/linecorp/armeria/common/HttpDataTest.java b/core/src/test/java/com/linecorp/armeria/common/HttpDataTest.java
index 0eef87b92cf..579b9b7b5d5 100644
--- a/core/src/test/java/com/linecorp/armeria/common/HttpDataTest.java
+++ b/core/src/test/java/com/linecorp/armeria/common/HttpDataTest.java
@@ -156,4 +156,81 @@ void fromAsciiCharSequence() throws Exception {
         assertThat(HttpData.ofAscii((CharSequence) "가A").toStringUtf8()).isEqualTo("?A");
         assertThat(HttpData.ofAscii(CharBuffer.wrap("あB")).toStringUtf8()).isEqualTo("?B");
     }
+
+    @Test
+    void empty() {
+        final HttpData data = ByteArrayHttpData.EMPTY;
+        assertThat(data.array()).isEmpty();
+        assertThat(data.byteBuf()).isSameAs(Unpooled.EMPTY_BUFFER);
+        assertThat(data.isEmpty()).isTrue();
+        assertThat(data.isEndOfStream()).isFalse();
+        assertThat(data.withEndOfStream()).isSameAs(ByteArrayHttpData.EMPTY_EOS);
+        assertThat(data.withEndOfStream(false)).isSameAs(data);
+        assertThat(data.isPooled()).isFalse();
+
+        for (int i = 0; i < 2; i++) {
+            // close() should not release anything.
+            data.close();
+            assertThat(data.byteBuf().refCnt()).isOne();
+        }
+    }
+
+    @Test
+    void emptyEoS() {
+        final HttpData data = ByteArrayHttpData.EMPTY_EOS;
+        assertThat(data.array()).isEmpty();
+        assertThat(data.byteBuf()).isSameAs(Unpooled.EMPTY_BUFFER);
+        assertThat(data.isEmpty()).isTrue();
+        assertThat(data.isEndOfStream()).isTrue();
+        assertThat(data.withEndOfStream()).isSameAs(data);
+        assertThat(data.withEndOfStream(false)).isSameAs(ByteArrayHttpData.EMPTY);
+        assertThat(data.isPooled()).isFalse();
+    }
+
+    @Test
+    void arrayBacked() {
+        final byte[] array = { 1, 2, 3, 4 };
+        final HttpData data = HttpData.wrap(array);
+        assertThat(data.array()).isSameAs(array);
+        assertThat(data.byteBuf().array()).isSameAs(array);
+        assertThat(data.isEmpty()).isFalse();
+        assertThat(data.length()).isEqualTo(4);
+        assertThat(data.isEndOfStream()).isFalse();
+        assertThat(data.withEndOfStream(false)).isSameAs(data);
+        assertThat(data.toInputStream()).hasBinaryContent(array);
+        assertThat(data.isPooled()).isFalse();
+
+        final HttpData dataEoS = data.withEndOfStream();
+        assertThat(dataEoS.array()).isSameAs(array);
+        assertThat(dataEoS.byteBuf().array()).isSameAs(array);
+        assertThat(dataEoS.isEmpty()).isFalse();
+        assertThat(dataEoS.length()).isSameAs(4);
+        assertThat(dataEoS.isEndOfStream()).isTrue();
+        assertThat(dataEoS.withEndOfStream()).isSameAs(dataEoS);
+        assertThat(dataEoS.withEndOfStream(false).isEndOfStream()).isFalse();
+        assertThat(dataEoS.toInputStream()).hasBinaryContent(array);
+        assertThat(dataEoS.isPooled()).isFalse();
+    }
+
+    @Test
+    void unpooled() {
+        final byte[] array = { 1, 2, 3, 4 };
+        final ByteBuf buf = Unpooled.wrappedBuffer(array);
+        final HttpData data = new ByteBufHttpData(buf, false);
+        assertThat(data.isPooled()).isFalse();
+        assertThat(data.length()).isEqualTo(4);
+        assertThat(data.array()).isSameAs(buf.array());
+        assertThat(data.array()).isSameAs(data.array()); // Should be cached
+        assertThat(data.withEndOfStream(false)).isSameAs(data);
+
+        final HttpData dataEoS = data.withEndOfStream();
+        assertThat(dataEoS.isPooled()).isFalse();
+        assertThat(dataEoS.array()).isSameAs(data.array());
+        assertThat(dataEoS.withEndOfStream()).isSameAs(dataEoS);
+
+        // close() on an unpooled data should not release the buffer.
+        data.close();
+        assertThat(buf.refCnt()).isOne();
+        buf.release();
+    }
 }
diff --git a/core/src/test/java/com/linecorp/armeria/common/PublisherBasedHttpResponseTest.java b/core/src/test/java/com/linecorp/armeria/common/PublisherBasedHttpResponseTest.java
index 07ca99f7078..ea7fe4912fd 100644
--- a/core/src/test/java/com/linecorp/armeria/common/PublisherBasedHttpResponseTest.java
+++ b/core/src/test/java/com/linecorp/armeria/common/PublisherBasedHttpResponseTest.java
@@ -75,6 +75,6 @@ void emptyContentPublisher() {
         final AggregatedHttpResponse response = WebClient.of(server.httpUri())
                                                          .get("/empty-content").aggregate().join();
         assertThat(response.status()).isEqualTo(HttpStatus.OK);
-        assertThat(response.content()).isEqualTo(HttpData.empty());
+        assertThat(response.content()).isEqualTo(HttpData.empty().withEndOfStream());
     }
 }
diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageTest.java
index 395b34cc5da..40fe2519c7d 100644
--- a/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageTest.java
+++ b/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageTest.java
@@ -333,7 +333,7 @@ void noopSubscribe_aborted() {
         final StreamMessage source = StreamMessage.of(httpData);
         final List collected = new ArrayList<>();
         final StreamMessage aborted = source.peek(x -> {
-            if (x.equals(HttpData.wrap(Unpooled.wrappedBuffer("6".getBytes())))) {
+            if (x.equals(HttpData.wrap("6".getBytes()))) {
                 source.abort();
             } else {
                 collected.add(x);
@@ -363,7 +363,7 @@ void noopSubscribe_error_thrown() throws Exception {
         final StreamMessage source = StreamMessage.of(httpData);
         final List collected = new ArrayList<>();
         final StreamMessage aborted = source.peek(x -> {
-            if (x.equals(HttpData.wrap(Unpooled.wrappedBuffer("6".getBytes())))) {
+            if (x.equals(HttpData.wrap("6".getBytes()))) {
                 throw new RuntimeException();
             } else {
                 collected.add(x);
diff --git a/core/src/test/java/com/linecorp/armeria/common/ByteArrayHttpDataTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/ByteArrayBytesTest.java
similarity index 50%
rename from core/src/test/java/com/linecorp/armeria/common/ByteArrayHttpDataTest.java
rename to core/src/test/java/com/linecorp/armeria/internal/common/ByteArrayBytesTest.java
index a56a3c411b2..76b6379117c 100644
--- a/core/src/test/java/com/linecorp/armeria/common/ByteArrayHttpDataTest.java
+++ b/core/src/test/java/com/linecorp/armeria/internal/common/ByteArrayBytesTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020 LINE Corporation
+ * Copyright 2021 LINE Corporation
  *
  * LINE Corporation licenses this file to you under the Apache License,
  * version 2.0 (the "License"); you may not use this file except in compliance
@@ -13,7 +13,7 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
-package com.linecorp.armeria.common;
+package com.linecorp.armeria.internal.common;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -21,71 +21,30 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
+import com.linecorp.armeria.common.ByteBufAccessMode;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
-class ByteArrayHttpDataTest {
-
-    @Test
-    void empty() {
-        final ByteArrayHttpData data = ByteArrayHttpData.EMPTY;
-        assertThat(data.array()).isEmpty();
-        assertThat(data.byteBuf()).isSameAs(Unpooled.EMPTY_BUFFER);
-        assertThat(data.isEmpty()).isTrue();
-        assertThat(data.isEndOfStream()).isFalse();
-        assertThat(data.withEndOfStream()).isSameAs(ByteArrayHttpData.EMPTY_EOS);
-        assertThat(data.withEndOfStream(false)).isSameAs(data);
-        assertThat(data.isPooled()).isFalse();
-
-        for (int i = 0; i < 2; i++) {
-            // close() should not release anything.
-            data.close();
-            assertThat(data.byteBuf().refCnt()).isOne();
-        }
-    }
-
-    @Test
-    void emptyEoS() {
-        final ByteArrayHttpData data = ByteArrayHttpData.EMPTY_EOS;
-        assertThat(data.array()).isEmpty();
-        assertThat(data.byteBuf()).isSameAs(Unpooled.EMPTY_BUFFER);
-        assertThat(data.isEmpty()).isTrue();
-        assertThat(data.isEndOfStream()).isTrue();
-        assertThat(data.withEndOfStream()).isSameAs(data);
-        assertThat(data.withEndOfStream(false)).isSameAs(ByteArrayHttpData.EMPTY);
-        assertThat(data.isPooled()).isFalse();
-    }
+class ByteArrayBytesTest {
 
     @Test
     void arrayBacked() {
         final byte[] array = { 1, 2, 3, 4 };
-        final ByteArrayHttpData data = new ByteArrayHttpData(array);
+        final ByteArrayBytes data = new ByteArrayBytes(array);
         assertThat(data.array()).isSameAs(array);
         assertThat(data.byteBuf().array()).isSameAs(array);
         assertThat(data.isEmpty()).isFalse();
         assertThat(data.length()).isEqualTo(4);
-        assertThat(data.isEndOfStream()).isFalse();
-        assertThat(data.withEndOfStream(false)).isSameAs(data);
         assertThat(data.toInputStream()).hasBinaryContent(array);
         assertThat(data.isPooled()).isFalse();
-
-        final HttpData dataEoS = data.withEndOfStream();
-        assertThat(dataEoS.array()).isSameAs(array);
-        assertThat(dataEoS.byteBuf().array()).isSameAs(array);
-        assertThat(dataEoS.isEmpty()).isFalse();
-        assertThat(dataEoS.length()).isSameAs(4);
-        assertThat(dataEoS.isEndOfStream()).isTrue();
-        assertThat(dataEoS.withEndOfStream()).isSameAs(dataEoS);
-        assertThat(dataEoS.withEndOfStream(false).isEndOfStream()).isFalse();
-        assertThat(dataEoS.toInputStream()).hasBinaryContent(array);
-        assertThat(dataEoS.isPooled()).isFalse();
     }
 
     @ParameterizedTest
     @EnumSource(value = ByteBufAccessMode.class, names = { "DUPLICATE", "RETAINED_DUPLICATE" })
     void duplicateOrSlice(ByteBufAccessMode mode) {
         final byte[] array = { 1, 2, 3, 4 };
-        final ByteArrayHttpData data = new ByteArrayHttpData(array);
+        final ByteArrayBytes data = new ByteArrayBytes(array);
         final ByteBuf buf = data.byteBuf(mode);
         assertThat(buf.isDirect()).isFalse();
         assertThat(buf.readableBytes()).isEqualTo(4);
@@ -106,7 +65,7 @@ void duplicateOrSlice(ByteBufAccessMode mode) {
 
     @Test
     void directCopy() {
-        final ByteArrayHttpData data = new ByteArrayHttpData(new byte[] { 1, 2, 3, 4 });
+        final ByteArrayBytes data = new ByteArrayBytes(new byte[] { 1, 2, 3, 4 });
         final ByteBuf buf = data.byteBuf(ByteBufAccessMode.FOR_IO);
         assertThat(buf.isDirect()).isTrue();
         assertThat(buf.readableBytes()).isEqualTo(4);
@@ -122,27 +81,28 @@ void directCopy() {
 
     @Test
     void hash() {
-        final ByteArrayHttpData data = new ByteArrayHttpData(new byte[] { 2, 3, 4, 5 });
+        final ByteArrayBytes data = new ByteArrayBytes(new byte[] { 2, 3, 4, 5 });
         assertThat(data.hashCode()).isEqualTo(((2 * 31 + 3) * 31 + 4) * 31 + 5);
 
         // Ensure 33rd+ bytes are ignored.
         final byte[] bigArray = new byte[33];
         bigArray[32] = 1;
-        final ByteArrayHttpData bigData = new ByteArrayHttpData(bigArray);
+        final ByteArrayBytes bigData = new ByteArrayBytes(bigArray);
         assertThat(bigData.hashCode()).isZero();
     }
 
     @Test
     void equals() {
-        final HttpData a = new ByteArrayHttpData(new byte[] { 1, 2, 3, 4 });
-        final HttpData b = new ByteArrayHttpData(new byte[] { 1, 2, 3, 4 });
-        final HttpData c = new ByteArrayHttpData(new byte[] { 1, 2, 3 });
-        final HttpData d = new ByteArrayHttpData(new byte[] { 4, 5, 6, 7 });
-        final HttpData bufData = new ByteBufHttpData(Unpooled.directBuffer().writeInt(0x01020304), true);
+        final ByteArrayBytes a = new ByteArrayBytes(new byte[] { 1, 2, 3, 4 });
+        final ByteArrayBytes b = new ByteArrayBytes(new byte[] { 1, 2, 3, 4 });
+        final ByteArrayBytes c = new ByteArrayBytes(new byte[] { 1, 2, 3 });
+        final ByteArrayBytes d = new ByteArrayBytes(new byte[] { 4, 5, 6, 7 });
+        final ByteBufBytes bufData =
+                new ByteBufBytes(Unpooled.directBuffer().writeInt(0x01020304), true);
 
         assertThat(a).isEqualTo(a);
         assertThat(a).isEqualTo(b);
-        assertThat(a).isEqualTo(bufData);
+        assertThat(a.array()).isEqualTo(bufData.array());
         assertThat(a).isNotEqualTo(c);
         assertThat(a).isNotEqualTo(d);
         assertThat(a).isNotEqualTo(new Object());
@@ -152,19 +112,11 @@ void equals() {
 
     @Test
     void testToString() {
-        assertThat(ByteArrayHttpData.EMPTY).hasToString("{0B}");
-        assertThat(new ByteArrayHttpData(new byte[] { 'f', 'o', 'o' })).hasToString("{3B, text=foo}");
-        assertThat(new ByteArrayHttpData(new byte[] { 1, 2, 3 })).hasToString("{3B, hex=010203}");
-
-        // endOfStream
-        assertThat(ByteArrayHttpData.EMPTY_EOS).hasToString("{0B, EOS}");
-        assertThat(new ByteArrayHttpData(new byte[] { 'f', 'o', 'o' })
-                           .withEndOfStream()).hasToString("{3B, EOS, text=foo}");
-        assertThat(new ByteArrayHttpData(new byte[] { 1, 2, 3 })
-                           .withEndOfStream()).hasToString("{3B, EOS, hex=010203}");
+        assertThat(new ByteArrayBytes(new byte[] { 'f', 'o', 'o' })).hasToString("{3B, text=foo}");
+        assertThat(new ByteArrayBytes(new byte[] { 1, 2, 3 })).hasToString("{3B, hex=010203}");
 
         // Longer than 16 bytes
-        assertThat(new ByteArrayHttpData(new byte[] {
+        assertThat(new ByteArrayBytes(new byte[] {
                 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', -1
         })).hasToString("{17B, text=0123456789abcdef}");
     }
diff --git a/core/src/test/java/com/linecorp/armeria/common/ByteBufHttpDataTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/ByteBufBytesTest.java
similarity index 69%
rename from core/src/test/java/com/linecorp/armeria/common/ByteBufHttpDataTest.java
rename to core/src/test/java/com/linecorp/armeria/internal/common/ByteBufBytesTest.java
index 15bd90b933d..02da9236c9d 100644
--- a/core/src/test/java/com/linecorp/armeria/common/ByteBufHttpDataTest.java
+++ b/core/src/test/java/com/linecorp/armeria/internal/common/ByteBufBytesTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020 LINE Corporation
+ * Copyright 2021 LINE Corporation
  *
  * LINE Corporation licenses this file to you under the Apache License,
  * version 2.0 (the "License"); you may not use this file except in compliance
@@ -13,7 +13,7 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
-package com.linecorp.armeria.common;
+package com.linecorp.armeria.internal.common;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -21,25 +21,22 @@
 
 import org.junit.jupiter.api.Test;
 
+import com.linecorp.armeria.common.ByteBufAccessMode;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
-class ByteBufHttpDataTest {
+class ByteBufBytesTest {
+
     @Test
     void unpooled() {
         final byte[] array = { 1, 2, 3, 4 };
         final ByteBuf buf = Unpooled.wrappedBuffer(array);
-        final ByteBufHttpData data = new ByteBufHttpData(buf, false);
+        final ByteBufBytes data = new ByteBufBytes(buf, false);
         assertThat(data.isPooled()).isFalse();
         assertThat(data.length()).isEqualTo(4);
         assertThat(data.array()).isSameAs(buf.array());
         assertThat(data.array()).isSameAs(data.array()); // Should be cached
-        assertThat(data.withEndOfStream(false)).isSameAs(data);
-
-        final HttpData dataEoS = data.withEndOfStream();
-        assertThat(dataEoS.isPooled()).isFalse();
-        assertThat(dataEoS.array()).isSameAs(data.array());
-        assertThat(dataEoS.withEndOfStream()).isSameAs(dataEoS);
 
         // close() on an unpooled data should not release the buffer.
         data.close();
@@ -51,7 +48,7 @@ void unpooled() {
     void unpooledSlicedArray() {
         final byte[] array = { 1, 2, 3, 4 };
         final ByteBuf buf = Unpooled.wrappedBuffer(array, 1, 2);
-        final ByteBufHttpData data = new ByteBufHttpData(buf, false);
+        final ByteBufBytes data = new ByteBufBytes(buf, false);
         assertThat(data.length()).isEqualTo(2);
 
         final byte[] slicedArray = data.array();
@@ -63,7 +60,7 @@ void unpooledSlicedArray() {
     @Test
     void byteBuf() {
         final ByteBuf buf = Unpooled.buffer(4).writeInt(0x01020304);
-        final ByteBufHttpData data = new ByteBufHttpData(buf, true);
+        final ByteBufBytes data = new ByteBufBytes(buf, true);
         assertThat(data.isPooled()).isTrue();
 
         // Test DUPLICATE mode.
@@ -101,7 +98,7 @@ void byteBuf() {
     @Test
     void slicedByteBuf() {
         final ByteBuf buf = Unpooled.buffer(4).writeInt(0x01020304);
-        final ByteBufHttpData data = new ByteBufHttpData(buf, true);
+        final ByteBufBytes data = new ByteBufBytes(buf, true);
         assertThat(data.isPooled()).isTrue();
 
         // Test DUPLICATE mode.
@@ -137,7 +134,7 @@ void slicedByteBuf() {
     @Test
     void directBufferShouldNotBeCopied() {
         final ByteBuf buf = Unpooled.directBuffer(4).writeInt(0x01020304);
-        final ByteBufHttpData data = new ByteBufHttpData(buf, true);
+        final ByteBufBytes data = new ByteBufBytes(buf, true);
 
         final ByteBuf duplicate = data.byteBuf(ByteBufAccessMode.FOR_IO);
         final ByteBuf slice = data.byteBuf(1, 2, ByteBufAccessMode.FOR_IO);
@@ -156,7 +153,7 @@ void directBufferShouldNotBeCopied() {
     @Test
     void doubleFree() {
         final ByteBuf buf = Unpooled.directBuffer(4).writeInt(0x01020304).retain();
-        final ByteBufHttpData data = new ByteBufHttpData(buf, true);
+        final ByteBufBytes data = new ByteBufBytes(buf, true);
         for (int i = 0; i < 2; i++) {
             data.close();
             assertThat(buf.refCnt()).isOne();
@@ -166,30 +163,31 @@ void doubleFree() {
 
     @Test
     void hash() {
-        final ByteBufHttpData data = new ByteBufHttpData(Unpooled.directBuffer().writeInt(0x02030405), true);
+        final ByteBufBytes data =
+                new ByteBufBytes(Unpooled.directBuffer().writeInt(0x02030405), true);
         assertThat(data.hashCode()).isEqualTo(((2 * 31 + 3) * 31 + 4) * 31 + 5);
         data.close();
 
         // Ensure 33rd+ bytes are ignored.
-        final ByteBufHttpData bigData = new ByteBufHttpData(Unpooled.directBuffer()
-                                                                    .writeZero(32)
-                                                                    .writeByte(1),
-                                                            true);
+        final ByteBufBytes bigData = new ByteBufBytes(Unpooled.directBuffer()
+                                                              .writeZero(32)
+                                                              .writeByte(1),
+                                                      true);
         assertThat(bigData.hashCode()).isZero();
         bigData.close();
     }
 
     @Test
     void equals() {
-        final HttpData a = new ByteBufHttpData(Unpooled.directBuffer().writeInt(0x01020304), true);
-        final HttpData b = new ByteBufHttpData(Unpooled.directBuffer().writeInt(0x01020304), true);
-        final HttpData c = new ByteBufHttpData(Unpooled.directBuffer().writeMedium(0x010203), true);
-        final HttpData d = new ByteBufHttpData(Unpooled.directBuffer().writeInt(0x04050607), true);
-        final HttpData arrayData = new ByteArrayHttpData(new byte[] { 1, 2, 3, 4 });
+        final ByteBufBytes a = new ByteBufBytes(Unpooled.directBuffer().writeInt(0x01020304), true);
+        final ByteBufBytes b = new ByteBufBytes(Unpooled.directBuffer().writeInt(0x01020304), true);
+        final ByteBufBytes c = new ByteBufBytes(Unpooled.directBuffer().writeMedium(0x010203), true);
+        final ByteBufBytes d = new ByteBufBytes(Unpooled.directBuffer().writeInt(0x04050607), true);
+        final ByteArrayBytes arrayData = new ByteArrayBytes(new byte[] { 1, 2, 3, 4 });
 
         assertThat(a).isEqualTo(a);
         assertThat(a).isEqualTo(b);
-        assertThat(a).isEqualTo(arrayData);
+        assertThat(a.array()).isEqualTo(arrayData.array());
         assertThat(a).isNotEqualTo(c);
         assertThat(a).isNotEqualTo(d);
         assertThat(a).isNotEqualTo(new Object());
@@ -202,39 +200,35 @@ void equals() {
 
     @Test
     void testToString() {
-        assertThat(new ByteBufHttpData(Unpooled.copiedBuffer("foo", StandardCharsets.US_ASCII), false))
+        assertThat(new ByteBufBytes(Unpooled.copiedBuffer("foo", StandardCharsets.US_ASCII), false))
                 .hasToString("{3B, text=foo}");
-        assertThat(new ByteBufHttpData(Unpooled.copiedBuffer("\u0001\u0002", StandardCharsets.US_ASCII), false))
+        assertThat(new ByteBufBytes(Unpooled.copiedBuffer("\u0001\u0002", StandardCharsets.US_ASCII),
+                                    false))
                 .hasToString("{2B, hex=0102}");
 
-        // endOfStream
-        assertThat(new ByteBufHttpData(Unpooled.copiedBuffer("foo", StandardCharsets.US_ASCII), false)
-                           .withEndOfStream()).hasToString("{3B, EOS, text=foo}");
-        assertThat(new ByteBufHttpData(Unpooled.copiedBuffer("\u0001\u0002", StandardCharsets.US_ASCII), false)
-                           .withEndOfStream()).hasToString("{2B, EOS, hex=0102}");
-
         // pooled
-        assertThat(new ByteBufHttpData(Unpooled.copiedBuffer("foo", StandardCharsets.US_ASCII), true))
+        assertThat(new ByteBufBytes(Unpooled.copiedBuffer("foo", StandardCharsets.US_ASCII), true))
                 .hasToString("{3B, pooled, text=foo}");
-        assertThat(new ByteBufHttpData(Unpooled.copiedBuffer("\u0001\u0002", StandardCharsets.US_ASCII), true))
+        assertThat(new ByteBufBytes(Unpooled.copiedBuffer("\u0001\u0002", StandardCharsets.US_ASCII),
+                                    true))
                 .hasToString("{2B, pooled, hex=0102}");
 
         // closed and freed
-        final ByteBufHttpData data1 =
-                new ByteBufHttpData(Unpooled.copiedBuffer("bar", StandardCharsets.US_ASCII), true);
+        final ByteBufBytes data1 =
+                new ByteBufBytes(Unpooled.copiedBuffer("bar", StandardCharsets.US_ASCII), true);
         data1.close();
         assertThat(data1).hasToString("{3B, pooled, closed}");
 
         // closed but not freed
-        final ByteBufHttpData data2 =
-                new ByteBufHttpData(Unpooled.unreleasableBuffer(
+        final ByteBufBytes data2 =
+                new ByteBufBytes(Unpooled.unreleasableBuffer(
                         Unpooled.copiedBuffer("bar", StandardCharsets.US_ASCII)), true);
         data2.close();
         assertThat(data2).hasToString("{3B, pooled, closed, text=bar}");
 
         // Longer than 16 bytes
-        assertThat(new ByteBufHttpData(Unpooled.copiedBuffer("0123456789abcdef\u0001",
-                                                             StandardCharsets.US_ASCII), false))
+        assertThat(new ByteBufBytes(Unpooled.copiedBuffer("0123456789abcdef\u0001",
+                                                          StandardCharsets.US_ASCII), false))
                 .hasToString("{17B, text=0123456789abcdef}");
     }
 }
diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInputTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInputTest.java
index b7cdebcfe63..59f1adf7544 100644
--- a/core/src/test/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInputTest.java
+++ b/core/src/test/java/com/linecorp/armeria/internal/common/stream/ByteBufsDecoderInputTest.java
@@ -99,6 +99,21 @@ void readableBytes_readByte() {
         assertThat(empty.readableBytes()).isEqualTo(0);
     }
 
+    @Test
+    void readableBytes_readUnsignedShort() {
+        assertThat(input.readableBytes()).isEqualTo(9);
+        assertThat(input.readUnsignedShort()).isEqualTo(1 * 256 + 2);
+        assertThat(input.readableBytes()).isEqualTo(7);
+        assertThat(input.readUnsignedShort()).isEqualTo(3 * 256 + 4);
+        assertThat(input.readableBytes()).isEqualTo(5);
+        input.readUnsignedShort();
+        input.readUnsignedShort();
+        assertThat(input.readableBytes()).isEqualTo(1);
+        assertThatThrownBy(() -> input.readUnsignedShort())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("end of deframer input");
+    }
+
     @Test
     void readableBytes_readInt() {
         assertThat(input.readableBytes()).isEqualTo(9);
@@ -108,6 +123,16 @@ void readableBytes_readInt() {
         assertThat(input.readableBytes()).isEqualTo(1);
     }
 
+    @Test
+    void readableBytes_readLong() {
+        assertThat(input.readableBytes()).isEqualTo(9);
+        assertThat(input.readLong()).isEqualTo(
+                1 * (long) Math.pow(256, 7) + 2 * (long) Math.pow(256, 6) + 3 * (long) Math.pow(256, 5) +
+                4 * (long) Math.pow(256, 4) + 5 * (long) Math.pow(256, 3) + 6 * (long) Math.pow(256, 2) +
+                7 * 256 + 8);
+        assertThat(input.readableBytes()).isEqualTo(1);
+    }
+
     @ValueSource(ints = { 1, 2, 3, 4, 5, 6, 7, 8, 9 })
     @ParameterizedTest
     void readableBytes_readBytes(int size) {
@@ -204,6 +229,51 @@ void readBytes() {
         assertThat(byteBuf4.refCnt()).isZero();
     }
 
+    @Test
+    void readBytesToArray() {
+        final ByteBufsDecoderInput input = new ByteBufsDecoderInput(UnpooledByteBufAllocator.DEFAULT);
+        final ByteBuf byteBuf1 = Unpooled.wrappedBuffer(new byte[]{ 1, 2, 3, 4 });
+        final ByteBuf byteBuf2 = Unpooled.buffer(4);
+        byteBuf2.writeByte(5);
+        byteBuf2.writeByte(6);
+        final ByteBuf byteBuf3 = Unpooled.wrappedBuffer(new byte[]{ 7, 8 });
+        final ByteBuf byteBuf4 = Unpooled.wrappedBuffer(new byte[]{ -1, 9 });
+        byteBuf4.readByte();
+
+        input.add(byteBuf1);
+        input.add(byteBuf2);
+        input.add(byteBuf3);
+        input.add(Unpooled.EMPTY_BUFFER);
+        input.add(byteBuf4);
+
+        byte[] bytes = new byte[4];
+        input.readBytes(bytes);
+        assertThat(bytes).containsExactly(1, 2, 3, 4);
+
+        bytes = new byte[1];
+        input.readBytes(bytes);
+        assertThat(bytes).containsExactly(5);
+
+        bytes = new byte[3];
+        input.readBytes(bytes);
+        assertThat(bytes).containsExactly(6, 7, 8);
+
+        bytes = new byte[1];
+        input.readBytes(bytes);
+        assertThat(bytes).containsExactly(9);
+
+        final byte[] bytes0 = new byte[1];
+        assertThatThrownBy(() -> input.readBytes(bytes0))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("end of deframer input");
+        input.close();
+
+        assertThat(byteBuf1.refCnt()).isZero();
+        assertThat(byteBuf2.refCnt()).isZero();
+        assertThat(byteBuf3.refCnt()).isZero();
+        assertThat(byteBuf4.refCnt()).isZero();
+    }
+
     @Test
     void skipBytes() {
         // fast
diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/UnaryDecoderInput.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/UnaryDecoderInput.java
index 91944c8d5b8..62733f0b9aa 100644
--- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/UnaryDecoderInput.java
+++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/UnaryDecoderInput.java
@@ -43,6 +43,11 @@ public int readInt() {
         return buf.readInt();
     }
 
+    @Override
+    public long readLong() {
+        return buf.readLong();
+    }
+
     @Override
     public ByteBuf readBytes(int length) {
         if (length == readableBytes()) {
@@ -52,6 +57,11 @@ public ByteBuf readBytes(int length) {
         }
     }
 
+    @Override
+    public void readBytes(byte[] dst) {
+        buf.readBytes(dst);
+    }
+
     @Override
     public byte getByte(int index) {
         return buf.getByte(index);