Skip to content

Commit

Permalink
Adding support for more than one protocol for transport
Browse files Browse the repository at this point in the history
Signed-off-by: Vacha Shah <[email protected]>
  • Loading branch information
VachaShah committed Mar 28, 2024
1 parent 6ddbdcd commit c405882
Show file tree
Hide file tree
Showing 10 changed files with 633 additions and 436 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* Base class for inbound data as a message.
* Different implementations are used for different protocols.
*
* @opensearch.internal
*/
@ExperimentalApi
public interface BaseInboundMessage {

/**
* The protocol used to encode this message
*/
static String NATIVE_PROTOCOL = "native";
static String PROTOBUF_PROTOCOL = "protobuf";

/**
* @return the protocol used to encode this message
*/
public String getProtocol();

/**
* Set the protocol used to encode this message
*/
public void setProtocol();
}
406 changes: 18 additions & 388 deletions server/src/main/java/org/opensearch/transport/InboundHandler.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class InboundMessage implements Releasable {
public class InboundMessage implements Releasable, BaseInboundMessage {

private final Header header;
private final ReleasableBytesReference content;
private final Exception exception;
private final boolean isPing;
private Releasable breakerRelease;
private StreamInput streamInput;
private String protocol;

public InboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) {
this.header = header;
Expand Down Expand Up @@ -133,4 +134,14 @@ public void close() {
public String toString() {
return "InboundMessage{" + header + "}";
}

@Override
public String getProtocol() {
return NATIVE_PROTOCOL;
}

@Override
public void setProtocol() {
this.protocol = NATIVE_PROTOCOL;
}
}
65 changes: 37 additions & 28 deletions server/src/main/java/org/opensearch/transport/InboundPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ public class InboundPipeline implements Releasable {
private final StatsTracker statsTracker;
private final InboundDecoder decoder;
private final InboundAggregator aggregator;
private final BiConsumer<TcpChannel, InboundMessage> messageHandler;
private final BiConsumer<TcpChannel, BaseInboundMessage> messageHandler;
private Exception uncaughtException;
private final ArrayDeque<ReleasableBytesReference> pending = new ArrayDeque<>(2);
private boolean isClosed = false;
private Version version;

public InboundPipeline(
Version version,
Expand All @@ -74,14 +75,15 @@ public InboundPipeline(
LongSupplier relativeTimeInMillis,
Supplier<CircuitBreaker> circuitBreaker,
Function<String, RequestHandlerRegistry<TransportRequest>> registryFunction,
BiConsumer<TcpChannel, InboundMessage> messageHandler
BiConsumer<TcpChannel, BaseInboundMessage> messageHandler
) {
this(
statsTracker,
relativeTimeInMillis,
new InboundDecoder(version, recycler),
new InboundAggregator(circuitBreaker, registryFunction),
messageHandler
messageHandler,
version
);
}

Expand All @@ -90,13 +92,15 @@ public InboundPipeline(
LongSupplier relativeTimeInMillis,
InboundDecoder decoder,
InboundAggregator aggregator,
BiConsumer<TcpChannel, InboundMessage> messageHandler
BiConsumer<TcpChannel, BaseInboundMessage> messageHandler,
Version version
) {
this.relativeTimeInMillis = relativeTimeInMillis;
this.statsTracker = statsTracker;
this.decoder = decoder;
this.aggregator = aggregator;
this.messageHandler = messageHandler;
this.version = version;
}

@Override
Expand Down Expand Up @@ -124,37 +128,42 @@ public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference
statsTracker.markBytesRead(reference.length());
pending.add(reference.retain());

final ArrayList<Object> fragments = fragmentList.get();
boolean continueHandling = true;

while (continueHandling && isClosed == false) {
boolean continueDecoding = true;
while (continueDecoding && pending.isEmpty() == false) {
try (ReleasableBytesReference toDecode = getPendingBytes()) {
final int bytesDecoded = decoder.decode(toDecode, fragments::add);
if (bytesDecoded != 0) {
releasePendingBytes(bytesDecoded);
if (fragments.isEmpty() == false && endOfMessage(fragments.get(fragments.size() - 1))) {
String incomingMessageProtocol = TcpTransport.determineTransportProtocol(reference);
if (incomingMessageProtocol.equals(BaseInboundMessage.PROTOBUF_PROTOCOL) && this.version.onOrAfter(Version.V_3_0_0)) {
// protobuf messages logic can be added here
} else {
final ArrayList<Object> fragments = fragmentList.get();
boolean continueHandling = true;

while (continueHandling && isClosed == false) {
boolean continueDecoding = true;
while (continueDecoding && pending.isEmpty() == false) {
try (ReleasableBytesReference toDecode = getPendingBytes()) {
final int bytesDecoded = decoder.decode(toDecode, fragments::add);
if (bytesDecoded != 0) {
releasePendingBytes(bytesDecoded);
if (fragments.isEmpty() == false && endOfMessage(fragments.get(fragments.size() - 1))) {
continueDecoding = false;
}
} else {
continueDecoding = false;
}
} else {
continueDecoding = false;
}
}
}

if (fragments.isEmpty()) {
continueHandling = false;
} else {
try {
forwardFragments(channel, fragments);
} finally {
for (Object fragment : fragments) {
if (fragment instanceof ReleasableBytesReference) {
((ReleasableBytesReference) fragment).close();
if (fragments.isEmpty()) {
continueHandling = false;
} else {
try {
forwardFragments(channel, fragments);
} finally {
for (Object fragment : fragments) {
if (fragment instanceof ReleasableBytesReference) {
((ReleasableBytesReference) fragment).close();
}
}
fragments.clear();
}
fragments.clear();
}
}
}
Expand Down
Loading

0 comments on commit c405882

Please sign in to comment.