Skip to content

Commit

Permalink
Add Bytes that represents binary data (#4505)
Browse files Browse the repository at this point in the history
Motivation:
We create an `HttpData` by wrapping `ByteBuf` or `byte[]` that represent binary data. `ByteArrayHttpData` or `ByteBufHttpData` is internally used to operate on the binary data. Because it's an `HttpData` we cannot share the logic if we add other classes that are binary data such as WebSocket frames.

Modifications:
- Add `Bytes` and its implementations: `ByteArrayBytes`, `ByteBufBytes`
  - `HttpData` uses them internally.
- Add more methods to `StreamDecoderInput` that will be used for WebSocket.

Result:
- Ready to work on #3904
  • Loading branch information
minwoox authored Dec 5, 2022
1 parent 5fd87c2 commit 2aa149b
Show file tree
Hide file tree
Showing 17 changed files with 1,163 additions and 676 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@

import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpResponseWriter;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.metric.NoopMeterRegistry;
import com.linecorp.armeria.server.Server;
Expand Down Expand Up @@ -67,13 +70,30 @@ String uriText() {
@Param
private Protocol protocol;

@Param("100")
private int chunkCount;

@Setup
public void startServer() throws Exception {
final byte[] PLAINTEXT = "Hello, World!".getBytes(StandardCharsets.UTF_8);
final byte[] plaintext = "Hello, World!".getBytes(StandardCharsets.UTF_8);

server = Server.builder()
.service("/empty", (ctx, req) -> HttpResponse.of(HttpStatus.OK))
.service("/plaintext", (ctx, req) -> HttpResponse
.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, PLAINTEXT))
.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, plaintext))
.service("/streaming", (ctx, req) -> {
final HttpResponseWriter writer = HttpResponse.streaming();
writer.write(ResponseHeaders.of(200));
for (int i = 0; i < chunkCount; i++) {
if (i == chunkCount - 1) {
writer.write(HttpData.wrap(plaintext).withEndOfStream());
writer.close();
} else {
writer.write(HttpData.wrap(plaintext));
}
}
return writer;
})
.requestTimeout(Duration.ZERO)
.meterRegistry(NoopMeterRegistry.get())
.build();
Expand Down Expand Up @@ -108,6 +128,11 @@ public void empty(Blackhole bh, AsyncCounters counters) throws Exception {
}));
}

@Benchmark
public void streaming(Blackhole bh) throws Exception {
bh.consume(webClient.get("/streaming").aggregate().join());
}

/**
* A benchmark for a test designed to demonstrate the capacity about {@code MediaType.PLAIN_TEXT_UTF_8}.
*
Expand Down
193 changes: 37 additions & 156 deletions core/src/main/java/com/linecorp/armeria/common/ByteArrayHttpData.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 LINE Corporation
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -13,201 +13,82 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common;

import static java.util.Objects.requireNonNull;
package com.linecorp.armeria.common;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;

import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals;
import com.linecorp.armeria.internal.common.ByteArrayBytes;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.EmptyArrays;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
final class ByteArrayHttpData extends ByteArrayBytes implements HttpData {

/**
* A {@code byte[]}-based {@link HttpData}.
*/
final class ByteArrayHttpData implements HttpData {
private static final byte[] EMPTY_BYTES = {};

static final ByteArrayHttpData EMPTY = new ByteArrayHttpData(EmptyArrays.EMPTY_BYTES, false);
static final ByteArrayHttpData EMPTY_EOS = new ByteArrayHttpData(EmptyArrays.EMPTY_BYTES, true);
static final ByteArrayHttpData EMPTY = new ByteArrayHttpData(EMPTY_BYTES);
static final ByteArrayHttpData EMPTY_EOS = new ByteArrayHttpData(EMPTY_BYTES, true);

private static final byte[] SAFE_OCTETS = new byte[256];

static {
final String safeOctetStr = "`~!@#$%^&*()-_=+\t[{]}\\|;:'\",<.>/?" +
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
for (int i = 0; i < safeOctetStr.length(); i++) {
SAFE_OCTETS[safeOctetStr.charAt(i)] = -1;
}
}

private final byte[] array;
private final boolean endOfStream;

/**
* Creates a new instance.
*/
ByteArrayHttpData(byte[] array) {
this(array, false);
super(array);
endOfStream = false;
}

private ByteArrayHttpData(byte[] array, boolean endOfStream) {
this.array = array;
super(array);
this.endOfStream = endOfStream;
}

@Override
public byte[] array() {
return array;
}

@Override
public int length() {
return array.length;
}

@Override
public String toString(Charset charset) {
requireNonNull(charset, "charset");
return new String(array, charset);
}

@Override
public String toString() {
if (array.length == 0) {
return isEndOfStream() ? "{0B, EOS}" : "{0B}";
}

try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) {
final StringBuilder buf = tempThreadLocals.stringBuilder();
buf.append('{').append(array.length);

if (isEndOfStream()) {
buf.append("B, EOS, ");
} else {
buf.append("B, ");
}

return appendPreviews(buf, array, 0, Math.min(16, array.length))
.append('}').toString();
}
}

static StringBuilder appendPreviews(StringBuilder buf, byte[] array, int offset, int previewLength) {
// Append the hex preview if contains non-ASCII chars.
final int endOffset = offset + previewLength;
for (int i = offset; i < endOffset; i++) {
if (SAFE_OCTETS[array[i] & 0xFF] == 0) {
return buf.append("hex=").append(ByteBufUtil.hexDump(array, offset, previewLength));
}
}

// Append the text preview otherwise.
return buf.append("text=").append(new String(array, 0, offset, previewLength));
}

@Override
public InputStream toInputStream() {
return new FastByteArrayInputStream(array);
}

@Override
public boolean isEndOfStream() {
return endOfStream;
}

@Override
public ByteArrayHttpData withEndOfStream(boolean endOfStream) {
if (isEndOfStream() == endOfStream) {
public HttpData withEndOfStream(boolean endOfStream) {
if (this.endOfStream == endOfStream) {
return this;
}

if (isEmpty()) {
return endOfStream ? EMPTY_EOS : EMPTY;
}

return new ByteArrayHttpData(array, endOfStream);
}

@Override
public boolean isPooled() {
return false;
}

@Override
public ByteBuf byteBuf(ByteBufAccessMode mode) {
requireNonNull(mode, "mode");
if (isEmpty()) {
return Unpooled.EMPTY_BUFFER;
}

if (mode != ByteBufAccessMode.FOR_IO) {
return Unpooled.wrappedBuffer(array);
} else {
final ByteBuf copy = newDirectByteBuf();
copy.writeBytes(array);
return copy;
}
return new ByteArrayHttpData(array(), endOfStream);
}

@Override
public ByteBuf byteBuf(int offset, int length, ByteBufAccessMode mode) {
requireNonNull(mode, "mode");
if (length == 0) {
return Unpooled.EMPTY_BUFFER;
}

if (mode != ByteBufAccessMode.FOR_IO) {
return Unpooled.wrappedBuffer(array, offset, length);
} else {
final ByteBuf copy = newDirectByteBuf(length);
copy.writeBytes(array, offset, length);
return copy;
}
}

private ByteBuf newDirectByteBuf() {
return newDirectByteBuf(length());
}

private static ByteBuf newDirectByteBuf(int length) {
return PooledByteBufAllocator.DEFAULT.directBuffer(length);
public boolean isEndOfStream() {
return endOfStream;
}

@Override
public void close() {}

@SuppressWarnings("RedundantMethodOverride")
@Override
public int hashCode() {
// Calculate the hash code from the first 32 bytes.
final int end = Math.min(32, length());

int hash = 0;
for (int i = 0; i < end; i++) {
hash = hash * 31 + array[i];
}
return hash;
// Use hashcode in ByteBufBytes because we don't use endOfStream in equals.
return super.hashCode();
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof HttpData)) {
return false;
}

if (obj == this) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof HttpData)) {
return false;
}

final HttpData that = (HttpData) obj;
final HttpData that = (HttpData) o;
if (length() != that.length()) {
return false;
}

return Arrays.equals(array, that.array());
return Arrays.equals(array(), that.array());
}

@Override
public String toString() {
final String toString = super.toString();
if (!isEndOfStream()) {
return toString;
}
return toString + ", {EOS}";
}
}
Loading

0 comments on commit 2aa149b

Please sign in to comment.