From ed5e9ccd1f84f77a50fd814a765f899af3cfbd4f Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Mon, 14 Aug 2023 20:06:51 -0500 Subject: [PATCH] Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility (#9262) This commit refactors the CompressorFactory static singleton class and CompressorType enum to a formal CompressorRegistry and enables downstream implementations to register their own compression implementations for use in compressing Blob stores and MediaType data. This is different from Lucene's Codec compression extension points which expose different compression implementations for Lucene's Stored Fields. --------- Signed-off-by: Nicholas Walter Knize (cherry picked from commit c6b67e1b938b5e34806216af5efef32954554bf3) --- CHANGELOG.md | 2 +- gradle/missing-javadoc.gradle | 1 + libs/compress/build.gradle | 38 ++++++ .../licenses/zstd-jni-1.5.5-3.jar.sha1 | 0 .../compress}/licenses/zstd-jni-LICENSE.txt | 0 .../compress}/licenses/zstd-jni-NOTICE.txt | 0 .../opensearch}/compress/ZstdCompressor.java | 16 ++- .../opensearch}/compress/package-info.java | 6 +- .../compress/spi/CompressionProvider.java | 32 +++++ .../opensearch/compress/spi/package-info.java | 15 +++ .../java/org/opensearch/package-info.java | 13 ++ ...earch.core.compress.spi.CompressorProvider | 9 ++ .../compress/ZstdCompressTests.java | 9 +- .../{common => }/compress/Compressor.java | 16 ++- .../core/compress/CompressorRegistry.java | 115 ++++++++++++++++++ .../core}/compress/NoneCompressor.java | 15 ++- .../compress/NotCompressedException.java | 2 +- .../compress/NotXContentException.java | 2 +- .../core/compress/package-info.java | 14 +++ .../core/compress/spi/CompressorProvider.java | 34 ++++++ .../spi/DefaultCompressorProvider.java | 31 +++++ .../core/compress/spi/package-info.java | 16 +++ ...earch.core.compress.spi.CompressorProvider | 9 ++ server/build.gradle | 4 +- .../coordination/CompressedStreamUtils.java | 8 +- .../common/compress/CompressedXContent.java | 13 +- .../common/compress/CompressorFactory.java | 101 --------------- .../common/compress/CompressorType.java | 42 ------- .../common/compress/DeflateCompressor.java | 15 ++- .../spi/ServerCompressorProvider.java | 36 ++++++ .../common/compress/spi/package-info.java | 16 +++ .../common/xcontent/XContentHelper.java | 14 +-- .../org/opensearch/index/get/GetResult.java | 4 +- .../blobstore/BlobStoreRepository.java | 22 ++-- .../blobstore/ChecksumBlobStoreFormat.java | 2 +- .../java/org/opensearch/search/SearchHit.java | 4 +- .../CompressibleBytesOutputStream.java | 4 +- .../transport/TransportDecompressor.java | 8 +- .../opensearch/transport/TransportLogger.java | 4 +- ...earch.core.compress.spi.CompressorProvider | 9 ++ .../common/compress/DeflateCompressTests.java | 7 +- .../DeflateCompressedXContentTests.java | 2 +- .../index/mapper/BinaryFieldMapperTests.java | 6 +- .../snapshots/BlobStoreFormatTests.java | 21 ++-- .../CompressibleBytesOutputStreamTests.java | 10 +- .../transport/TransportDecompressorTests.java | 8 +- ...earchBlobStoreRepositoryIntegTestCase.java | 4 +- .../AbstractSnapshotIntegTestCase.java | 4 +- .../compress/AbstractCompressorTestCase.java | 8 +- 49 files changed, 526 insertions(+), 245 deletions(-) create mode 100644 libs/compress/build.gradle rename {server => libs/compress}/licenses/zstd-jni-1.5.5-3.jar.sha1 (100%) rename {server => libs/compress}/licenses/zstd-jni-LICENSE.txt (100%) rename {server => libs/compress}/licenses/zstd-jni-NOTICE.txt (100%) rename {server/src/main/java/org/opensearch/common => libs/compress/src/main/java/org/opensearch}/compress/ZstdCompressor.java (86%) rename libs/{core/src/main/java/org/opensearch/core/common => compress/src/main/java/org/opensearch}/compress/package-info.java (63%) create mode 100644 libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java create mode 100644 libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java create mode 100644 libs/compress/src/main/java/org/opensearch/package-info.java create mode 100644 libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider rename {server/src/test/java/org/opensearch/common => libs/compress/src/test/java/org/opensearch}/compress/ZstdCompressTests.java (58%) rename libs/core/src/main/java/org/opensearch/core/{common => }/compress/Compressor.java (77%) create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java rename {server/src/main/java/org/opensearch/common => libs/core/src/main/java/org/opensearch/core}/compress/NoneCompressor.java (74%) rename {server/src/main/java/org/opensearch/common => libs/core/src/main/java/org/opensearch/core}/compress/NotCompressedException.java (97%) rename libs/core/src/main/java/org/opensearch/core/{common => }/compress/NotXContentException.java (96%) create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/package-info.java create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java create mode 100644 libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider delete mode 100644 server/src/main/java/org/opensearch/common/compress/CompressorFactory.java delete mode 100644 server/src/main/java/org/opensearch/common/compress/CompressorType.java create mode 100644 server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java create mode 100644 server/src/main/java/org/opensearch/common/compress/spi/package-info.java create mode 100644 server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider rename server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java => test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java (98%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 382b038295dd8..6012923e33b12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,8 +75,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Refactor] Task foundation classes to core library - pt 1 ([#9082](https://github.com/opensearch-project/OpenSearch/pull/9082)) - Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) - Add base class for parameterizing the search based tests #9083 ([#9083](https://github.com/opensearch-project/OpenSearch/pull/9083)) -- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) - Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177)) +- Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility ([#9262](https://github.com/opensearch-project/OpenSearch/pull/9262)) ### Deprecated diff --git a/gradle/missing-javadoc.gradle b/gradle/missing-javadoc.gradle index e006b4309deea..ab2eddf16eacf 100644 --- a/gradle/missing-javadoc.gradle +++ b/gradle/missing-javadoc.gradle @@ -180,6 +180,7 @@ configure([ configure([ project(":libs:opensearch-common"), project(":libs:opensearch-core"), + project(":libs:opensearch-compress"), project(":server") ]) { project.tasks.withType(MissingJavadocTask) { diff --git a/libs/compress/build.gradle b/libs/compress/build.gradle new file mode 100644 index 0000000000000..7a5bc2f573dea --- /dev/null +++ b/libs/compress/build.gradle @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.build' +apply plugin: 'opensearch.publish' + +base { + archivesName = 'opensearch-compress' +} + +dependencies { + api project(':libs:opensearch-common') + api project(':libs:opensearch-core') + + //zstd + api "com.github.luben:zstd-jni:${versions.zstd}" + + testImplementation(project(":test:framework")) { + // tests use the locally compiled version of server + exclude group: 'org.opensearch', module: 'opensearch-compress' + } +} + +tasks.named('forbiddenApisMain').configure { + // :libs:opensearch-compress does not depend on server + // TODO: Need to decide how we want to handle for forbidden signatures with the changes to server + replaceSignatureFiles 'jdk-signatures' +} + +jarHell.enabled = false diff --git a/server/licenses/zstd-jni-1.5.5-3.jar.sha1 b/libs/compress/licenses/zstd-jni-1.5.5-3.jar.sha1 similarity index 100% rename from server/licenses/zstd-jni-1.5.5-3.jar.sha1 rename to libs/compress/licenses/zstd-jni-1.5.5-3.jar.sha1 diff --git a/server/licenses/zstd-jni-LICENSE.txt b/libs/compress/licenses/zstd-jni-LICENSE.txt similarity index 100% rename from server/licenses/zstd-jni-LICENSE.txt rename to libs/compress/licenses/zstd-jni-LICENSE.txt diff --git a/server/licenses/zstd-jni-NOTICE.txt b/libs/compress/licenses/zstd-jni-NOTICE.txt similarity index 100% rename from server/licenses/zstd-jni-NOTICE.txt rename to libs/compress/licenses/zstd-jni-NOTICE.txt diff --git a/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java b/libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java similarity index 86% rename from server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java rename to libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java index 672c66eb2909f..01afc368fb120 100644 --- a/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java +++ b/libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java @@ -6,14 +6,15 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.compress; import com.github.luben.zstd.RecyclingBufferPool; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -25,7 +26,8 @@ /** * {@link Compressor} implementation based on the ZSTD compression algorithm. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class ZstdCompressor implements Compressor { // An arbitrary header that we use to identify compressed streams @@ -34,6 +36,14 @@ public class ZstdCompressor implements Compressor { // a XContent private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' }; + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static final String NAME = "ZSTD"; + private static final int LEVEL = 3; private static final int BUFFER_SIZE = 4096; diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java b/libs/compress/src/main/java/org/opensearch/compress/package-info.java similarity index 63% rename from libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java rename to libs/compress/src/main/java/org/opensearch/compress/package-info.java index 99459f99c42d8..3ffa53079fa69 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java +++ b/libs/compress/src/main/java/org/opensearch/compress/package-info.java @@ -6,5 +6,7 @@ * compatible open source license. */ -/** Classes for core compress module */ -package org.opensearch.core.common.compress; +/** + * Concrete {@link org.opensearch.core.compress.Compressor} implementations + */ +package org.opensearch.compress; diff --git a/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java b/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java new file mode 100644 index 0000000000000..58bf24a210bae --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.compress.spi; + +import org.opensearch.compress.ZstdCompressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.spi.CompressorProvider; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Additional "optional" compressor implementations provided by the opensearch compress library + * + * @opensearch.internal + */ +public class CompressionProvider implements CompressorProvider { + + /** Returns the concrete {@link Compressor}s provided by the compress library */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry<>(ZstdCompressor.NAME, new ZstdCompressor())); + } +} diff --git a/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java b/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java new file mode 100644 index 0000000000000..47d982a7ca2f9 --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Provider Interface for registering concrete {@link org.opensearch.core.compress.Compressor} + * implementations. + * + * See {@link org.opensearch.compress.ZstdCompressor} + */ +package org.opensearch.compress.spi; diff --git a/libs/compress/src/main/java/org/opensearch/package-info.java b/libs/compress/src/main/java/org/opensearch/package-info.java new file mode 100644 index 0000000000000..264680e9cb271 --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This is the compress library for registering optional + * {@link org.opensearch.core.compress.Compressor} implementations + */ +package org.opensearch; diff --git a/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..a9ea063e24436 --- /dev/null +++ b/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.compress.spi.CompressionProvider diff --git a/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java b/libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java similarity index 58% rename from server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java rename to libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java index 9def702792ffc..54864054a0e02 100644 --- a/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java +++ b/libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java @@ -6,19 +6,20 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.compress; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.test.core.compress.AbstractCompressorTestCase; /** * Test streaming compression */ -public class ZstdCompressTests extends AbstractCompressorTests { +public class ZstdCompressTests extends AbstractCompressorTestCase { private final Compressor compressor = new ZstdCompressor(); @Override - Compressor compressor() { + protected Compressor compressor() { return compressor; } } diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java b/libs/core/src/main/java/org/opensearch/core/compress/Compressor.java similarity index 77% rename from libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java rename to libs/core/src/main/java/org/opensearch/core/compress/Compressor.java index 88b6c9f85f225..27d5b5dfdfa15 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/Compressor.java @@ -30,8 +30,10 @@ * GitHub history for details. */ -package org.opensearch.core.common.compress; +package org.opensearch.core.compress; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; import java.io.IOException; @@ -39,10 +41,18 @@ import java.io.OutputStream; /** - * Compressor interface + * Compressor interface used for compressing {@link org.opensearch.core.xcontent.MediaType} and + * {@code org.opensearch.repositories.blobstore.BlobStoreRepository} implementations. * - * @opensearch.internal + * This is not to be confused with {@link org.apache.lucene.codecs.compressing.Compressor} which is used + * for codec implementations such as {@code org.opensearch.index.codec.customcodecs.Lucene95CustomCodec} + * for compressing {@link org.apache.lucene.document.StoredField}s + * + * @opensearch.api - intended to be extended + * @opensearch.experimental - however, bwc is not guaranteed at this time */ +@ExperimentalApi +@PublicApi(since = "2.10.0") public interface Compressor { boolean isCompressed(BytesReference bytes); diff --git a/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java new file mode 100644 index 0000000000000..9290254c30d8d --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress; + +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.spi.CompressorProvider; +import org.opensearch.core.xcontent.MediaTypeRegistry; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +/** + * A registry that wraps a static Map singleton which holds a mapping of unique String names (typically the + * compressor header as a string) to registerd {@link Compressor} implementations. + * + * This enables plugins, modules, extensions to register their own compression implementations through SPI + * + * @opensearch.experimental + * @opensearch.internal + */ +@InternalApi +public final class CompressorRegistry { + + // the backing registry map + private static final Map registeredCompressors = ServiceLoader.load( + CompressorProvider.class, + CompressorProvider.class.getClassLoader() + ) + .stream() + .flatMap(p -> p.get().getCompressors().stream()) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + + // no instance: + private CompressorRegistry() {} + + /** + * Returns the default compressor + */ + public static Compressor defaultCompressor() { + return registeredCompressors.get("DEFLATE"); + } + + public static Compressor none() { + return registeredCompressors.get(NoneCompressor.NAME); + } + + public static boolean isCompressed(BytesReference bytes) { + return compressor(bytes) != null; + } + + @Nullable + public static Compressor compressor(final BytesReference bytes) { + for (Compressor compressor : registeredCompressors.values()) { + if (compressor.isCompressed(bytes) == true) { + // bytes should be either detected as compressed or as xcontent, + // if we have bytes that can be either detected as compressed or + // as a xcontent, we have a problem + assert MediaTypeRegistry.xContentType(bytes) == null; + return compressor; + } + } + + if (MediaTypeRegistry.xContentType(bytes) == null) { + throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"); + } + + return null; + } + + /** Decompress the provided {@link BytesReference}. */ + public static BytesReference uncompress(BytesReference bytes) throws IOException { + Compressor compressor = compressor(bytes); + if (compressor == null) { + throw new NotCompressedException(); + } + return compressor.uncompress(bytes); + } + + /** + * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. + */ + public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException { + Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null")); + return compressor == null ? bytes : compressor.uncompress(bytes); + } + + /** Returns a registered compressor by its registered name */ + public static Compressor getCompressor(final String name) { + if (registeredCompressors.containsKey(name)) { + return registeredCompressors.get(name); + } + throw new IllegalArgumentException("No registered compressor found by name [" + name + "]"); + } + + /** + * Returns the registered compressors as an Immutable collection + * + * note: used for testing + */ + public static Map registeredCompressors() { + // no destructive danger as backing map is immutable + return registeredCompressors; + } +} diff --git a/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java b/libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java similarity index 74% rename from server/src/main/java/org/opensearch/common/compress/NoneCompressor.java rename to libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java index f820a3351bc80..6e607ed701633 100644 --- a/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java @@ -6,10 +6,10 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.core.compress; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; import java.io.IOException; import java.io.InputStream; @@ -18,9 +18,18 @@ /** * {@link Compressor} no compressor implementation. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class NoneCompressor implements Compressor { + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static final String NAME = "NONE"; + @Override public boolean isCompressed(BytesReference bytes) { return false; diff --git a/server/src/main/java/org/opensearch/common/compress/NotCompressedException.java b/libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java similarity index 97% rename from server/src/main/java/org/opensearch/common/compress/NotCompressedException.java rename to libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java index 7f070e0b499d8..91d6bc57f1cd6 100644 --- a/server/src/main/java/org/opensearch/common/compress/NotCompressedException.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.common.compress; +package org.opensearch.core.compress; /** * Exception indicating that we were expecting something compressed, which diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java b/libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java similarity index 96% rename from libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java rename to libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java index d1a3e7709a7d0..99337d5a26025 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.core.common.compress; +package org.opensearch.core.compress; import org.opensearch.core.xcontent.XContent; diff --git a/libs/core/src/main/java/org/opensearch/core/compress/package-info.java b/libs/core/src/main/java/org/opensearch/core/compress/package-info.java new file mode 100644 index 0000000000000..c0365e45702bc --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Concrete {@link org.opensearch.core.compress.Compressor} implementations provided by the core library + * + * See {@link org.opensearch.core.compress.NoneCompressor} + */ +package org.opensearch.core.compress; diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java new file mode 100644 index 0000000000000..019e282444d64 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress.spi; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.compress.Compressor; + +import java.util.List; +import java.util.Map; + +/** + * Service Provider Interface for plugins, modules, extensions providing custom + * compression algorithms + * + * see {@link Compressor} for implementing methods + * and {@link org.opensearch.core.compress.CompressorRegistry} for the registration of custom + * Compressors + * + * @opensearch.experimental + * @opensearch.api + */ +@ExperimentalApi +@PublicApi(since = "2.10.0") +public interface CompressorProvider { + /** Extensions that implement their own concrete {@link Compressor}s provide them through this interface method*/ + List> getCompressors(); +} diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java new file mode 100644 index 0000000000000..3ca10b564ef68 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress.spi; + +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Default {@link Compressor} implementations provided by the + * opensearch core library + * + * @opensearch.internal + */ +public class DefaultCompressorProvider implements CompressorProvider { + /** Returns the default {@link Compressor}s provided by the core library */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry(NoneCompressor.NAME, new NoneCompressor())); + } +} diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java new file mode 100644 index 0000000000000..6e33cc8fb63d3 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The Service Provider Interface implementation for registering {@link org.opensearch.core.compress.Compressor} + * with the {@link org.opensearch.core.compress.CompressorRegistry} + * + * See {@link org.opensearch.core.compress.spi.DefaultCompressorProvider} as an example of registering the core + * {@link org.opensearch.core.compress.NoneCompressor} + */ +package org.opensearch.core.compress.spi; diff --git a/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..181b802952c60 --- /dev/null +++ b/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.core.compress.spi.DefaultCompressorProvider diff --git a/server/build.gradle b/server/build.gradle index c608c5ff86f06..9c409d77363cb 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -101,6 +101,7 @@ dependencies { api project(':libs:opensearch-common') api project(':libs:opensearch-core') + api project(":libs:opensearch-compress") api project(':libs:opensearch-secure-sm') api project(':libs:opensearch-x-content') api project(":libs:opensearch-geo") @@ -157,9 +158,6 @@ dependencies { api "com.google.protobuf:protobuf-java:${versions.protobuf}" api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}" - //zstd - api "com.github.luben:zstd-jni:${versions.zstd}" - testImplementation(project(":test:framework")) { // tests use the locally compiled version of server exclude group: 'org.opensearch', module: 'server' diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java index 32d3757e7a0cc..dc7b203eb7c4b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java @@ -12,16 +12,16 @@ import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.common.CheckedConsumer; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.transport.BytesTransportRequest; import java.io.IOException; @@ -37,7 +37,7 @@ public final class CompressedStreamUtils { public static BytesReference createCompressedStream(Version version, CheckedConsumer outputConsumer) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream))) { + try (StreamOutput stream = new OutputStreamStreamOutput(CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream))) { stream.setVersion(version); outputConsumer.accept(stream); } @@ -48,7 +48,7 @@ public static BytesReference createCompressedStream(Version version, CheckedCons public static StreamInput decompressBytes(BytesTransportRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException { - final Compressor compressor = CompressorFactory.compressor(request.bytes()); + final Compressor compressor = CompressorRegistry.compressor(request.bytes()); final StreamInput in; if (compressor != null) { in = new InputStreamStreamInput(compressor.threadLocalInputStream(request.bytes().streamInput())); diff --git a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java index fefdf0e7dfdf3..550bc5b3fd524 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java @@ -37,9 +37,10 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; @@ -86,7 +87,7 @@ private CompressedXContent(byte[] compressed, int crc32) { */ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - OutputStream compressedStream = CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream); + OutputStream compressedStream = CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream); CRC32 crc32 = new CRC32(); OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32); try (XContentBuilder builder = XContentFactory.jsonBuilder(checkedStream)) { @@ -108,20 +109,20 @@ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws * that may already be compressed. */ public CompressedXContent(BytesReference data) throws IOException { - Compressor compressor = CompressorFactory.compressor(data); + Compressor compressor = CompressorRegistry.compressor(data); if (compressor != null) { // already compressed... this.bytes = BytesReference.toBytes(data); this.crc32 = crc32(uncompressed()); } else { - this.bytes = BytesReference.toBytes(CompressorFactory.defaultCompressor().compress(data)); + this.bytes = BytesReference.toBytes(CompressorRegistry.defaultCompressor().compress(data)); this.crc32 = crc32(data); } assertConsistent(); } private void assertConsistent() { - assert CompressorFactory.compressor(new BytesArray(bytes)) != null; + assert CompressorRegistry.compressor(new BytesArray(bytes)) != null; assert this.crc32 == crc32(uncompressed()); } @@ -146,7 +147,7 @@ public BytesReference compressedReference() { /** Return the uncompressed bytes. */ public BytesReference uncompressed() { try { - return CompressorFactory.uncompress(new BytesArray(bytes)); + return CompressorRegistry.uncompress(new BytesArray(bytes)); } catch (IOException e) { throw new IllegalStateException("Cannot decompress compressed string", e); } diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java b/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java deleted file mode 100644 index e40dd89abab54..0000000000000 --- a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.common.compress; - -import org.opensearch.common.Nullable; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.core.common.compress.NotXContentException; -import org.opensearch.core.xcontent.MediaTypeRegistry; - -import java.io.IOException; -import java.util.Objects; - -/** - * Factory to create a compressor instance. - * - * @opensearch.internal - */ -public class CompressorFactory { - - public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor(); - - public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor(); - - public static final Compressor NONE_COMPRESSOR = new NoneCompressor(); - - public static boolean isCompressed(BytesReference bytes) { - return compressor(bytes) != null; - } - - public static Compressor defaultCompressor() { - return DEFLATE_COMPRESSOR; - } - - @Nullable - public static Compressor compressor(BytesReference bytes) { - if (DEFLATE_COMPRESSOR.isCompressed(bytes)) { - // bytes should be either detected as compressed or as xcontent, - // if we have bytes that can be either detected as compressed or - // as a xcontent, we have a problem - assert MediaTypeRegistry.xContentType(bytes) == null; - return DEFLATE_COMPRESSOR; - } else if (ZSTD_COMPRESSOR.isCompressed(bytes)) { - assert MediaTypeRegistry.xContentType(bytes) == null; - return ZSTD_COMPRESSOR; - } - - if (MediaTypeRegistry.xContentType(bytes) == null) { - throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"); - } - - return null; - } - - /** - * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. - */ - public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException { - Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null")); - return compressor == null ? bytes : compressor.uncompress(bytes); - } - - /** Decompress the provided {@link BytesReference}. */ - public static BytesReference uncompress(BytesReference bytes) throws IOException { - Compressor compressor = compressor(bytes); - if (compressor == null) { - throw new NotCompressedException(); - } - return compressor.uncompress(bytes); - } -} diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorType.java b/server/src/main/java/org/opensearch/common/compress/CompressorType.java deleted file mode 100644 index bc688bab57c37..0000000000000 --- a/server/src/main/java/org/opensearch/common/compress/CompressorType.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.compress; - -import org.opensearch.core.common.compress.Compressor; - -/** - * Supported compression types - * - * @opensearch.internal - */ -public enum CompressorType { - - DEFLATE { - @Override - public Compressor compressor() { - return CompressorFactory.DEFLATE_COMPRESSOR; - } - }, - - ZSTD { - @Override - public Compressor compressor() { - return CompressorFactory.ZSTD_COMPRESSOR; - } - }, - - NONE { - @Override - public Compressor compressor() { - return CompressorFactory.NONE_COMPRESSOR; - } - }; - - public abstract Compressor compressor(); -} diff --git a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java index ed741b4899ae7..3ccac1a941741 100644 --- a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java +++ b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java @@ -32,11 +32,12 @@ package org.opensearch.common.compress; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.core.Assertions; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -53,7 +54,8 @@ /** * {@link Compressor} implementation based on the DEFLATE compression algorithm. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class DeflateCompressor implements Compressor { @@ -62,6 +64,15 @@ public class DeflateCompressor implements Compressor { // enough so that no stream starting with these bytes could be detected as // a XContent private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' }; + + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static String NAME = "DEFLATE"; + // 3 is a good trade-off between speed and compression ratio private static final int LEVEL = 3; // We use buffering on the input and output of in/def-laters in order to diff --git a/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java b/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java new file mode 100644 index 0000000000000..42036f8d88610 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress.spi; + +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.spi.CompressorProvider; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Default {@link Compressor} implementations provided by the + * opensearch core library + * + * @opensearch.internal + * + * @deprecated This class is deprecated and will be removed when the {@link DeflateCompressor} is moved to the compress + * library as a default compression option + */ +@Deprecated +public class ServerCompressorProvider implements CompressorProvider { + /** Returns the concrete {@link Compressor}s provided by the server module */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry(DeflateCompressor.NAME, new DeflateCompressor())); + } +} diff --git a/server/src/main/java/org/opensearch/common/compress/spi/package-info.java b/server/src/main/java/org/opensearch/common/compress/spi/package-info.java new file mode 100644 index 0000000000000..a8019b23c7d90 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/spi/package-info.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Provider Interface for registering the{@link org.opensearch.common.compress.DeflateCompressor} with the + * {@link org.opensearch.core.compress.CompressorRegistry}. + * + * Note: this will be refactored to the {@code :libs:opensearch-compress} library after other dependency classes are + * refactored. + */ +package org.opensearch.common.compress.spi; diff --git a/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java b/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java index 3100d597abf55..798a58551457f 100644 --- a/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java +++ b/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java @@ -34,10 +34,10 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -77,7 +77,7 @@ public static XContentParser createParser( DeprecationHandler deprecationHandler, BytesReference bytes ) throws IOException { - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedInput = null; try { @@ -106,7 +106,7 @@ public static XContentParser createParser( MediaType mediaType ) throws IOException { Objects.requireNonNull(mediaType); - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedInput = null; try { @@ -163,7 +163,7 @@ public static Tuple> convertToMap(Bytes try { final MediaType contentType; InputStream input; - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedStreamInput = compressor.threadLocalInputStream(bytes.streamInput()); if (compressedStreamInput.markSupported() == false) { @@ -451,7 +451,7 @@ private static boolean allListValuesAreMapsOfOne(List list) { */ @Deprecated public static void writeRawField(String field, BytesReference source, XContentBuilder builder, Params params) throws IOException { - Compressor compressor = CompressorFactory.compressor(source); + Compressor compressor = CompressorRegistry.compressor(source); if (compressor != null) { try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) { builder.rawField(field, compressedStreamInput); @@ -470,7 +470,7 @@ public static void writeRawField(String field, BytesReference source, XContentBu public static void writeRawField(String field, BytesReference source, XContentType xContentType, XContentBuilder builder, Params params) throws IOException { Objects.requireNonNull(xContentType); - Compressor compressor = CompressorFactory.compressor(source); + Compressor compressor = CompressorRegistry.compressor(source); if (compressor != null) { try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) { builder.rawField(field, compressedStreamInput, xContentType); diff --git a/server/src/main/java/org/opensearch/index/get/GetResult.java b/server/src/main/java/org/opensearch/index/get/GetResult.java index 6445434764fb4..7f87f9c61c93e 100644 --- a/server/src/main/java/org/opensearch/index/get/GetResult.java +++ b/server/src/main/java/org/opensearch/index/get/GetResult.java @@ -35,7 +35,6 @@ import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchParseException; import org.opensearch.Version; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.document.DocumentField; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.common.Strings; @@ -43,6 +42,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; @@ -219,7 +219,7 @@ public BytesReference sourceRef() { } try { - this.source = CompressorFactory.uncompressIfNeeded(this.source); + this.source = CompressorRegistry.uncompressIfNeeded(this.source); return this.source; } catch (IOException e) { throw new OpenSearchParseException("failed to decompress source", e); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index f6900a7dd1801..da02fa81925db 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -74,8 +74,7 @@ import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.compress.CompressorType; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -93,10 +92,11 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.core.common.compress.NotXContentException; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.core.compress.NotXContentException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.index.snapshots.IndexShardSnapshotFailedException; import org.opensearch.core.util.BytesRefUtils; @@ -265,10 +265,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false, Setting.Property.NodeScope); - public static final Setting COMPRESSION_TYPE_SETTING = new Setting<>( + public static final Setting COMPRESSION_TYPE_SETTING = new Setting<>( "compression_type", - CompressorType.DEFLATE.name().toLowerCase(Locale.ROOT), - s -> CompressorType.valueOf(s.toUpperCase(Locale.ROOT)), + DeflateCompressor.NAME.toLowerCase(Locale.ROOT), + s -> CompressorRegistry.getCompressor(s.toUpperCase(Locale.ROOT)), Setting.Property.NodeScope ); @@ -405,7 +405,7 @@ protected BlobStoreRepository( cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings()); - this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()).compressor() : CompressorFactory.NONE_COMPRESSOR; + this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()) : CompressorRegistry.none(); } @Override @@ -774,7 +774,7 @@ public BlobStore blobStore() { * @return true if compression is needed */ protected final boolean isCompress() { - return compressor != CompressorFactory.NONE_COMPRESSOR; + return compressor != CompressorRegistry.none(); } /** @@ -2002,7 +2002,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { if (cacheRepositoryData && bestEffortConsistency == false) { final BytesReference serialized; try { - serialized = CompressorFactory.defaultCompressor().compress(updated); + serialized = CompressorRegistry.defaultCompressor().compress(updated); final int len = serialized.length(); if (len > ByteSizeUnit.KB.toBytes(500)) { logger.debug( @@ -2038,7 +2038,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { } private RepositoryData repositoryDataFromCachedEntry(Tuple cacheEntry) throws IOException { - try (InputStream input = CompressorFactory.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) { + try (InputStream input = CompressorRegistry.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) { return RepositoryData.snapshotsFromXContent( MediaTypeRegistry.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input), cacheEntry.v1(), diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 01f76f9d889b9..9048757405108 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -51,7 +51,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; diff --git a/server/src/main/java/org/opensearch/search/SearchHit.java b/server/src/main/java/org/opensearch/search/SearchHit.java index b6061c18eb629..fab9c1b773d0b 100644 --- a/server/src/main/java/org/opensearch/search/SearchHit.java +++ b/server/src/main/java/org/opensearch/search/SearchHit.java @@ -38,7 +38,6 @@ import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.common.Nullable; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.document.DocumentField; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.ParseField; @@ -49,6 +48,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.text.Text; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.ConstructingObjectParser; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -400,7 +400,7 @@ public BytesReference getSourceRef() { } try { - this.source = CompressorFactory.uncompressIfNeeded(this.source); + this.source = CompressorRegistry.uncompressIfNeeded(this.source); return this.source; } catch (IOException e) { throw new OpenSearchParseException("failed to decompress source", e); diff --git a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java index e3fe3c4e37006..5cb169439a14d 100644 --- a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java +++ b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java @@ -32,12 +32,12 @@ package org.opensearch.transport; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.Streams; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStream; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import java.io.IOException; import java.io.OutputStream; @@ -68,7 +68,7 @@ final class CompressibleBytesOutputStream extends StreamOutput { this.bytesStreamOutput = bytesStreamOutput; this.shouldCompress = shouldCompress; if (shouldCompress) { - this.stream = CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); + this.stream = CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); } else { this.stream = bytesStreamOutput; } diff --git a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java index 721085c611ad7..8fbc3b7ce6803 100644 --- a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java +++ b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java @@ -35,12 +35,12 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.recycler.Recycler; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import java.io.Closeable; import java.io.IOException; @@ -70,7 +70,7 @@ public TransportDecompressor(PageCacheRecycler recycler) { public int decompress(BytesReference bytesReference) throws IOException { int bytesConsumed = 0; if (hasReadHeader == false) { - final Compressor compressor = CompressorFactory.defaultCompressor(); + final Compressor compressor = CompressorRegistry.defaultCompressor(); if (compressor.isCompressed(bytesReference) == false) { int maxToRead = Math.min(bytesReference.length(), 10); StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead) @@ -137,7 +137,7 @@ public int decompress(BytesReference bytesReference) throws IOException { } public boolean canDecompress(int bytesAvailable) { - return hasReadHeader || bytesAvailable >= CompressorFactory.defaultCompressor().headerLength(); + return hasReadHeader || bytesAvailable >= CompressorRegistry.defaultCompressor().headerLength(); } public boolean isEOS() { diff --git a/server/src/main/java/org/opensearch/transport/TransportLogger.java b/server/src/main/java/org/opensearch/transport/TransportLogger.java index 24a8f886be4ef..1876164c52e58 100644 --- a/server/src/main/java/org/opensearch/transport/TransportLogger.java +++ b/server/src/main/java/org/opensearch/transport/TransportLogger.java @@ -34,12 +34,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import java.io.IOException; @@ -185,7 +185,7 @@ private static String format(TcpChannel channel, InboundMessage message, String private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException { if (TransportStatus.isCompress(status) && streamInput.available() > 0) { try { - return new InputStreamStreamInput(CompressorFactory.defaultCompressor().threadLocalInputStream(streamInput)); + return new InputStreamStreamInput(CompressorRegistry.defaultCompressor().threadLocalInputStream(streamInput)); } catch (IllegalArgumentException e) { throw new IllegalStateException("stream marked as compressed, but is missing deflate header"); } diff --git a/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..8d93d45035f3f --- /dev/null +++ b/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.common.compress.spi.ServerCompressorProvider diff --git a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java index 8c7ab05addc4c..262a7ec40a8f0 100644 --- a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java +++ b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java @@ -32,17 +32,18 @@ package org.opensearch.common.compress; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.test.core.compress.AbstractCompressorTestCase; /** * Test streaming compression (e.g. used for recovery) */ -public class DeflateCompressTests extends AbstractCompressorTests { +public class DeflateCompressTests extends AbstractCompressorTestCase { private final Compressor compressor = new DeflateCompressor(); @Override - Compressor compressor() { + protected Compressor compressor() { return compressor; } } diff --git a/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java b/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java index 7583e7bd371c3..5c9353d15e24a 100644 --- a/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java +++ b/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java @@ -35,7 +35,7 @@ import org.apache.lucene.tests.util.TestUtil; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.test.OpenSearchTestCase; import org.junit.Assert; diff --git a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java index 8c01bdb2e7056..87b5ad3434944 100644 --- a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java @@ -33,10 +33,10 @@ package org.opensearch.index.mapper; import org.apache.lucene.util.BytesRef; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; @@ -119,11 +119,11 @@ public void testStoredValue() throws IOException { // case 2: a value that looks compressed: this used to fail in 1.x BytesStreamOutput out = new BytesStreamOutput(); - try (OutputStream compressed = CompressorFactory.defaultCompressor().threadLocalOutputStream(out)) { + try (OutputStream compressed = CompressorRegistry.defaultCompressor().threadLocalOutputStream(out)) { new BytesArray(binaryValue1).writeTo(compressed); } final byte[] binaryValue2 = BytesReference.toBytes(out.bytes()); - assertTrue(CompressorFactory.isCompressed(new BytesArray(binaryValue2))); + assertTrue(CompressorRegistry.isCompressed(new BytesArray(binaryValue2))); for (byte[] value : Arrays.asList(binaryValue1, binaryValue2)) { ParsedDocument doc = mapperService.documentMapper().parse(source(b -> b.field("field", value))); diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index fd99983a0c791..93be194b2d112 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -39,13 +39,12 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobStore; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.compress.CompressorType; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.compress.Compressor; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -57,7 +56,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; import java.util.Map; import static org.hamcrest.Matchers.containsString; @@ -122,12 +120,12 @@ public void testBlobStoreOperations() throws IOException { ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorType.NONE.compressor()); + checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorRegistry.none()); checksumSMILE.write( new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", - CompressorFactory.DEFLATE_COMPRESSOR + CompressorRegistry.getCompressor(DeflateCompressor.NAME) ); // Assert that all checksum blobs can be read @@ -144,8 +142,8 @@ public void testCompressionIsApplied() throws IOException { } ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); BlobObj blobObj = new BlobObj(veryRedundantText.toString()); - checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorType.DEFLATE.compressor()); - checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorType.NONE.compressor()); + checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorRegistry.getCompressor(DeflateCompressor.NAME)); + checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorRegistry.none()); Map blobs = blobContainer.listBlobsByPrefix("blob-"); assertEquals(blobs.size(), 2); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); @@ -157,12 +155,7 @@ public void testBlobCorruption() throws IOException { String testString = randomAlphaOfLength(randomInt(10000)); BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); - checksumFormat.write( - blobObj, - blobContainer, - "test-path", - randomFrom(Arrays.stream(CompressorType.values()).map(CompressorType::compressor).toArray(Compressor[]::new)) - ); + checksumFormat.write(blobObj, blobContainer, "test-path", randomFrom(CompressorRegistry.registeredCompressors().values())); assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { diff --git a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java index c244cba513982..89018b7353e7c 100644 --- a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java +++ b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java @@ -32,12 +32,12 @@ package org.opensearch.transport; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStream; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.test.OpenSearchTestCase; import java.io.EOFException; @@ -56,7 +56,7 @@ public void testStreamWithoutCompression() throws IOException { // Closing compression stream does not close underlying stream stream.close(); - assertFalse(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); + assertFalse(CompressorRegistry.defaultCompressor().isCompressed(bytesRef)); StreamInput streamInput = bytesRef.streamInput(); byte[] actualBytes = new byte[expectedBytes.length]; @@ -83,10 +83,10 @@ public void testStreamWithCompression() throws IOException { BytesReference bytesRef = stream.materializeBytes(); stream.close(); - assertTrue(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); + assertTrue(CompressorRegistry.defaultCompressor().isCompressed(bytesRef)); StreamInput streamInput = new InputStreamStreamInput( - CompressorFactory.defaultCompressor().threadLocalInputStream(bytesRef.streamInput()) + CompressorRegistry.defaultCompressor().threadLocalInputStream(bytesRef.streamInput()) ); byte[] actualBytes = new byte[expectedBytes.length]; streamInput.readBytes(actualBytes, 0, expectedBytes.length); @@ -110,7 +110,7 @@ public void testCompressionWithCallingMaterializeFails() throws IOException { stream.write(expectedBytes); StreamInput streamInput = new InputStreamStreamInput( - CompressorFactory.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput()) + CompressorRegistry.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput()) ); byte[] actualBytes = new byte[expectedBytes.length]; EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length)); diff --git a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java index 49d22b2221828..9811c0f690800 100644 --- a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java @@ -33,7 +33,6 @@ package org.opensearch.transport; import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasables; @@ -43,6 +42,7 @@ import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -54,7 +54,7 @@ public void testSimpleCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { byte randomByte = randomByte(); try ( - OutputStream deflateStream = CompressorFactory.defaultCompressor() + OutputStream deflateStream = CompressorRegistry.defaultCompressor() .threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) { deflateStream.write(randomByte); @@ -77,7 +77,7 @@ public void testMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) { @@ -109,7 +109,7 @@ public void testIncrementalMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) { diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java index 5e1278302500a..4d23e2aecc118 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java @@ -46,11 +46,11 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.compress.CompressorType; import org.opensearch.common.io.Streams; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -97,7 +97,7 @@ protected Settings repositorySettings() { final Settings.Builder builder = Settings.builder(); builder.put("compress", compress); if (compress) { - builder.put("compression_type", randomFrom(CompressorType.values())); + builder.put("compression_type", randomFrom(CompressorRegistry.registeredCompressors().keySet())); } return builder.build(); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 55b393ab4c577..b04e71d0fca52 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -52,7 +52,6 @@ import org.opensearch.common.action.ActionFuture; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.compress.CompressorType; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; @@ -60,6 +59,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; @@ -417,7 +417,7 @@ protected Settings.Builder randomRepositorySettings() { final boolean compress = randomBoolean(); settings.put("location", randomRepoPath()).put("compress", compress); if (compress) { - settings.put("compression_type", randomFrom(CompressorType.values())); + settings.put("compression_type", randomFrom(CompressorRegistry.registeredCompressors().keySet())); } if (rarely()) { settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); diff --git a/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java b/test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java similarity index 98% rename from server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java rename to test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java index a2a54f444ad9d..be53e46122157 100644 --- a/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java +++ b/test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java @@ -6,11 +6,11 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.test.core.compress; import org.apache.lucene.tests.util.LineFileDocs; import org.apache.lucene.tests.util.TestUtil; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; @@ -22,7 +22,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; -abstract class AbstractCompressorTests extends OpenSearchTestCase { +public abstract class AbstractCompressorTestCase extends OpenSearchTestCase { public void testRandom() throws IOException { Random r = random(); @@ -404,6 +404,6 @@ private void doTest(byte bytes[]) throws IOException { assertArrayEquals(bytes, uncompressedOut.toByteArray()); } - abstract Compressor compressor(); + protected abstract Compressor compressor(); }