Skip to content

Commit

Permalink
Refactor Compressors from CompressorFactory to CompressorRegistry for…
Browse files Browse the repository at this point in the history
… extensibility (#9262)

This commit refactors the CompressorFactory static singleton
class and CompressorType enum to a formal CompressorRegistry and enables
downstream implementations to register their own compression
implementations for use in compressing Blob stores and MediaType data.
This is different from Lucene's Codec compression extension points which
expose different compression implementations for Lucene's Stored Fields.

---------

Signed-off-by: Nicholas Walter Knize <[email protected]>
(cherry picked from commit c6b67e1)
  • Loading branch information
nknize committed Aug 21, 2023
1 parent 1fdc08a commit 00fb53c
Show file tree
Hide file tree
Showing 49 changed files with 526 additions and 244 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
- Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177))
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
- Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility ([#9262](https://github.com/opensearch-project/OpenSearch/pull/9262))

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ configure([
configure([
project(":libs:opensearch-common"),
project(":libs:opensearch-core"),
project(":libs:opensearch-compress"),
project(":server")
]) {
project.tasks.withType(MissingJavadocTask) {
Expand Down
38 changes: 38 additions & 0 deletions libs/compress/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.build'
apply plugin: 'opensearch.publish'

base {
archivesName = 'opensearch-compress'
}

dependencies {
api project(':libs:opensearch-common')
api project(':libs:opensearch-core')

//zstd
api "com.github.luben:zstd-jni:${versions.zstd}"

testImplementation(project(":test:framework")) {
// tests use the locally compiled version of server
exclude group: 'org.opensearch', module: 'opensearch-compress'
}
}

tasks.named('forbiddenApisMain').configure {
// :libs:opensearch-compress does not depend on server
// TODO: Need to decide how we want to handle for forbidden signatures with the changes to server
replaceSignatureFiles 'jdk-signatures'
}

jarHell.enabled = false
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.compress;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.compress.Compressor;
import org.opensearch.core.compress.Compressor;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand All @@ -25,7 +26,8 @@
/**
* {@link Compressor} implementation based on the ZSTD compression algorithm.
*
* @opensearch.internal
* @opensearch.api - registered name requires BWC support
* @opensearch.experimental - class methods might change
*/
public class ZstdCompressor implements Compressor {
// An arbitrary header that we use to identify compressed streams
Expand All @@ -34,6 +36,14 @@ public class ZstdCompressor implements Compressor {
// a XContent
private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' };

/**
* The name to register the compressor by
*
* @opensearch.api - requires BWC support
*/
@PublicApi(since = "2.10.0")
public static final String NAME = "ZSTD";

private static final int LEVEL = 3;

private static final int BUFFER_SIZE = 4096;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
* compatible open source license.
*/

/** Classes for core compress module */
package org.opensearch.core.common.compress;
/**
* Concrete {@link org.opensearch.core.compress.Compressor} implementations
*/
package org.opensearch.compress;
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.compress.spi;

import org.opensearch.compress.ZstdCompressor;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.spi.CompressorProvider;

import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;

/**
* Additional "optional" compressor implementations provided by the opensearch compress library
*
* @opensearch.internal
*/
public class CompressionProvider implements CompressorProvider {

/** Returns the concrete {@link Compressor}s provided by the compress library */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public List<Entry<String, Compressor>> getCompressors() {
return List.of(new SimpleEntry<>(ZstdCompressor.NAME, new ZstdCompressor()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Service Provider Interface for registering concrete {@link org.opensearch.core.compress.Compressor}
* implementations.
*
* See {@link org.opensearch.compress.ZstdCompressor}
*/
package org.opensearch.compress.spi;
13 changes: 13 additions & 0 deletions libs/compress/src/main/java/org/opensearch/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This is the compress library for registering optional
* {@link org.opensearch.core.compress.Compressor} implementations
*/
package org.opensearch;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

org.opensearch.compress.spi.CompressionProvider
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.compress;

import org.opensearch.core.common.compress.Compressor;
import org.opensearch.core.compress.Compressor;
import org.opensearch.test.core.compress.AbstractCompressorTestCase;

/**
* Test streaming compression
*/
public class ZstdCompressTests extends AbstractCompressorTests {
public class ZstdCompressTests extends AbstractCompressorTestCase {

private final Compressor compressor = new ZstdCompressor();

@Override
Compressor compressor() {
protected Compressor compressor() {
return compressor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,29 @@
* GitHub history for details.
*/

package org.opensearch.core.common.compress;
package org.opensearch.core.compress;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Compressor interface
* Compressor interface used for compressing {@link org.opensearch.core.xcontent.MediaType} and
* {@code org.opensearch.repositories.blobstore.BlobStoreRepository} implementations.
*
* @opensearch.internal
* This is not to be confused with {@link org.apache.lucene.codecs.compressing.Compressor} which is used
* for codec implementations such as {@code org.opensearch.index.codec.customcodecs.Lucene95CustomCodec}
* for compressing {@link org.apache.lucene.document.StoredField}s
*
* @opensearch.api - intended to be extended
* @opensearch.experimental - however, bwc is not guaranteed at this time
*/
@ExperimentalApi
@PublicApi(since = "2.10.0")
public interface Compressor {

boolean isCompressed(BytesReference bytes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.compress;

import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.spi.CompressorProvider;
import org.opensearch.core.xcontent.MediaTypeRegistry;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

/**
* A registry that wraps a static Map singleton which holds a mapping of unique String names (typically the
* compressor header as a string) to registerd {@link Compressor} implementations.
*
* This enables plugins, modules, extensions to register their own compression implementations through SPI
*
* @opensearch.experimental
* @opensearch.internal
*/
@InternalApi
public final class CompressorRegistry {

// the backing registry map
private static final Map<String, Compressor> registeredCompressors = ServiceLoader.load(
CompressorProvider.class,
CompressorProvider.class.getClassLoader()
)
.stream()
.flatMap(p -> p.get().getCompressors().stream())
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

// no instance:
private CompressorRegistry() {}

/**
* Returns the default compressor
*/
public static Compressor defaultCompressor() {
return registeredCompressors.get("DEFLATE");
}

public static Compressor none() {
return registeredCompressors.get(NoneCompressor.NAME);
}

public static boolean isCompressed(BytesReference bytes) {
return compressor(bytes) != null;
}

@Nullable
public static Compressor compressor(final BytesReference bytes) {
for (Compressor compressor : registeredCompressors.values()) {
if (compressor.isCompressed(bytes) == true) {
// bytes should be either detected as compressed or as xcontent,
// if we have bytes that can be either detected as compressed or
// as a xcontent, we have a problem
assert MediaTypeRegistry.xContentType(bytes) == null;
return compressor;
}
}

if (MediaTypeRegistry.xContentType(bytes) == null) {
throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes");
}

return null;
}

/** Decompress the provided {@link BytesReference}. */
public static BytesReference uncompress(BytesReference bytes) throws IOException {
Compressor compressor = compressor(bytes);
if (compressor == null) {
throw new NotCompressedException();
}
return compressor.uncompress(bytes);
}

/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
*/
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
return compressor == null ? bytes : compressor.uncompress(bytes);
}

/** Returns a registered compressor by its registered name */
public static Compressor getCompressor(final String name) {
if (registeredCompressors.containsKey(name)) {
return registeredCompressors.get(name);
}
throw new IllegalArgumentException("No registered compressor found by name [" + name + "]");
}

/**
* Returns the registered compressors as an Immutable collection
*
* note: used for testing
*/
public static Map<String, Compressor> registeredCompressors() {
// no destructive danger as backing map is immutable
return registeredCompressors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.core.compress;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.compress.Compressor;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -18,9 +18,18 @@
/**
* {@link Compressor} no compressor implementation.
*
* @opensearch.internal
* @opensearch.api - registered name requires BWC support
* @opensearch.experimental - class methods might change
*/
public class NoneCompressor implements Compressor {
/**
* The name to register the compressor by
*
* @opensearch.api - requires BWC support
*/
@PublicApi(since = "2.10.0")
public static final String NAME = "NONE";

@Override
public boolean isCompressed(BytesReference bytes) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* GitHub history for details.
*/

package org.opensearch.common.compress;
package org.opensearch.core.compress;

/**
* Exception indicating that we were expecting something compressed, which
Expand Down
Loading

0 comments on commit 00fb53c

Please sign in to comment.