From af94d0eae5ccc066be765c6d8a4f4332da2b49ab Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 31 Aug 2023 16:27:00 -0400 Subject: [PATCH] [Streaming Indexing] Introduce new experimental HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + buildSrc/version.properties | 4 + .../netty4/Netty4HttpServerTransport.java | 14 + .../licenses/reactive-streams-1.0.4.jar.sha1 | 1 - .../licenses/reactive-streams-LICENSE.txt | 21 -- plugins/repository-azure/build.gradle | 22 +- .../licenses/reactive-streams-1.0.4.jar.sha1 | 1 - .../licenses/reactive-streams-LICENSE.txt | 21 -- .../licenses/reactor-core-3.5.6.jar.sha1 | 1 - .../licenses/reactor-netty-1.1.8.jar.sha1 | 1 - .../reactor-netty-core-1.1.10.jar.sha1 | 1 + .../reactor-netty-core-1.1.8.jar.sha1 | 1 - .../reactor-netty-http-1.1.10.jar.sha1 | 1 + .../reactor-netty-http-1.1.9.jar.sha1 | 1 - plugins/repository-s3/build.gradle | 1 - .../licenses/reactive-streams-1.0.4.jar.sha1 | 1 - .../licenses/reactive-streams-LICENSE.txt | 21 -- .../licenses/reactive-streams-NOTICE.txt | 0 plugins/transport-reactor-netty4/build.gradle | 260 +++++++++++++++++ .../licenses/netty-LICENSE.txt | 202 +++++++++++++ .../licenses/netty-NOTICE.txt | 116 ++++++++ .../netty-buffer-4.1.97.Final.jar.sha1 | 1 + .../netty-codec-4.1.97.Final.jar.sha1 | 1 + .../netty-codec-dns-4.1.97.Final.jar.sha1 | 1 + .../netty-codec-http-4.1.97.Final.jar.sha1 | 1 + .../netty-codec-http2-4.1.97.Final.jar.sha1 | 1 + .../netty-common-4.1.97.Final.jar.sha1 | 1 + .../netty-handler-4.1.97.Final.jar.sha1 | 1 + .../netty-resolver-4.1.97.Final.jar.sha1 | 1 + .../netty-resolver-dns-4.1.97.Final.jar.sha1 | 1 + .../netty-transport-4.1.97.Final.jar.sha1 | 1 + ...t-native-unix-common-4.1.97.Final.jar.sha1 | 1 + .../licenses/reactor-LICENSE.txt | 201 +++++++++++++ .../licenses/reactor-NOTICE.txt} | 0 .../reactor-netty-core-1.1.10.jar.sha1 | 1 + .../reactor-netty-http-1.1.10.jar.sha1 | 1 + .../reactor/netty4/HttpConversionUtil.java | 47 +++ .../netty4/NonStreamingHttpChannel.java | 76 +++++ .../netty4/NonStreamingRequestConsumer.java | 84 ++++++ .../netty4/ReactorNetty4HttpRequest.java | 272 ++++++++++++++++++ .../netty4/ReactorNetty4HttpResponse.java | 42 +++ .../ReactorNetty4HttpServerChannel.java | 53 ++++ .../ReactorNetty4HttpServerTransport.java | 170 +++++++++++ .../http/reactor/netty4/package-info.java | 12 + .../reactor/ReactorNetty4Plugin.java | 110 +++++++ .../transport/reactor/SharedGroupFactory.java | 164 +++++++++++ .../transport/reactor/netty4/Netty4Utils.java | 142 +++++++++ .../netty4/ReactorNetty4Transport.java | 35 +++ .../reactor/netty4/package-info.java | 12 + .../transport/reactor/package-info.java | 12 + .../plugin-metadata/plugin-security.policy | 24 ++ server/build.gradle | 8 +- .../licenses/reactive-streams-1.0.4.jar.sha1 | 0 .../licenses/reactive-streams-LICENSE.txt | 0 .../licenses/reactive-streams-NOTICE.txt | 0 server/licenses/reactor-LICENSE.txt | 201 +++++++++++++ .../licenses/reactor-NOTICE.txt | 0 server/licenses/reactor-core-3.5.9.jar.sha1 | 1 + .../java/org/opensearch/http/HttpChunk.java | 35 +++ 59 files changed, 2316 insertions(+), 90 deletions(-) delete mode 100644 plugins/discovery-ec2/licenses/reactive-streams-1.0.4.jar.sha1 delete mode 100644 plugins/discovery-ec2/licenses/reactive-streams-LICENSE.txt delete mode 100644 plugins/repository-azure/licenses/reactive-streams-1.0.4.jar.sha1 delete mode 100644 plugins/repository-azure/licenses/reactive-streams-LICENSE.txt delete mode 100644 plugins/repository-azure/licenses/reactor-core-3.5.6.jar.sha1 delete mode 100644 plugins/repository-azure/licenses/reactor-netty-1.1.8.jar.sha1 create mode 100644 plugins/repository-azure/licenses/reactor-netty-core-1.1.10.jar.sha1 delete mode 100644 plugins/repository-azure/licenses/reactor-netty-core-1.1.8.jar.sha1 create mode 100644 plugins/repository-azure/licenses/reactor-netty-http-1.1.10.jar.sha1 delete mode 100644 plugins/repository-azure/licenses/reactor-netty-http-1.1.9.jar.sha1 delete mode 100644 plugins/repository-s3/licenses/reactive-streams-1.0.4.jar.sha1 delete mode 100644 plugins/repository-s3/licenses/reactive-streams-LICENSE.txt delete mode 100644 plugins/repository-s3/licenses/reactive-streams-NOTICE.txt create mode 100644 plugins/transport-reactor-netty4/build.gradle create mode 100644 plugins/transport-reactor-netty4/licenses/netty-LICENSE.txt create mode 100644 plugins/transport-reactor-netty4/licenses/netty-NOTICE.txt create mode 100644 plugins/transport-reactor-netty4/licenses/netty-buffer-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-codec-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-codec-dns-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-codec-http-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-codec-http2-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-common-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-handler-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-resolver-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-resolver-dns-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-transport-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/netty-transport-native-unix-common-4.1.97.Final.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/reactor-LICENSE.txt rename plugins/{crypto-kms/licenses/reactive-streams-NOTICE.txt => transport-reactor-netty4/licenses/reactor-NOTICE.txt} (100%) create mode 100644 plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.10.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.10.jar.sha1 create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpConversionUtil.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpResponse.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/package-info.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/SharedGroupFactory.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/ReactorNetty4Transport.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/package-info.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/package-info.java create mode 100644 plugins/transport-reactor-netty4/src/main/plugin-metadata/plugin-security.policy rename {plugins/crypto-kms => server}/licenses/reactive-streams-1.0.4.jar.sha1 (100%) rename {plugins/crypto-kms => server}/licenses/reactive-streams-LICENSE.txt (100%) rename {plugins/discovery-ec2 => server}/licenses/reactive-streams-NOTICE.txt (100%) create mode 100644 server/licenses/reactor-LICENSE.txt rename plugins/repository-azure/licenses/reactive-streams-NOTICE.txt => server/licenses/reactor-NOTICE.txt (100%) create mode 100644 server/licenses/reactor-core-3.5.9.jar.sha1 create mode 100644 server/src/main/java/org/opensearch/http/HttpChunk.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 685daf7cf46fd..5b8a9c9bff1d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- [Streaming Indexing] Introduce new experimental HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 500727e909494..59c317acdfd2f 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -31,6 +31,10 @@ jna = 5.5.0 netty = 4.1.97.Final joda = 2.12.2 +# project reactor +reactor_netty = 1.1.10 +reactor = 3.5.9 + # client dependencies httpclient5 = 5.2.1 httpcore5 = 5.2.2 diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 0271472125814..31a2f251683a4 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -116,6 +116,9 @@ import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE; import static org.opensearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; +/** + * The HTTP transport implementations based on Netty 4. + */ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private static final Logger logger = LogManager.getLogger(Netty4HttpServerTransport.class); @@ -184,6 +187,17 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private volatile ServerBootstrap serverBootstrap; private volatile SharedGroupFactory.SharedGroup sharedGroup; + /** + * Creates new HTTP transport implementations based on Netty 4 + * @param settings seetings + * @param networkService network service + * @param bigArrays big array allocator + * @param threadPool thread pool instance + * @param xContentRegistry XContent registry instance + * @param dispatcher dispatcher instance + * @param clusterSettings cluster settings + * @param sharedGroupFactory shared group factory + */ public Netty4HttpServerTransport( Settings settings, NetworkService networkService, diff --git a/plugins/discovery-ec2/licenses/reactive-streams-1.0.4.jar.sha1 b/plugins/discovery-ec2/licenses/reactive-streams-1.0.4.jar.sha1 deleted file mode 100644 index 45a80e3f7e361..0000000000000 --- a/plugins/discovery-ec2/licenses/reactive-streams-1.0.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -3864a1320d97d7b045f729a326e1e077661f31b7 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/reactive-streams-LICENSE.txt b/plugins/discovery-ec2/licenses/reactive-streams-LICENSE.txt deleted file mode 100644 index 1e3c7e7c77495..0000000000000 --- a/plugins/discovery-ec2/licenses/reactive-streams-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -MIT No Attribution - -Copyright 2014 Reactive Streams - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 26e2b4813b8a5..51f2057b4bedb 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -56,11 +56,8 @@ dependencies { api "io.netty:netty-transport-native-unix-common:${versions.netty}" implementation project(':modules:transport-netty4') api 'com.azure:azure-storage-blob:12.23.0' - api 'org.reactivestreams:reactive-streams:1.0.4' - api 'io.projectreactor:reactor-core:3.5.6' - api 'io.projectreactor.netty:reactor-netty:1.1.8' - api 'io.projectreactor.netty:reactor-netty-core:1.1.8' - api 'io.projectreactor.netty:reactor-netty-http:1.1.9' + api "io.projectreactor.netty:reactor-netty-core:${versions.reactor_netty}" + api "io.projectreactor.netty:reactor-netty-http:${versions.reactor_netty}" api "org.slf4j:slf4j-api:${versions.slf4j}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" @@ -101,10 +98,6 @@ thirdPartyAudit { 'com.azure.storage.internal.avro.implementation.schema.AvroSchema', 'com.ctc.wstx.shaded.msv_core.driver.textui.Driver', 'io.micrometer.context.ContextAccessor', - 'io.micrometer.context.ContextRegistry', - 'io.micrometer.context.ContextSnapshot', - 'io.micrometer.context.ContextSnapshot$Scope', - 'io.micrometer.core.instrument.Clock', 'io.micrometer.core.instrument.Counter', 'io.micrometer.core.instrument.Counter$Builder', 'io.micrometer.core.instrument.DistributionSummary', @@ -114,14 +107,10 @@ thirdPartyAudit { 'io.micrometer.core.instrument.Meter', 'io.micrometer.core.instrument.MeterRegistry', 'io.micrometer.core.instrument.Metrics', - 'io.micrometer.core.instrument.Tag', - 'io.micrometer.core.instrument.Tags', 'io.micrometer.core.instrument.Timer', 'io.micrometer.core.instrument.Timer$Builder', 'io.micrometer.core.instrument.Timer$Sample', - 'io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics', 'io.micrometer.core.instrument.composite.CompositeMeterRegistry', - 'io.micrometer.core.instrument.search.Search', 'io.netty.channel.epoll.Epoll', 'io.netty.channel.epoll.EpollDatagramChannel', 'io.netty.channel.epoll.EpollServerSocketChannel', @@ -168,9 +157,6 @@ thirdPartyAudit { 'org.slf4j.impl.StaticLoggerBinder', 'org.slf4j.impl.StaticMDCBinder', 'org.slf4j.impl.StaticMarkerBinder', - 'reactor.blockhound.BlockHound$Builder', - 'reactor.blockhound.integration.BlockHoundIntegration', - 'io.micrometer.context.ThreadLocalAccessor', 'io.micrometer.common.KeyValue', 'io.micrometer.common.KeyValues', 'io.micrometer.common.docs.KeyName', @@ -190,6 +176,7 @@ thirdPartyAudit { 'io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler', 'io.micrometer.tracing.propagation.Propagator', 'io.micrometer.core.instrument.observation.MeterObservationHandler', + 'io.micrometer.core.instrument.Tags', 'io.micrometer.observation.ObservationHandler', 'io.micrometer.observation.ObservationRegistry', 'io.micrometer.observation.ObservationRegistry$ObservationConfig', @@ -210,8 +197,7 @@ thirdPartyAudit { 'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper$1', 'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray', 'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator', - 'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1', - 'reactor.core.publisher.Traces$SharedSecretsCallSiteSupplierFactory$TracingException' + 'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1' ) } diff --git a/plugins/repository-azure/licenses/reactive-streams-1.0.4.jar.sha1 b/plugins/repository-azure/licenses/reactive-streams-1.0.4.jar.sha1 deleted file mode 100644 index 45a80e3f7e361..0000000000000 --- a/plugins/repository-azure/licenses/reactive-streams-1.0.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -3864a1320d97d7b045f729a326e1e077661f31b7 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactive-streams-LICENSE.txt b/plugins/repository-azure/licenses/reactive-streams-LICENSE.txt deleted file mode 100644 index 1e3c7e7c77495..0000000000000 --- a/plugins/repository-azure/licenses/reactive-streams-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -MIT No Attribution - -Copyright 2014 Reactive Streams - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-core-3.5.6.jar.sha1 b/plugins/repository-azure/licenses/reactor-core-3.5.6.jar.sha1 deleted file mode 100644 index ad9b7263e7b38..0000000000000 --- a/plugins/repository-azure/licenses/reactor-core-3.5.6.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -027fdc551537b349389176a23a192f11a7a3d7de \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-1.1.8.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-1.1.8.jar.sha1 deleted file mode 100644 index 6b6bf1903b16c..0000000000000 --- a/plugins/repository-azure/licenses/reactor-netty-1.1.8.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -d53a9d7d0395285f4c81664494fcd61477626e32 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-core-1.1.10.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-core-1.1.10.jar.sha1 new file mode 100644 index 0000000000000..4f826a379f33c --- /dev/null +++ b/plugins/repository-azure/licenses/reactor-netty-core-1.1.10.jar.sha1 @@ -0,0 +1 @@ +0c2eadc75339948e71b24f95e944d75c0f74aa6f \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-core-1.1.8.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-core-1.1.8.jar.sha1 deleted file mode 100644 index 707631f4dfe0c..0000000000000 --- a/plugins/repository-azure/licenses/reactor-netty-core-1.1.8.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -48999c4ae27cdcee5eaff9dfd150a8b64624f0f5 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-http-1.1.10.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-http-1.1.10.jar.sha1 new file mode 100644 index 0000000000000..b0cb32358b481 --- /dev/null +++ b/plugins/repository-azure/licenses/reactor-netty-http-1.1.10.jar.sha1 @@ -0,0 +1 @@ +9804370c5833843922c9a5f4e2472a585197ddee \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-http-1.1.9.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-http-1.1.9.jar.sha1 deleted file mode 100644 index 96deead2c75d1..0000000000000 --- a/plugins/repository-azure/licenses/reactor-netty-http-1.1.9.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -408b3037133f2e8ab0f195ccd3f807026be9b860 \ No newline at end of file diff --git a/plugins/repository-s3/build.gradle b/plugins/repository-s3/build.gradle index 44fd45b265e82..560d12d14395d 100644 --- a/plugins/repository-s3/build.gradle +++ b/plugins/repository-s3/build.gradle @@ -70,7 +70,6 @@ dependencies { api "software.amazon.awssdk:sts:${versions.aws}" api "software.amazon.awssdk:netty-nio-client:${versions.aws}" - api "org.reactivestreams:reactive-streams:${versions.reactivestreams}" api "org.apache.httpcomponents:httpclient:${versions.httpclient}" api "org.apache.httpcomponents:httpcore:${versions.httpcore}" api "commons-logging:commons-logging:${versions.commonslogging}" diff --git a/plugins/repository-s3/licenses/reactive-streams-1.0.4.jar.sha1 b/plugins/repository-s3/licenses/reactive-streams-1.0.4.jar.sha1 deleted file mode 100644 index 45a80e3f7e361..0000000000000 --- a/plugins/repository-s3/licenses/reactive-streams-1.0.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -3864a1320d97d7b045f729a326e1e077661f31b7 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/reactive-streams-LICENSE.txt b/plugins/repository-s3/licenses/reactive-streams-LICENSE.txt deleted file mode 100644 index 1e3c7e7c77495..0000000000000 --- a/plugins/repository-s3/licenses/reactive-streams-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -MIT No Attribution - -Copyright 2014 Reactive Streams - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/plugins/repository-s3/licenses/reactive-streams-NOTICE.txt b/plugins/repository-s3/licenses/reactive-streams-NOTICE.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/plugins/transport-reactor-netty4/build.gradle b/plugins/transport-reactor-netty4/build.gradle new file mode 100644 index 0000000000000..855e1c174fca5 --- /dev/null +++ b/plugins/transport-reactor-netty4/build.gradle @@ -0,0 +1,260 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +import org.opensearch.gradle.info.BuildParams +import org.opensearch.gradle.test.RestIntegTestTask +import org.opensearch.gradle.test.TestTask +import org.opensearch.gradle.test.rest.JavaRestTestPlugin +import org.opensearch.gradle.test.InternalClusterTestPlugin + +apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.java-rest-test' +apply plugin: 'opensearch.internal-cluster-test' + +// The transport-reactor-netty4 plugin is published to maven +apply plugin: 'opensearch.publish' + +opensearchplugin { + description 'Reactor Netty 4 based transport implementation' + classname 'org.opensearch.transport.ReactorNetty4ModulePlugin' + hasClientJar = true +} + +dependencies { + // network stack + api "io.netty:netty-buffer:${versions.netty}" + api "io.netty:netty-codec:${versions.netty}" + api "io.netty:netty-codec-dns:${versions.netty}" + api "io.netty:netty-codec-http:${versions.netty}" + api "io.netty:netty-codec-http2:${versions.netty}" + api "io.netty:netty-common:${versions.netty}" + api "io.netty:netty-handler:${versions.netty}" + api "io.netty:netty-resolver-dns:${versions.netty}" + api "io.netty:netty-resolver:${versions.netty}" + api "io.netty:netty-transport:${versions.netty}" + api "io.netty:netty-transport-native-unix-common:${versions.netty}" + + api "io.projectreactor.netty:reactor-netty-http:${versions.reactor_netty}" + api "io.projectreactor.netty:reactor-netty-core:${versions.reactor_netty}" +} + +restResources { + restApi { + includeCore '_common', 'cluster', 'nodes' + } +} + +tasks.named("dependencyLicenses").configure { + mapping from: /netty-.*/, to: 'netty' + mapping from: /reactor-.*/, to: 'reactor' +} + +// TODO: Remove that once we have a complete test suite +testingConventions.enabled = false + +test { + /* + * We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each + * other if we allow them to set the number of available processors as it's set-once in Netty. + */ + systemProperty 'opensearch.set.netty.runtime.available.processors', 'false' +} + +internalClusterTest { + systemProperty 'opensearch.set.netty.runtime.available.processors', 'false' +} + +javaRestTest { + systemProperty 'opensearch.set.netty.runtime.available.processors', 'false' +} + +thirdPartyAudit { + ignoreMissingClasses( + 'com.aayushatharva.brotli4j.Brotli4jLoader', + 'com.aayushatharva.brotli4j.decoder.DecoderJNI$Status', + 'com.aayushatharva.brotli4j.decoder.DecoderJNI$Wrapper', + 'com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel', + 'com.aayushatharva.brotli4j.encoder.Encoder$Mode', + 'com.aayushatharva.brotli4j.encoder.Encoder$Parameters', + // classes are missing + + // from io.netty.logging.CommonsLoggerFactory (netty) + 'org.apache.commons.logging.Log', + 'org.apache.commons.logging.LogFactory', + + // from Log4j (deliberate, Netty will fallback to Log4j 2) + 'org.apache.log4j.Level', + 'org.apache.log4j.Logger', + + // from io.netty.handler.ssl.OpenSslEngine (netty) + 'io.netty.internal.tcnative.Buffer', + 'io.netty.internal.tcnative.CertificateCompressionAlgo', + 'io.netty.internal.tcnative.Library', + 'io.netty.internal.tcnative.SSL', + 'io.netty.internal.tcnative.SSLContext', + 'io.netty.internal.tcnative.SSLPrivateKeyMethod', + + // from io.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty) + 'org.bouncycastle.cert.X509v3CertificateBuilder', + 'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter', + 'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder', + 'org.bouncycastle.openssl.PEMEncryptedKeyPair', + 'org.bouncycastle.openssl.PEMParser', + 'org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter', + 'org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder', + 'org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder', + 'org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo', + + // from io.netty.handler.ssl.JettyNpnSslEngine (netty) + 'org.eclipse.jetty.npn.NextProtoNego$ClientProvider', + 'org.eclipse.jetty.npn.NextProtoNego$ServerProvider', + 'org.eclipse.jetty.npn.NextProtoNego', + + // from io.netty.handler.codec.marshalling.ChannelBufferByteInput (netty) + 'org.jboss.marshalling.ByteInput', + + // from io.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty) + 'org.jboss.marshalling.ByteOutput', + + // from io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty) + 'org.jboss.marshalling.Marshaller', + + // from io.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty) + 'org.jboss.marshalling.MarshallerFactory', + 'org.jboss.marshalling.MarshallingConfiguration', + 'org.jboss.marshalling.Unmarshaller', + + // from io.netty.util.internal.logging.InternalLoggerFactory (netty) - it's optional + 'org.slf4j.helpers.FormattingTuple', + 'org.slf4j.helpers.MessageFormatter', + 'org.slf4j.Logger', + 'org.slf4j.LoggerFactory', + 'org.slf4j.spi.LocationAwareLogger', + + 'com.google.protobuf.nano.CodedOutputByteBufferNano', + 'com.google.protobuf.nano.MessageNano', + 'com.ning.compress.BufferRecycler', + 'com.ning.compress.lzf.ChunkDecoder', + 'com.ning.compress.lzf.ChunkEncoder', + 'com.ning.compress.lzf.LZFChunk', + 'com.ning.compress.lzf.LZFEncoder', + 'com.ning.compress.lzf.util.ChunkDecoderFactory', + 'com.ning.compress.lzf.util.ChunkEncoderFactory', + 'lzma.sdk.lzma.Encoder', + 'net.jpountz.lz4.LZ4Compressor', + 'net.jpountz.lz4.LZ4Factory', + 'net.jpountz.lz4.LZ4FastDecompressor', + 'net.jpountz.xxhash.XXHash32', + 'net.jpountz.xxhash.XXHashFactory', + 'io.netty.internal.tcnative.AsyncSSLPrivateKeyMethod', + 'io.netty.internal.tcnative.AsyncTask', + 'io.netty.internal.tcnative.CertificateCallback', + 'io.netty.internal.tcnative.CertificateVerifier', + 'io.netty.internal.tcnative.ResultCallback', + 'io.netty.internal.tcnative.SessionTicketKey', + 'io.netty.internal.tcnative.SniHostNameMatcher', + 'io.netty.internal.tcnative.SSL', + 'io.netty.internal.tcnative.SSLSession', + 'io.netty.internal.tcnative.SSLSessionCache', + 'io.netty.channel.epoll.Epoll', + 'io.netty.channel.epoll.EpollDatagramChannel', + 'io.netty.channel.epoll.EpollServerSocketChannel', + 'io.netty.channel.epoll.EpollSocketChannel', + 'io.netty.channel.kqueue.KQueue', + 'io.netty.channel.kqueue.KQueueDatagramChannel', + 'io.netty.channel.kqueue.KQueueServerSocketChannel', + 'io.netty.channel.kqueue.KQueueSocketChannel', + 'io.netty.handler.codec.haproxy.HAProxyMessage', + 'io.netty.handler.codec.haproxy.HAProxyMessageDecoder', + 'io.netty.handler.proxy.ProxyHandler', + 'io.netty.incubator.channel.uring.IOUring', + 'io.netty.incubator.channel.uring.IOUringDatagramChannel', + 'io.netty.incubator.channel.uring.IOUringServerSocketChannel', + 'io.netty.incubator.channel.uring.IOUringSocketChannel', + + 'org.eclipse.jetty.alpn.ALPN$ClientProvider', + 'org.eclipse.jetty.alpn.ALPN$ServerProvider', + 'org.eclipse.jetty.alpn.ALPN', + + 'org.conscrypt.AllocatedBuffer', + 'org.conscrypt.BufferAllocator', + 'org.conscrypt.Conscrypt', + 'org.conscrypt.HandshakeListener', + + 'reactor.blockhound.BlockHound$Builder', + 'reactor.blockhound.integration.BlockHoundIntegration', + + 'io.micrometer.common.KeyValue', + 'io.micrometer.common.KeyValues', + 'io.micrometer.common.docs.KeyName', + 'io.micrometer.context.ContextAccessor', + 'io.micrometer.core.instrument.Counter', + 'io.micrometer.core.instrument.Counter$Builder', + 'io.micrometer.core.instrument.DistributionSummary', + 'io.micrometer.core.instrument.DistributionSummary$Builder', + 'io.micrometer.core.instrument.Gauge', + 'io.micrometer.core.instrument.Gauge$Builder', + 'io.micrometer.core.instrument.Meter', + 'io.micrometer.core.instrument.Meter$Type', + 'io.micrometer.core.instrument.MeterRegistry', + 'io.micrometer.core.instrument.Metrics', + 'io.micrometer.core.instrument.Tags', + 'io.micrometer.core.instrument.Timer', + 'io.micrometer.core.instrument.Timer$Builder', + 'io.micrometer.core.instrument.Timer$Sample', + 'io.micrometer.core.instrument.composite.CompositeMeterRegistry', + 'io.micrometer.core.instrument.docs.MeterDocumentation', + 'io.micrometer.core.instrument.observation.MeterObservationHandler', + 'io.micrometer.observation.Observation', + 'io.micrometer.observation.Observation$Context', + 'io.micrometer.observation.ObservationHandler', + 'io.micrometer.observation.ObservationRegistry', + 'io.micrometer.observation.ObservationRegistry$ObservationConfig', + 'io.micrometer.observation.docs.ObservationDocumentation', + 'io.micrometer.observation.transport.ReceiverContext', + 'io.micrometer.observation.transport.RequestReplyReceiverContext', + 'io.micrometer.observation.transport.RequestReplySenderContext', + 'io.micrometer.observation.transport.SenderContext', + 'io.micrometer.tracing.Span', + 'io.micrometer.tracing.Tracer', + 'io.micrometer.tracing.docs.SpanDocumentation', + 'io.micrometer.tracing.handler.DefaultTracingObservationHandler', + 'io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler', + 'io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler', + 'io.micrometer.tracing.propagation.Propagator' + ) + + ignoreViolations( + 'io.netty.util.internal.PlatformDependent0', + 'io.netty.util.internal.PlatformDependent0$1', + 'io.netty.util.internal.PlatformDependent0$2', + 'io.netty.util.internal.PlatformDependent0$3', + 'io.netty.util.internal.PlatformDependent0$4', + 'io.netty.util.internal.PlatformDependent0$6', + 'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef', + 'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef', + 'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields', + 'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields', + 'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields', + 'io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode', + 'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField', + 'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField', + 'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField', + 'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess', + 'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4', + 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5' + ) +} diff --git a/plugins/transport-reactor-netty4/licenses/netty-LICENSE.txt b/plugins/transport-reactor-netty4/licenses/netty-LICENSE.txt new file mode 100644 index 0000000000000..d645695673349 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/plugins/transport-reactor-netty4/licenses/netty-NOTICE.txt b/plugins/transport-reactor-netty4/licenses/netty-NOTICE.txt new file mode 100644 index 0000000000000..5bbf91a14de23 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-NOTICE.txt @@ -0,0 +1,116 @@ + + The Netty Project + ================= + +Please visit the Netty web site for more information: + + * http://netty.io/ + +Copyright 2011 The Netty Project + +The Netty Project licenses this file to you under the Apache License, +version 2.0 (the "License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at: + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. + +Also, please refer to each LICENSE..txt file, which is located in +the 'license' directory of the distribution file, for the license terms of the +components that this product depends on. + +------------------------------------------------------------------------------- +This product contains the extensions to Java Collections Framework which has +been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + +This product contains a modified version of Robert Harder's Public Domain +Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + +This product contains a modified version of 'JZlib', a re-implementation of +zlib in pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD Style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + +This product contains a modified version of 'Webbit', a Java event based +WebSocket and HTTP server: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + +This product optionally depends on 'Protocol Buffers', Google's data +interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * http://code.google.com/p/protobuf/ + +This product optionally depends on 'Bouncy Castle Crypto APIs' to generate +a temporary self-signed X.509 certificate when the JVM does not provide the +equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * http://www.bouncycastle.org/ + +This product optionally depends on 'SLF4J', a simple logging facade for Java, +which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * http://www.slf4j.org/ + +This product optionally depends on 'Apache Commons Logging', a logging +framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * http://commons.apache.org/logging/ + +This product optionally depends on 'Apache Log4J', a logging framework, +which can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * http://logging.apache.org/log4j/ + +This product optionally depends on 'JBoss Logging', a logging framework, +which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-logging.txt (GNU LGPL 2.1) + * HOMEPAGE: + * http://anonsvn.jboss.org/repos/common/common-logging-spi/ + +This product optionally depends on 'Apache Felix', an open source OSGi +framework implementation, which can be obtained at: + + * LICENSE: + * license/LICENSE.felix.txt (Apache License 2.0) + * HOMEPAGE: + * http://felix.apache.org/ diff --git a/plugins/transport-reactor-netty4/licenses/netty-buffer-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-buffer-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..8430355365996 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-buffer-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +f8f3d8644afa5e6e1a40a3a6aeb9d9aa970ecb4f \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..7a36dc1f2724f --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +384ba4d75670befbedb45c4d3b497a93639c206d \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-dns-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-dns-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..f592ac8312a5d --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-dns-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +d266d079ef33cf93a16b382d64dd15d562df1159 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-http-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-http-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..37b78a32f741f --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-http-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +af78acec783ffd77c63d8aeecc21041fd39ac54f \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-http2-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-http2-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..cbf685a6d79d3 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-http2-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +893888d09a7bef0d0ba973d7471943e765d0fd08 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-common-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-common-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..1bdfec3aae6ba --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-common-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +7cceacaf11df8dc63f23d0fb58e9d4640fc88404 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-handler-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-handler-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..8b7b50a6fc9c6 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-handler-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +abb86c6906bf512bf2b797a41cd7d2e8d3cd7c36 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-resolver-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-resolver-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..032959e98d009 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-resolver-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +cec8348108dc76c47cf87c669d514be52c922144 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-resolver-dns-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-resolver-dns-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..60fd706436ae7 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-resolver-dns-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +2c50f835777ecd4535e15b552b5d9ccb26a2504f \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-transport-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-transport-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..107863c1b3c9d --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-transport-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +f37380d23c9bb079bc702910833b2fd532c9abd0 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-transport-native-unix-common-4.1.97.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-transport-native-unix-common-4.1.97.Final.jar.sha1 new file mode 100644 index 0000000000000..f736d37d071b7 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-transport-native-unix-common-4.1.97.Final.jar.sha1 @@ -0,0 +1 @@ +d469d84265ab70095b01b40886cabdd433b6e664 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/reactor-LICENSE.txt b/plugins/transport-reactor-netty4/licenses/reactor-LICENSE.txt new file mode 100644 index 0000000000000..e5583c184e67a --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/reactor-LICENSE.txt @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + https://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/plugins/crypto-kms/licenses/reactive-streams-NOTICE.txt b/plugins/transport-reactor-netty4/licenses/reactor-NOTICE.txt similarity index 100% rename from plugins/crypto-kms/licenses/reactive-streams-NOTICE.txt rename to plugins/transport-reactor-netty4/licenses/reactor-NOTICE.txt diff --git a/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.10.jar.sha1 b/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.10.jar.sha1 new file mode 100644 index 0000000000000..4f826a379f33c --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.10.jar.sha1 @@ -0,0 +1 @@ +0c2eadc75339948e71b24f95e944d75c0f74aa6f \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.10.jar.sha1 b/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.10.jar.sha1 new file mode 100644 index 0000000000000..b0cb32358b481 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.10.jar.sha1 @@ -0,0 +1 @@ +9804370c5833843922c9a5f4e2472a585197ddee \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpConversionUtil.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpConversionUtil.java new file mode 100644 index 0000000000000..bd75227dabd08 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpConversionUtil.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.rest.RestRequest; + +import io.netty.handler.codec.http.HttpMethod; + +final class HttpConversionUtil { + private HttpConversionUtil() {} + + /** + * Converts {@link HttpMethod} to {@link RestRequest.Method} + * @param method {@link HttpMethod} method + * @return corresponding {@link RestRequest.Method} + * @throws IllegalArgumentException if HTTP method is not supported + */ + public static RestRequest.Method convertMethod(HttpMethod method) { + if (method == HttpMethod.GET) { + return RestRequest.Method.GET; + } else if (method == HttpMethod.POST) { + return RestRequest.Method.POST; + } else if (method == HttpMethod.PUT) { + return RestRequest.Method.PUT; + } else if (method == HttpMethod.DELETE) { + return RestRequest.Method.DELETE; + } else if (method == HttpMethod.HEAD) { + return RestRequest.Method.HEAD; + } else if (method == HttpMethod.OPTIONS) { + return RestRequest.Method.OPTIONS; + } else if (method == HttpMethod.PATCH) { + return RestRequest.Method.PATCH; + } else if (method == HttpMethod.TRACE) { + return RestRequest.Method.TRACE; + } else if (method == HttpMethod.CONNECT) { + return RestRequest.Method.CONNECT; + } else { + throw new IllegalArgumentException("Unexpected http method: " + method); + } + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java new file mode 100644 index 0000000000000..98b359319ff1b --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.http.HttpChannel; +import org.opensearch.http.HttpResponse; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import reactor.core.publisher.FluxSink; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class NonStreamingHttpChannel implements HttpChannel { + private final HttpServerRequest request; + private final HttpServerResponse response; + private final CompletableContext closeContext = new CompletableContext<>(); + private final FluxSink emitter; + + NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink emitter) { + this.request = request; + this.response = response; + this.emitter = emitter; + this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext)); + } + + @Override + public boolean isOpen() { + final AtomicBoolean isOpen = new AtomicBoolean(); + request.withConnection(connection -> isOpen.set(connection.channel().isOpen())); + return isOpen.get(); + } + + @Override + public void close() { + request.withConnection(connection -> connection.channel().close()); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public void sendResponse(HttpResponse response, ActionListener listener) { + emitter.next(createResponse(response)); + listener.onResponse(null); + emitter.complete(); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) response.remoteAddress(); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) response.hostAddress(); + } + + FullHttpResponse createResponse(HttpResponse response) { + return (FullHttpResponse) response; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java new file mode 100644 index 0000000000000..88358e21277a6 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.http.AbstractHttpServerTransport; +import org.opensearch.http.HttpRequest; +import org.opensearch.http.HttpResponse; + +import java.util.function.Consumer; + +import io.netty.buffer.CompositeByteBuf; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class NonStreamingRequestConsumer implements Consumer, Publisher { + private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; + + private final HttpServerRequest request; + private final HttpServerResponse response; + private final CompositeByteBuf content; + private final Publisher publisher; + private final AbstractHttpServerTransport transport; + private volatile FluxSink emitter; + + NonStreamingRequestConsumer(AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response) { + this.transport = transport; + this.request = request; + this.response = response; + this.content = response.alloc().compositeBuffer(DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS); + this.publisher = Flux.create(emitter -> register(emitter)); + } + + private void register(FluxSink emitter) { + this.emitter = emitter; + } + + @Override + public void accept(T message) { + try { + if (message instanceof LastHttpContent) { + process(message, emitter); + } else if (message instanceof HttpContent) { + process(message, emitter); + } + } catch (Throwable ex) { + emitter.error(ex); + } + } + + public void process(HttpContent in, FluxSink emitter) { + // Consume request body in full before dispatching it + content.addComponent(true, in.content().retain()); + + if (in instanceof LastHttpContent) { + transport.incomingRequest(createRequest(request, content), new NonStreamingHttpChannel(request, response, emitter)); + } + } + + HttpRequest createRequest(HttpServerRequest request, CompositeByteBuf content) { + return new ReactorNetty4HttpRequest(request, content); + } + + FullHttpResponse createResponse(HttpResponse response) { + return (FullHttpResponse) response; + } + + @Override + public void subscribe(Subscriber s) { + publisher.subscribe(s); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java new file mode 100644 index 0000000000000..4406c555a5b04 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java @@ -0,0 +1,272 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.http.HttpRequest; +import org.opensearch.rest.RestRequest; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http.cookie.ServerCookieDecoder; +import io.netty.handler.codec.http.cookie.ServerCookieEncoder; +import reactor.netty.http.server.HttpServerRequest; + +class ReactorNetty4HttpRequest implements HttpRequest { + private final String protocol; + private final HttpMethod method; + private final String uri; + private final ByteBuf content; + private final HttpHeadersMap headers; + private final AtomicBoolean released; + private final Exception inboundException; + private final boolean pooled; + + ReactorNetty4HttpRequest(HttpServerRequest request, ByteBuf content) { + this(request, new HttpHeadersMap(request.requestHeaders()), new AtomicBoolean(false), true, content); + } + + ReactorNetty4HttpRequest(HttpServerRequest request, ByteBuf content, Exception inboundException) { + this( + request.protocol(), + request.method(), + request.uri(), + new HttpHeadersMap(request.requestHeaders()), + new AtomicBoolean(false), + true, + content, + inboundException + ); + } + + private ReactorNetty4HttpRequest( + HttpServerRequest request, + HttpHeadersMap headers, + AtomicBoolean released, + boolean pooled, + ByteBuf content + ) { + this(request.protocol(), request.method(), request.uri(), headers, released, pooled, content, null); + } + + private ReactorNetty4HttpRequest( + String protocol, + HttpMethod method, + String uri, + HttpHeadersMap headers, + AtomicBoolean released, + boolean pooled, + ByteBuf content, + Exception inboundException + ) { + + this.protocol = protocol; + this.method = method; + this.uri = uri; + this.headers = headers; + this.content = content; + this.pooled = pooled; + this.released = released; + this.inboundException = inboundException; + } + + @Override + public RestRequest.Method method() { + return HttpConversionUtil.convertMethod(method); + } + + @Override + public String uri() { + return uri; + } + + @Override + public BytesReference content() { + assert released.get() == false; + return Netty4Utils.toBytesReference(content); + } + + @Override + public void release() { + if (pooled && released.compareAndSet(false, true)) { + content.release(); + } + } + + @Override + public HttpRequest releaseAndCopy() { + assert released.get() == false; + if (pooled == false) { + return this; + } + try { + final ByteBuf copiedContent = Unpooled.copiedBuffer(content); + return new ReactorNetty4HttpRequest(protocol, method, uri, headers, new AtomicBoolean(false), false, copiedContent, null); + } finally { + release(); + } + } + + @Override + public final Map> getHeaders() { + return headers; + } + + @Override + public List strictCookies() { + String cookieString = headers.httpHeaders.get(HttpHeaderNames.COOKIE); + if (cookieString != null) { + Set cookies = ServerCookieDecoder.STRICT.decode(cookieString); + if (!cookies.isEmpty()) { + return ServerCookieEncoder.STRICT.encode(cookies); + } + } + return Collections.emptyList(); + } + + @Override + public HttpVersion protocolVersion() { + if (protocol.equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0.toString())) { + return HttpRequest.HttpVersion.HTTP_1_0; + } else if (protocol.equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1.toString())) { + return HttpRequest.HttpVersion.HTTP_1_1; + } else { + throw new IllegalArgumentException("Unexpected http protocol version: " + protocol); + } + } + + @Override + public HttpRequest removeHeader(String header) { + HttpHeaders headersWithoutContentTypeHeader = new DefaultHttpHeaders(); + headersWithoutContentTypeHeader.add(headers.httpHeaders); + headersWithoutContentTypeHeader.remove(header); + + return new ReactorNetty4HttpRequest( + protocol, + method, + uri, + new HttpHeadersMap(headersWithoutContentTypeHeader), + released, + pooled, + content, + null + ); + } + + @Override + public ReactorNetty4HttpResponse createResponse(RestStatus status, BytesReference content) { + return new ReactorNetty4HttpResponse( + headers.httpHeaders, + io.netty.handler.codec.http.HttpVersion.valueOf(protocol), + status, + content + ); + } + + @Override + public Exception getInboundException() { + return inboundException; + } + + /** + * A wrapper of {@link HttpHeaders} that implements a map to prevent copying unnecessarily. This class does not support modifications + * and due to the underlying implementation, it performs case insensitive lookups of key to values. + * + * It is important to note that this implementation does have some downsides in that each invocation of the + * {@link #values()} and {@link #entrySet()} methods will perform a copy of the values in the HttpHeaders rather than returning a + * view of the underlying values. + */ + private static class HttpHeadersMap implements Map> { + + private final HttpHeaders httpHeaders; + + private HttpHeadersMap(HttpHeaders httpHeaders) { + this.httpHeaders = httpHeaders; + } + + @Override + public int size() { + return httpHeaders.size(); + } + + @Override + public boolean isEmpty() { + return httpHeaders.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return key instanceof String && httpHeaders.contains((String) key); + } + + @Override + public boolean containsValue(Object value) { + return value instanceof List && httpHeaders.names().stream().map(httpHeaders::getAll).anyMatch(value::equals); + } + + @Override + public List get(Object key) { + return key instanceof String ? httpHeaders.getAll((String) key) : null; + } + + @Override + public List put(String key, List value) { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public List remove(Object key) { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public void putAll(Map> m) { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public void clear() { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public Set keySet() { + return httpHeaders.names(); + } + + @Override + public Collection> values() { + return httpHeaders.names().stream().map(k -> Collections.unmodifiableList(httpHeaders.getAll(k))).collect(Collectors.toList()); + } + + @Override + public Set>> entrySet() { + return httpHeaders.names() + .stream() + .map(k -> new AbstractMap.SimpleImmutableEntry<>(k, httpHeaders.getAll(k))) + .collect(Collectors.toSet()); + } + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpResponse.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpResponse.java new file mode 100644 index 0000000000000..c45ad54b668a3 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpResponse.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.http.HttpResponse; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; + +class ReactorNetty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse { + private final HttpHeaders requestHeaders; + + ReactorNetty4HttpResponse(HttpHeaders requestHeaders, HttpVersion version, RestStatus status, BytesReference content) { + super(version, HttpResponseStatus.valueOf(status.getStatus()), Netty4Utils.toByteBuf(content)); + this.requestHeaders = requestHeaders; + } + + @Override + public void addHeader(String name, String value) { + headers().add(name, value); + } + + @Override + public boolean containsHeader(String name) { + return headers().contains(name); + } + + public HttpHeaders requestHeaders() { + return requestHeaders; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java new file mode 100644 index 0000000000000..84360bf028ba9 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerChannel.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.http.HttpServerChannel; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; + +import io.netty.channel.Channel; + +class ReactorNetty4HttpServerChannel implements HttpServerChannel { + private final Channel channel; + private final CompletableContext closeContext = new CompletableContext<>(); + + ReactorNetty4HttpServerChannel(Channel channel) { + this.channel = channel; + Netty4Utils.addListener(this.channel.closeFuture(), closeContext); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public String toString() { + return "ReactorNetty4HttpChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java new file mode 100644 index 0000000000000..f1e8643731ad5 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -0,0 +1,170 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.http.AbstractHttpServerTransport; +import org.opensearch.http.HttpServerChannel; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.reactor.SharedGroupFactory; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; +import java.time.Duration; + +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.netty.DisposableServer; +import reactor.netty.http.HttpProtocol; +import reactor.netty.http.server.HttpServer; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; + +/** + * The HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}). + */ +public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTransport { + /** + * The number of Reactor Netty HTTP workers + */ + public static final Setting SETTING_HTTP_WORKER_COUNT = Setting.intSetting("http.netty.worker_count", 0, Property.NodeScope); + + private final SharedGroupFactory sharedGroupFactory; + private volatile SharedGroupFactory.SharedGroup sharedGroup; + private volatile DisposableServer disposableServer; + private final int readTimeoutMillis; + private volatile Scheduler scheduler; + + /** + * Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}). + * @param settings settings + * @param networkService network service + * @param bigArrays big array allocator + * @param threadPool thread pool instance + * @param xContentRegistry XContent registry instance + * @param dispatcher dispatcher instance + * @param clusterSettings cluster settings + * @param sharedGroupFactory shared group factory + * @param tracer tracer instance + */ + public ReactorNetty4HttpServerTransport( + Settings settings, + NetworkService networkService, + BigArrays bigArrays, + ThreadPool threadPool, + NamedXContentRegistry xContentRegistry, + Dispatcher dispatcher, + ClusterSettings clusterSettings, + SharedGroupFactory sharedGroupFactory, + Tracer tracer + ) { + super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer); + Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings)); + this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); + this.sharedGroupFactory = sharedGroupFactory; + } + + /** + * Binds the transport engine to the socket address + * @param socketAddress socket address to bind to + */ + @Override + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { + final HttpServer server = HttpServer.create() + .httpFormDecoder(builder -> builder.scheduler(scheduler)) + .readTimeout(Duration.ofMillis(readTimeoutMillis)) + .runOn(sharedGroup.getLowLevelGroup()) + .bindAddress(() -> socketAddress) + .compress(true) + .protocol(HttpProtocol.HTTP11, HttpProtocol.H2C) + .handle((req, res) -> incomingRequest(req, res)); + + disposableServer = server.bindNow(); + return new ReactorNetty4HttpServerChannel(disposableServer.channel()); + } + + /** + * Handles incoming Reactor Netty request + * @param request request instance + * @param response response instances + * @return response publisher + */ + protected Publisher incomingRequest(HttpServerRequest request, HttpServerResponse response) { + final NonStreamingRequestConsumer consumer = new NonStreamingRequestConsumer<>(this, request, response); + + request.receiveContent() + .switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(consumer, null, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT)); + + return Mono.from(consumer).flatMap(hc -> { + final FullHttpResponse r = (FullHttpResponse) hc; + response.status(r.status()); + r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue())); + return Mono.from(response.chunkedTransfer(false).sendHeaders().sendObject(r.content().copy())); + }); + } + + /** + * Called to tear down internal resources + */ + @Override + protected void stopInternal() { + if (sharedGroup != null) { + sharedGroup.shutdown(); + sharedGroup = null; + } + + if (scheduler != null) { + scheduler.dispose(); + scheduler = null; + } + + if (disposableServer != null) { + disposableServer.disposeNow(); + disposableServer = null; + } + } + + /** + * Starts the transport + */ + @Override + protected void doStart() { + boolean success = false; + try { + scheduler = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + "http-form-decoder" + ); + sharedGroup = sharedGroupFactory.getHttpGroup(); + bindServer(); + success = true; + } finally { + if (success == false) { + doStop(); // otherwise we leak threads since we never moved to started + } + } + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/package-info.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/package-info.java new file mode 100644 index 0000000000000..b5ecb0b62f79d --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The new HTTP transport implementations based on Reactor Netty. + */ +package org.opensearch.http.reactor.netty4; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java new file mode 100644 index 0000000000000..2e8bc02521f55 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.transport.reactor; + +import org.opensearch.common.SetOnce; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.reactor.netty4.ReactorNetty4Transport; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * The experimental network plugin that introduces new transport implementations based on Reactor Netty. + */ +public class ReactorNetty4Plugin extends Plugin implements NetworkPlugin { + /** + * The name of new experimental HTTP transport implementations based on Reactor Netty. + */ + public static final String REACTOR_NETTY_HTTP_TRANSPORT_NAME = "reactor-netty4"; + + private final SetOnce groupFactory = new SetOnce<>(); + + /** + * Default constructor + */ + public ReactorNetty4Plugin() {} + + /** + * Returns a list of additional {@link Setting} definitions for this plugin. + */ + @Override + public List> getSettings() { + return Arrays.asList(ReactorNetty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT, ReactorNetty4Transport.SETTING_WORKER_COUNT); + } + + /** + * Returns a map of {@link HttpServerTransport} suppliers. + * See {@link org.opensearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation. + * @param settings settings + * @param networkService network service + * @param bigArrays big array allocator + * @param pageCacheRecycler page cache recycler instance + * @param circuitBreakerService circuit breaker service instance + * @param threadPool thread pool instance + * @param xContentRegistry XContent registry instance + * @param dispatcher dispatcher instance + * @param clusterSettings cluster settings + * @param tracer tracer instance + */ + @Override + public Map> getHttpTransports( + Settings settings, + ThreadPool threadPool, + BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, + CircuitBreakerService circuitBreakerService, + NamedXContentRegistry xContentRegistry, + NetworkService networkService, + HttpServerTransport.Dispatcher dispatcher, + ClusterSettings clusterSettings, + Tracer tracer + ) { + return Collections.singletonMap( + REACTOR_NETTY_HTTP_TRANSPORT_NAME, + () -> new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + dispatcher, + clusterSettings, + getSharedGroupFactory(settings), + tracer + ) + ); + } + + private SharedGroupFactory getSharedGroupFactory(Settings settings) { + final SharedGroupFactory groupFactory = this.groupFactory.get(); + if (groupFactory != null) { + assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided"; + return groupFactory; + } else { + this.groupFactory.set(new SharedGroupFactory(settings)); + return this.groupFactory.get(); + } + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/SharedGroupFactory.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/SharedGroupFactory.java new file mode 100644 index 0000000000000..ab7de33c8e673 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/SharedGroupFactory.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.transport.reactor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.AbstractRefCounted; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport; +import org.opensearch.transport.TcpTransport; +import org.opensearch.transport.reactor.netty4.ReactorNetty4Transport; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; + +import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; + +/** + * Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for + * both {@link #getHttpGroup()} and {@link #getTransportGroup()} if + * {@link org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} is configured to be 0. + * If that setting is not 0, then it will return a different group in the {@link #getHttpGroup()} call. + */ +public final class SharedGroupFactory { + + private static final Logger logger = LogManager.getLogger(SharedGroupFactory.class); + + private final Settings settings; + private final int workerCount; + private final int httpWorkerCount; + + private RefCountedGroup genericGroup; + private SharedGroup dedicatedHttpGroup; + + /** + * Creates new shared group factory instance from settings + * @param settings settings + */ + public SharedGroupFactory(Settings settings) { + this.settings = settings; + this.workerCount = ReactorNetty4Transport.SETTING_WORKER_COUNT.get(settings); + this.httpWorkerCount = ReactorNetty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings); + } + + Settings getSettings() { + return settings; + } + + /** + * Gets the number of configured transport workers + * @return the number of configured transport workers + */ + public int getTransportWorkerCount() { + return workerCount; + } + + /** + * Gets transport shared group + * @return transport shared group + */ + public synchronized SharedGroup getTransportGroup() { + return getGenericGroup(); + } + + /** + * Gets HTTP transport shared group + * @return HTTP transport shared group + */ + public synchronized SharedGroup getHttpGroup() { + if (httpWorkerCount == 0) { + return getGenericGroup(); + } else { + if (dedicatedHttpGroup == null) { + NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup( + httpWorkerCount, + daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX) + ); + dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup)); + } + return dedicatedHttpGroup; + } + } + + private SharedGroup getGenericGroup() { + if (genericGroup == null) { + EventLoopGroup eventLoopGroup = new NioEventLoopGroup( + workerCount, + daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX) + ); + this.genericGroup = new RefCountedGroup(eventLoopGroup); + } else { + genericGroup.incRef(); + } + return new SharedGroup(genericGroup); + } + + private static class RefCountedGroup extends AbstractRefCounted { + + public static final String NAME = "ref-counted-event-loop-group"; + private final EventLoopGroup eventLoopGroup; + + private RefCountedGroup(EventLoopGroup eventLoopGroup) { + super(NAME); + this.eventLoopGroup = eventLoopGroup; + } + + @Override + protected void closeInternal() { + Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + shutdownFuture.awaitUninterruptibly(); + if (shutdownFuture.isSuccess() == false) { + logger.warn("Error closing netty event loop group", shutdownFuture.cause()); + } + } + } + + /** + * Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close, + * this wrapped instance can no longer be used. + */ + public static class SharedGroup { + + private final RefCountedGroup refCountedGroup; + + private final AtomicBoolean isOpen = new AtomicBoolean(true); + + private SharedGroup(RefCountedGroup refCountedGroup) { + this.refCountedGroup = refCountedGroup; + } + + /** + * Gets Netty's {@link EventLoopGroup} instance + * @return Netty's {@link EventLoopGroup} instance + */ + public EventLoopGroup getLowLevelGroup() { + return refCountedGroup.eventLoopGroup; + } + + /** + * Decreases the reference to underlying {@link EventLoopGroup} instance + */ + public void shutdown() { + if (isOpen.compareAndSet(true, false)) { + refCountedGroup.decRef(); + } + } + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java new file mode 100644 index 0000000000000..8ec432b7dd5cd --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/Netty4Utils.java @@ -0,0 +1,142 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.transport.reactor.netty4; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; +import org.opensearch.ExceptionsHelper; +import org.opensearch.common.Booleans; +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.util.NettyRuntime; + +/** + * Shameless copy of Netty4Utils from transport-netty4 module + */ +public final class Netty4Utils { + private static final AtomicBoolean isAvailableProcessorsSet = new AtomicBoolean(); + + /** + * Utility class + */ + private Netty4Utils() {} + + /** + * Set the number of available processors that Netty uses for sizing various resources (e.g., thread pools). + * + * @param availableProcessors the number of available processors + * @throws IllegalStateException if available processors was set previously and the specified value does not match the already-set value + */ + public static void setAvailableProcessors(final int availableProcessors) { + // we set this to false in tests to avoid tests that randomly set processors from stepping on each other + final boolean set = Booleans.parseBoolean(System.getProperty("opensearch.set.netty.runtime.available.processors", "true")); + if (!set) { + return; + } + + /* + * This can be invoked twice, once from Netty4Transport and another time from Netty4HttpServerTransport; however, + * Netty4Runtime#availableProcessors forbids settings the number of processors twice so we prevent double invocation here. + */ + if (isAvailableProcessorsSet.compareAndSet(false, true)) { + NettyRuntime.setAvailableProcessors(availableProcessors); + } else if (availableProcessors != NettyRuntime.availableProcessors()) { + /* + * We have previously set the available processors yet either we are trying to set it to a different value now or there is a bug + * in Netty and our previous value did not take, bail. + */ + final String message = String.format( + Locale.ROOT, + "available processors value [%d] did not match current value [%d]", + availableProcessors, + NettyRuntime.availableProcessors() + ); + throw new IllegalStateException(message); + } + } + + /** + * Turns the given BytesReference into a ByteBuf. Note: the returned ByteBuf will reference the internal + * pages of the BytesReference. Don't free the bytes of reference before the ByteBuf goes out of scope. + * @param reference reference to convert + */ + public static ByteBuf toByteBuf(final BytesReference reference) { + if (reference.length() == 0) { + return Unpooled.EMPTY_BUFFER; + } + final BytesRefIterator iterator = reference.iterator(); + // usually we have one, two, or three components from the header, the message, and a buffer + final List buffers = new ArrayList<>(3); + try { + BytesRef slice; + while ((slice = iterator.next()) != null) { + buffers.add(Unpooled.wrappedBuffer(slice.bytes, slice.offset, slice.length)); + } + + if (buffers.size() == 1) { + return buffers.get(0); + } else { + CompositeByteBuf composite = Unpooled.compositeBuffer(buffers.size()); + composite.addComponents(true, buffers); + return composite; + } + } catch (IOException ex) { + throw new AssertionError("no IO happens here", ex); + } + } + + /** + * Wraps the given ChannelBuffer with a BytesReference + * @param buffer buffer to convert + */ + public static BytesReference toBytesReference(final ByteBuf buffer) { + final int readableBytes = buffer.readableBytes(); + if (readableBytes == 0) { + return BytesArray.EMPTY; + } else if (buffer.hasArray()) { + return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), readableBytes); + } else { + final ByteBuffer[] byteBuffers = buffer.nioBuffers(); + return BytesReference.fromByteBuffers(byteBuffers); + } + } + + /** + * Add completion listener to ChannelFuture + * @param channelFuture ChannelFuture to add listener to + * @param context completion listener context + */ + public static void addListener(ChannelFuture channelFuture, CompletableContext context) { + channelFuture.addListener(f -> { + if (f.isSuccess()) { + context.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + context.completeExceptionally(new Exception(cause)); + } else { + context.completeExceptionally((Exception) cause); + } + } + }); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/ReactorNetty4Transport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/ReactorNetty4Transport.java new file mode 100644 index 0000000000000..b3e92f58c540a --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/ReactorNetty4Transport.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.reactor.netty4; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; + +import reactor.netty.tcp.TcpServer; + +/** + * The transport implementations based on Reactor Netty (see please {@link TcpServer}). + */ +public class ReactorNetty4Transport { + /** + * The number of Netty workers + */ + public static final Setting SETTING_WORKER_COUNT = new Setting<>( + "transport.netty.worker_count", + (s) -> Integer.toString(OpenSearchExecutors.allocatedProcessors(s)), + (s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), + Property.NodeScope + ); + + /** + * Default constructor + */ + public ReactorNetty4Transport() {} +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/package-info.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/package-info.java new file mode 100644 index 0000000000000..921bca104c6fe --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/netty4/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The new transport implementations based on Reactor Netty. + */ +package org.opensearch.transport.reactor.netty4; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/package-info.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/package-info.java new file mode 100644 index 0000000000000..2f36ebb7f11f8 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The experimental network plugin that introduces new transport implementations based on Reactor Netty. + */ +package org.opensearch.transport.reactor; diff --git a/plugins/transport-reactor-netty4/src/main/plugin-metadata/plugin-security.policy b/plugins/transport-reactor-netty4/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..4f2dcde995338 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +grant codeBase "${codebase.netty-common}" { + // for reading the system-wide configuration for the backlog of established sockets + permission java.io.FilePermission "/proc/sys/net/core/somaxconn", "read"; + + // netty makes and accepts socket connections + permission java.net.SocketPermission "*", "accept,connect"; + + // Netty sets custom classloader for some of its internal threads + permission java.lang.RuntimePermission "*", "setContextClassLoader"; +}; + +grant codeBase "${codebase.netty-transport}" { + // Netty NioEventLoop wants to change this, because of https://bugs.openjdk.java.net/browse/JDK-6427854 + // the bug says it only happened rarely, and that its fixed, but apparently it still happens rarely! + permission java.util.PropertyPermission "sun.nio.ch.bugLevel", "write"; +}; diff --git a/server/build.gradle b/server/build.gradle index f6db3d53a0dcc..c5c9ab0f6c7be 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -154,6 +154,10 @@ dependencies { // jcraft api "com.jcraft:jzlib:${versions.jzlib}" + // reactor + api "io.projectreactor:reactor-core:${versions.reactor}" + api "org.reactivestreams:reactive-streams:${versions.reactivestreams}" + // protobuf api "com.google.protobuf:protobuf-java:${versions.protobuf}" api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}" @@ -364,11 +368,13 @@ tasks.named("thirdPartyAudit").configure { 'com.google.protobuf.UnsafeUtil$Android32MemoryAccessor', 'com.google.protobuf.UnsafeUtil$Android64MemoryAccessor', 'com.google.protobuf.UnsafeUtil$JvmMemoryAccessor', - 'com.google.protobuf.UnsafeUtil$MemoryAccessor' + 'com.google.protobuf.UnsafeUtil$MemoryAccessor', + 'reactor.core.publisher.Traces$SharedSecretsCallSiteSupplierFactory$TracingException' ) } tasks.named("dependencyLicenses").configure { + mapping from: /reactor-.*/, to: 'reactor' mapping from: /lucene-.*/, to: 'lucene' dependencies = project.configurations.runtimeClasspath.fileCollection { it.group.startsWith('org.opensearch') == false || diff --git a/plugins/crypto-kms/licenses/reactive-streams-1.0.4.jar.sha1 b/server/licenses/reactive-streams-1.0.4.jar.sha1 similarity index 100% rename from plugins/crypto-kms/licenses/reactive-streams-1.0.4.jar.sha1 rename to server/licenses/reactive-streams-1.0.4.jar.sha1 diff --git a/plugins/crypto-kms/licenses/reactive-streams-LICENSE.txt b/server/licenses/reactive-streams-LICENSE.txt similarity index 100% rename from plugins/crypto-kms/licenses/reactive-streams-LICENSE.txt rename to server/licenses/reactive-streams-LICENSE.txt diff --git a/plugins/discovery-ec2/licenses/reactive-streams-NOTICE.txt b/server/licenses/reactive-streams-NOTICE.txt similarity index 100% rename from plugins/discovery-ec2/licenses/reactive-streams-NOTICE.txt rename to server/licenses/reactive-streams-NOTICE.txt diff --git a/server/licenses/reactor-LICENSE.txt b/server/licenses/reactor-LICENSE.txt new file mode 100644 index 0000000000000..e5583c184e67a --- /dev/null +++ b/server/licenses/reactor-LICENSE.txt @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + https://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/plugins/repository-azure/licenses/reactive-streams-NOTICE.txt b/server/licenses/reactor-NOTICE.txt similarity index 100% rename from plugins/repository-azure/licenses/reactive-streams-NOTICE.txt rename to server/licenses/reactor-NOTICE.txt diff --git a/server/licenses/reactor-core-3.5.9.jar.sha1 b/server/licenses/reactor-core-3.5.9.jar.sha1 new file mode 100644 index 0000000000000..99093b60145b3 --- /dev/null +++ b/server/licenses/reactor-core-3.5.9.jar.sha1 @@ -0,0 +1 @@ +1e127cb91adf7535eb90b284159612f8a2d06e2f \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/http/HttpChunk.java b/server/src/main/java/org/opensearch/http/HttpChunk.java new file mode 100644 index 0000000000000..5201d9f112704 --- /dev/null +++ b/server/src/main/java/org/opensearch/http/HttpChunk.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.bytes.BytesReference; + +/** + * Represents a chunk of the HTTP response stream + */ +@PublicApi(since = "2.11.0") +public interface HttpChunk { + /** + * Signals this is the last chunk of the stream. + * @return "true" if this is the last chunk of the stream, "false" otherwise + */ + boolean isLast(); + + /** + * Returns the content of this chunk + * @return the content of this chunk + */ + BytesReference content(); + + /** + * Releases all possible resources associated with this chunk + */ + void release(); +}