diff --git a/server/build.gradle b/server/build.gradle index c4aa3818bfa8c..6ef4c9d5eda39 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -361,6 +361,8 @@ tasks.named("missingJavadoc").configure { "org.opensearch.extensions.proto.ExtensionRequestProto.ExtensionRequestOrBuilder", "org.opensearch.extensions.proto.RegisterTransportActionsProto", "org.opensearch.extensions.proto.RegisterTransportActionsProto.RegisterTransportActionsOrBuilder", + "org.opensearch.extensions.proto.ExtensionTransportMessageProto", + "org.opensearch.extensions.proto.ExtensionTransportMessageProto.ExtensionTransportMessageOrBuilder", "org.opensearch.extensions.proto" ] } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java index dbcd682a38950..cf9428cecb1b5 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java @@ -8,10 +8,12 @@ package org.opensearch.extensions.action; +import com.google.protobuf.ByteString; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.extensions.proto.ExtensionTransportMessageProto.ExtensionTransportMessage; import java.io.IOException; @@ -22,14 +24,7 @@ * @opensearch.internal */ public class ExtensionActionRequest extends ActionRequest { - /** - * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. - */ - private final String action; - /** - * requestBytes is the raw bytes being transported between extensions. - */ - private final byte[] requestBytes; + private final ExtensionTransportMessage request; /** * ExtensionActionRequest constructor. @@ -37,9 +32,8 @@ public class ExtensionActionRequest extends ActionRequest { * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. * @param requestBytes is the raw bytes being transported between extensions. */ - public ExtensionActionRequest(String action, byte[] requestBytes) { - this.action = action; - this.requestBytes = requestBytes; + public ExtensionActionRequest(String action, ByteString requestBytes) { + this.request = ExtensionTransportMessage.newBuilder().setAction(action).setRequestBytes(requestBytes).build(); } /** @@ -50,23 +44,21 @@ public ExtensionActionRequest(String action, byte[] requestBytes) { */ public ExtensionActionRequest(StreamInput in) throws IOException { super(in); - action = in.readString(); - requestBytes = in.readByteArray(); + this.request = ExtensionTransportMessage.parseFrom(in.readByteArray()); } public String getAction() { - return action; + return request.getAction(); } - public byte[] getRequestBytes() { - return requestBytes; + public ByteString getRequestBytes() { + return request.getRequestBytes(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(action); - out.writeByteArray(requestBytes); + out.writeByteArray(request.toByteArray()); } @Override diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java index 1b946d08f0459..d1ad4ebb7dfff 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java @@ -8,8 +8,10 @@ package org.opensearch.extensions.action; +import com.google.protobuf.ByteString; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.extensions.proto.ExtensionTransportMessageProto.ExtensionTransportMessage; import org.opensearch.transport.TransportRequest; import java.io.IOException; @@ -21,14 +23,7 @@ * @opensearch.api */ public class ExtensionHandleTransportRequest extends TransportRequest { - /** - * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. - */ - private final String action; - /** - * requestBytes is the raw bytes being transported between extensions. - */ - private final byte[] requestBytes; + private final ExtensionTransportMessage request; /** * ExtensionHandleTransportRequest constructor. @@ -36,9 +31,8 @@ public class ExtensionHandleTransportRequest extends TransportRequest { * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. * @param requestBytes is the raw bytes being transported between extensions. */ - public ExtensionHandleTransportRequest(String action, byte[] requestBytes) { - this.action = action; - this.requestBytes = requestBytes; + public ExtensionHandleTransportRequest(String action, ByteString requestBytes) { + this.request = ExtensionTransportMessage.newBuilder().setAction(action).setRequestBytes(requestBytes).build(); } /** @@ -49,28 +43,26 @@ public ExtensionHandleTransportRequest(String action, byte[] requestBytes) { */ public ExtensionHandleTransportRequest(StreamInput in) throws IOException { super(in); - this.action = in.readString(); - this.requestBytes = in.readByteArray(); + this.request = ExtensionTransportMessage.parseFrom(in.readByteArray()); } public String getAction() { - return this.action; + return this.request.getAction(); } - public byte[] getRequestBytes() { - return this.requestBytes; + public ByteString getRequestBytes() { + return this.request.getRequestBytes(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(action); - out.writeByteArray(requestBytes); + out.writeByteArray(request.toByteArray()); } @Override public String toString() { - return "ExtensionHandleTransportRequest{action=" + action + ", requestBytes=" + requestBytes + "}"; + return "ExtensionHandleTransportRequest{action=" + request.getAction() + ", requestBytes=" + request.getRequestBytes() + "}"; } @Override @@ -78,12 +70,13 @@ public boolean equals(Object obj) { if (this == obj) return true; if (obj == null || getClass() != obj.getClass()) return false; ExtensionHandleTransportRequest that = (ExtensionHandleTransportRequest) obj; - return Objects.equals(action, that.action) && Objects.equals(requestBytes, that.requestBytes); + return Objects.equals(request.getAction(), that.request.getAction()) + && Objects.equals(request.getRequestBytes(), that.request.getRequestBytes()); } @Override public int hashCode() { - return Objects.hash(action, requestBytes); + return Objects.hash(request.getAction(), request.getRequestBytes()); } } diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java index 1f90d3224bb82..5338d5ae2fb15 100644 --- a/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java +++ b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java @@ -8,10 +8,13 @@ package org.opensearch.extensions.action; +import com.google.protobuf.ByteString; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.extensions.proto.ExtensionIdentityProto.ExtensionIdentity; +import org.opensearch.extensions.proto.ExtensionTransportMessageProto.ExtensionTransportMessage; import java.io.IOException; import java.util.Objects; @@ -22,18 +25,8 @@ * @opensearch.api */ public class TransportActionRequestFromExtension extends ActionRequest { - /** - * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. - */ - private final String action; - /** - * requestBytes is the raw bytes being transported between extensions. - */ - private final byte[] requestBytes; - /** - * uniqueId to identify which extension is making a transport request call. - */ - private final String uniqueId; + private final ExtensionIdentity identity; + private final ExtensionTransportMessage request; /** * TransportActionRequestFromExtension constructor. @@ -42,10 +35,9 @@ public class TransportActionRequestFromExtension extends ActionRequest { * @param requestBytes is the raw bytes being transported between extensions. * @param uniqueId to identify which extension is making a transport request call. */ - public TransportActionRequestFromExtension(String action, byte[] requestBytes, String uniqueId) { - this.action = action; - this.requestBytes = requestBytes; - this.uniqueId = uniqueId; + public TransportActionRequestFromExtension(String action, ByteString requestBytes, String uniqueId) { + this.identity = ExtensionIdentity.newBuilder().setUniqueId(uniqueId).build(); + this.request = ExtensionTransportMessage.newBuilder().setAction(action).setRequestBytes(requestBytes).build(); } /** @@ -56,29 +48,27 @@ public TransportActionRequestFromExtension(String action, byte[] requestBytes, S */ public TransportActionRequestFromExtension(StreamInput in) throws IOException { super(in); - this.action = in.readString(); - this.requestBytes = in.readByteArray(); - this.uniqueId = in.readString(); + this.identity = ExtensionIdentity.parseFrom(in.readByteArray()); + this.request = ExtensionTransportMessage.parseFrom(in.readByteArray()); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(action); - out.writeByteArray(requestBytes); - out.writeString(uniqueId); + out.writeByteArray(identity.toByteArray()); + out.writeByteArray(request.toByteArray()); } public String getAction() { - return this.action; + return this.request.getAction(); } - public byte[] getRequestBytes() { - return this.requestBytes; + public ByteString getRequestBytes() { + return this.request.getRequestBytes(); } public String getUniqueId() { - return this.uniqueId; + return this.identity.getUniqueId(); } @Override @@ -88,7 +78,13 @@ public ActionRequestValidationException validate() { @Override public String toString() { - return "TransportActionRequestFromExtension{action=" + action + ", requestBytes=" + requestBytes + ", uniqueId=" + uniqueId + "}"; + return "TransportActionRequestFromExtension{action=" + + request.getAction() + + ", requestBytes=" + + request.getRequestBytes() + + ", uniqueId=" + + identity.getUniqueId() + + "}"; } @Override @@ -96,13 +92,13 @@ public boolean equals(Object obj) { if (this == obj) return true; if (obj == null || getClass() != obj.getClass()) return false; TransportActionRequestFromExtension that = (TransportActionRequestFromExtension) obj; - return Objects.equals(action, that.action) - && Objects.equals(requestBytes, that.requestBytes) - && Objects.equals(uniqueId, that.uniqueId); + return Objects.equals(request.getAction(), that.request.getAction()) + && Objects.equals(request.getRequestBytes(), that.request.getRequestBytes()) + && Objects.equals(identity.getUniqueId(), that.identity.getUniqueId()); } @Override public int hashCode() { - return Objects.hash(action, requestBytes, uniqueId); + return Objects.hash(request.getAction(), request.getRequestBytes(), identity.getUniqueId()); } } diff --git a/server/src/main/proto/extensions/HandleTransportRequestProto.proto b/server/src/main/proto/extensions/HandleTransportRequestProto.proto new file mode 100644 index 0000000000000..79ba80a6c2021 --- /dev/null +++ b/server/src/main/proto/extensions/HandleTransportRequestProto.proto @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +syntax = "proto3"; +package org.opensearch.extensions.proto; + +option java_outer_classname = "ExtensionTransportMessageProto"; + +message ExtensionTransportMessage { + string action = 1; + bytes requestBytes = 2; +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java index 2d4f2b5d8aa66..cf7c391abe4e5 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java @@ -8,18 +8,17 @@ package org.opensearch.extensions.action; +import com.google.protobuf.ByteString; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.test.OpenSearchTestCase; -import java.nio.charset.StandardCharsets; - public class ExtensionActionRequestTests extends OpenSearchTestCase { public void testExtensionActionRequest() throws Exception { String expectedAction = "test-action"; - byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ByteString expectedRequestBytes = ByteString.copyFromUtf8("request-bytes"); ExtensionActionRequest request = new ExtensionActionRequest(expectedAction, expectedRequestBytes); assertEquals(expectedAction, request.getAction()); @@ -31,7 +30,7 @@ public void testExtensionActionRequest() throws Exception { request = new ExtensionActionRequest(in); assertEquals(expectedAction, request.getAction()); - assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); assertNull(request.validate()); } } diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java index 15e7320ba7556..70b17273ef690 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java @@ -8,28 +8,28 @@ package org.opensearch.extensions.action; +import com.google.protobuf.ByteString; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.test.OpenSearchTestCase; -import java.nio.charset.StandardCharsets; - public class ExtensionHandleTransportRequestTests extends OpenSearchTestCase { public void testExtensionHandleTransportRequest() throws Exception { String expectedAction = "test-action"; - byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ByteString expectedRequestBytes = ByteString.copyFromUtf8("request-bytes"); ExtensionHandleTransportRequest request = new ExtensionHandleTransportRequest(expectedAction, expectedRequestBytes); assertEquals(expectedAction, request.getAction()); + logger.info(expectedRequestBytes); + logger.info(request.getRequestBytes()); assertEquals(expectedRequestBytes, request.getRequestBytes()); BytesStreamOutput out = new BytesStreamOutput(); request.writeTo(out); BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); request = new ExtensionHandleTransportRequest(in); - + assertEquals(expectedRequestBytes, request.getRequestBytes()); assertEquals(expectedAction, request.getAction()); - assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); } } diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java index 0f3aab3644489..f5053f0ebf01d 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -8,6 +8,7 @@ package org.opensearch.extensions.action; +import com.google.protobuf.ByteString; import org.junit.After; import org.junit.Before; import org.opensearch.Version; @@ -36,7 +37,6 @@ import org.opensearch.transport.nio.MockNioTransport; import java.net.InetAddress; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -148,7 +148,7 @@ public void testRegisterTransportActionsRequest() { public void testTransportActionRequestFromExtension() throws Exception { String action = "test-action"; - byte[] requestBytes = "requestBytes".getBytes(StandardCharsets.UTF_8); + ByteString requestBytes = ByteString.copyFromUtf8("requestBytes"); TransportActionRequestFromExtension request = new TransportActionRequestFromExtension(action, requestBytes, "uniqueid1"); RemoteExtensionActionResponse response = extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request); assertFalse(response.isSuccess()); @@ -158,7 +158,7 @@ public void testTransportActionRequestFromExtension() throws Exception { public void testSendTransportRequestToExtension() throws InterruptedException { String action = "test-action"; - byte[] requestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ByteString requestBytes = ByteString.copyFromUtf8("requestBytes"); ExtensionActionRequest request = new ExtensionActionRequest(action, requestBytes); // Action not registered, expect exception diff --git a/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java b/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java index a8ef5372800d9..c002dfe135e36 100644 --- a/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java @@ -8,17 +8,16 @@ package org.opensearch.extensions.action; +import com.google.protobuf.ByteString; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.test.OpenSearchTestCase; -import java.nio.charset.StandardCharsets; - public class TransportActionRequestFromExtensionTests extends OpenSearchTestCase { public void testTransportActionRequestFromExtension() throws Exception { String expectedAction = "test-action"; - byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ByteString expectedRequestBytes = ByteString.copyFromUtf8("request-bytes"); String uniqueId = "test-uniqueId"; TransportActionRequestFromExtension request = new TransportActionRequestFromExtension( expectedAction, @@ -36,7 +35,7 @@ public void testTransportActionRequestFromExtension() throws Exception { request = new TransportActionRequestFromExtension(in); assertEquals(expectedAction, request.getAction()); - assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); assertEquals(uniqueId, request.getUniqueId()); } }