From 46821eef42dea5d03e23c36e5648892b504e0dab Mon Sep 17 00:00:00 2001 From: Vacha Shah Date: Fri, 21 Apr 2023 23:59:39 +0000 Subject: [PATCH] Adding ProtobufStreamInput and ProtobufStreamOutput for additional stream related functionality Signed-off-by: Vacha Shah --- .../common/io/stream/ProtobufStreamInput.java | 141 ++++++++++++++++++ .../io/stream/ProtobufStreamOutput.java | 63 ++++++++ 2 files changed, 204 insertions(+) create mode 100644 server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamInput.java create mode 100644 server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamOutput.java diff --git a/server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamInput.java new file mode 100644 index 0000000000000..da867e7e487e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamInput.java @@ -0,0 +1,141 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.common.io.stream; + +import com.google.protobuf.CodedInputStream; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +import org.apache.lucene.util.ArrayUtil; +import org.opensearch.Version; +import org.opensearch.common.Nullable; + +public class ProtobufStreamInput { + + private Version version = Version.CURRENT; + + /** + * The version of the node on the other side of this stream. + */ + public Version getVersion() { + return this.version; + } + + /** + * Set the version of the node on the other side of this stream. + */ + public void setVersion(Version version) { + this.version = version; + } + + @Nullable + public String readOptionalString(CodedInputStream in) throws IOException { + if (readBoolean(in)) { + return in.readString(); + } + return null; + } + + /** + * If the returned map contains any entries it will be mutable. If it is empty it might be immutable. + */ + public Map readMap(ProtobufWriteable.Reader keyReader, ProtobufWriteable.Reader valueReader, CodedInputStream in) throws IOException { + int size = readArraySize(in); + if (size == 0) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(size); + for (int i = 0; i < size; i++) { + K key = keyReader.read(in); + V value = valueReader.read(in); + map.put(key, value); + } + return map; + } + + @Nullable + public T readOptionalWriteable(ProtobufWriteable.Reader reader, CodedInputStream in) throws IOException { + if (readBoolean(in)) { + T t = reader.read(in); + if (t == null) { + throw new IOException( + "Writeable.Reader [" + reader + "] returned null which is not allowed and probably means it screwed up the stream." + ); + } + return t; + } else { + return null; + } + } + + private int readArraySize(CodedInputStream in) throws IOException { + final int arraySize = readVInt(in); + if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) { + throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize); + } + if (arraySize < 0) { + throw new NegativeArraySizeException("array size must be positive but was: " + arraySize); + } + // lets do a sanity check that if we are reading an array size that is bigger that the remaining bytes we can safely + // throw an exception instead of allocating the array based on the size. A simple corrutpted byte can make a node go OOM + // if the size is large and for perf reasons we allocate arrays ahead of time + // ensureCanReadBytes(arraySize); + return arraySize; + } + + public int readVInt(CodedInputStream in) throws IOException { + byte b = in.readRawByte(); + int i = b & 0x7F; + if ((b & 0x80) == 0) { + return i; + } + b = in.readRawByte(); + i |= (b & 0x7F) << 7; + if ((b & 0x80) == 0) { + return i; + } + b = in.readRawByte(); + i |= (b & 0x7F) << 14; + if ((b & 0x80) == 0) { + return i; + } + b = in.readRawByte(); + i |= (b & 0x7F) << 21; + if ((b & 0x80) == 0) { + return i; + } + b = in.readRawByte(); + if ((b & 0x80) != 0) { + throw new IOException("Invalid vInt ((" + Integer.toHexString(b) + " & 0x7f) << 28) | " + Integer.toHexString(i)); + } + return i | ((b & 0x7F) << 28); + } + + /** + * Reads a boolean. + */ + public final boolean readBoolean(CodedInputStream in) throws IOException { + return readBoolean(in.readRawByte()); + } + + private boolean readBoolean(final byte value) { + if (value == 0) { + return false; + } else if (value == 1) { + return true; + } else { + final String message = String.format(Locale.ROOT, "unexpected byte [0x%02x]", value); + throw new IllegalStateException(message); + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamOutput.java b/server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamOutput.java new file mode 100644 index 0000000000000..7b69249a661bd --- /dev/null +++ b/server/src/main/java/org/opensearch/common/io/stream/ProtobufStreamOutput.java @@ -0,0 +1,63 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.common.io.stream; + +import com.google.protobuf.CodedOutputStream; + +import java.io.IOException; +import java.util.Map; + +import org.opensearch.Version; +import org.opensearch.common.Nullable; + +public class ProtobufStreamOutput { + + private Version version = Version.CURRENT; + + /** + * The version of the node on the other side of this stream. + */ + public Version getVersion() { + return this.version; + } + + /** + * Set the version of the node on the other side of this stream. + */ + public void setVersion(Version version) { + this.version = version; + } + + /** + * Write a {@link Map} of {@code K}-type keys to {@code V}-type. + *

+     * Map<String, String> map = ...;
+     * out.writeMap(map, StreamOutput::writeString, StreamOutput::writeString);
+     * 
+ * + * @param keyWriter The key writer + * @param valueWriter The value writer + */ + public final void writeMap(final Map map, final ProtobufWriteable.Writer keyWriter, final ProtobufWriteable.Writer valueWriter, CodedOutputStream out) throws IOException { + for (final Map.Entry entry : map.entrySet()) { + keyWriter.write(out, entry.getKey()); + valueWriter.write(out, entry.getValue()); + } + } + + public void writeOptionalWriteable(@Nullable ProtobufWriteable writeable, CodedOutputStream out) throws IOException { + if (writeable != null) { + out.writeBool(1, true); + writeable.writeTo(out); + } else { + out.writeBool(1, false); + } + } + +} \ No newline at end of file