Skip to content

Commit

Permalink
Introducing ZStd compression codec plugin (#9658)
Browse files Browse the repository at this point in the history
* introducing zstd compression codec plugin

Signed-off-by: Sarthak Aggarwal <[email protected]>

* Moving zstd compression codec as a plugin

Signed-off-by: Sarthak Aggarwal <[email protected]>

* introducing zstd compression codec plugin

Signed-off-by: Sarthak Aggarwal <[email protected]>

* Adding checks to EngineConfig and fixing tests

Signed-off-by: Prabhakar Sithanandam <[email protected]>

* incorporating review comments

Signed-off-by: Sarthak Aggarwal <[email protected]>

* fixing tests

Signed-off-by: Sarthak Aggarwal <[email protected]>

* introducing zstd compression codec plugin

Signed-off-by: Sarthak Aggarwal <[email protected]>

* addressing review comments

Signed-off-by: Sarthak Aggarwal <[email protected]>

* nit fixes

Signed-off-by: Sarthak Aggarwal <[email protected]>

* implementing codec aliases

Signed-off-by: Sarthak Aggarwal <[email protected]>

* addressing review comments

Signed-off-by: Sarthak Aggarwal <[email protected]>

* review comments

Signed-off-by: Sarthak Aggarwal <[email protected]>

* moving codec aliases to custom codec

Signed-off-by: Sarthak Aggarwal <[email protected]>

* adding zstd default codec for backward compatibility

Signed-off-by: Sarthak Aggarwal <[email protected]>

* renaming to deprecated codec

Signed-off-by: Sarthak Aggarwal <[email protected]>

* incorporating review comments

Signed-off-by: Sarthak Aggarwal <[email protected]>

* nit fixes

Signed-off-by: Sarthak Aggarwal <[email protected]>

---------

Signed-off-by: Sarthak Aggarwal <[email protected]>
Signed-off-by: Prabhakar Sithanandam <[email protected]>
Signed-off-by: Andrew Ross <[email protected]>
Co-authored-by: Prabhakar Sithanandam <[email protected]>
Co-authored-by: Andrew Ross <[email protected]>
  • Loading branch information
3 people authored Sep 6, 2023
1 parent 92b2095 commit 76f1b52
Show file tree
Hide file tree
Showing 30 changed files with 724 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
- Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503))
- Move ZStd to a plugin ([#9658](https://github.com/opensearch-project/OpenSearch/pull/9658))
- [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908))

### Deprecated
Expand Down
2 changes: 2 additions & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies {
testImplementation project(':modules:transport-netty4')
// for parent/child testing
testImplementation project(':modules:parent-join')
testImplementation project(':plugins:custom-codecs')
}

restResources {
Expand All @@ -95,4 +96,5 @@ forbiddenPatterns {
tasks.named("bundlePlugin").configure {
dependsOn("copyParentJoinMetadata")
dependsOn("copyTransportNetty4Metadata")
dependsOn("copyCustomCodecsMetadata")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexModulePlugin;
import org.opensearch.index.reindex.ReindexRequestBuilder;
import org.opensearch.index.reindex.ReindexTestCase;
import org.opensearch.plugins.Plugin;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand All @@ -40,6 +45,11 @@

public class MultiCodecReindexIT extends ReindexTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(CustomCodecPlugin.class, ReindexModulePlugin.class);
}

public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
internalCluster().ensureAtLeastNumDataNodes(1);
Map<String, String> codecMap = Map.of(
Expand Down
27 changes: 27 additions & 0 deletions plugins/custom-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
name 'custom-codecs'
description 'A plugin that implements custom compression codecs.'
classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin'
licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt')
noticeFile rootProject.file('NOTICE.txt')
}

dependencies {
api "com.github.luben:zstd-jni:1.5.5-5"
}

testingConventions.enabled = false;
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC;
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class CodecCompressionLevelIT extends OpenSearchIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(CustomCodecPlugin.class);
}

public void testLuceneCodecsCreateIndexWithCompressionLevel() {

Expand Down Expand Up @@ -62,7 +72,7 @@ public void testZStandardCodecsCreateIndexWithCompressionLevel() {
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
.build()
);
Expand All @@ -81,7 +91,7 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
.build()
);
Expand Down Expand Up @@ -164,7 +174,7 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx
.updateSettings(
new UpdateSettingsRequest(index).settings(
Settings.builder()
.put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC))
.put("index.codec.compression_level", randomIntBetween(1, 6))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
import org.opensearch.index.engine.Segment;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -40,6 +44,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class MultiCodecMergeIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(CustomCodecPlugin.class);
}

public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {

Map<String, String> codecMap = Map.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;

import java.util.Optional;

/**
* A plugin that implements custom codecs. Supports these codecs:
* <ul>
* <li>ZSTD
* <li>ZSTDNODICT
* </ul>
*
* @opensearch.internal
*/
public final class CustomCodecPlugin extends Plugin implements EnginePlugin {

/**
* Creates a new instance
*/
public CustomCodecPlugin() {}

/**
* @param indexSettings is the default indexSettings
* @return the engine factory
*/
@Override
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(final IndexSettings indexSettings) {
String codecName = indexSettings.getValue(EngineConfig.INDEX_CODEC_SETTING);
if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) || codecName.equals(CustomCodecService.ZSTD_CODEC)) {
return Optional.of(new CustomCodecServiceFactory());
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.mapper.MapperService;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Stream;

import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;

/**
* CustomCodecService provides ZSTD and ZSTD_NO_DICT compression codecs.
*/
public class CustomCodecService extends CodecService {
private final Map<String, Codec> codecs;
/**
* ZStandard codec
*/
public static final String ZSTD_CODEC = "zstd";
/**
* ZStandard without dictionary codec
*/
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";

/**
* Creates a new CustomCodecService.
*
* @param mapperService The mapper service.
* @param indexSettings The index settings.
* @param logger The logger.
*/
public CustomCodecService(MapperService mapperService, IndexSettings indexSettings, Logger logger) {
super(mapperService, indexSettings, logger);
int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
} else {
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
}
this.codecs = codecs.immutableMap();
}

@Override
public Codec codec(String name) {
Codec codec = codecs.get(name);
if (codec == null) {
return super.codec(name);
}
return codec;
}

@Override
public String[] availableCodecs() {
return Stream.concat(Arrays.stream(super.availableCodecs()), codecs.keySet().stream()).toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceConfig;
import org.opensearch.index.codec.CodecServiceFactory;

/**
* A factory for creating new {@link CodecService} instance
*/
public class CustomCodecServiceFactory implements CodecServiceFactory {

/** Creates a new instance. */
public CustomCodecServiceFactory() {}

@Override
public CodecService createCodecService(CodecServiceConfig config) {
return new CustomCodecService(config.getMapperService(), config.getIndexSettings(), config.getLogger());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
import org.opensearch.index.mapper.MapperService;

import java.util.Collections;
import java.util.Set;

/**
*
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
Expand All @@ -23,12 +26,48 @@
* @opensearch.internal
*/
public abstract class Lucene95CustomCodec extends FilterCodec {

/** Default compression level used for compression */
public static final int DEFAULT_COMPRESSION_LEVEL = 3;

/** Each mode represents a compression algorithm. */
public enum Mode {
ZSTD,
ZSTD_NO_DICT
/**
* ZStandard mode with dictionary
*/
ZSTD("ZSTD", Set.of("zstd")),
/**
* ZStandard mode without dictionary
*/
ZSTD_NO_DICT("ZSTDNODICT", Set.of("zstd_no_dict")),
/**
* Deprecated ZStandard mode, added for backward compatibility to support indices created in 2.9.0 where
* both ZSTD and ZSTD_NO_DICT used Lucene95CustomCodec underneath. This should not be used to
* create new indices.
*/
ZSTD_DEPRECATED("Lucene95CustomCodec", Collections.emptySet());

private final String codec;
private final Set<String> aliases;

Mode(String codec, Set<String> aliases) {
this.codec = codec;
this.aliases = aliases;
}

/**
* Returns the Codec that is registered with Lucene
*/
public String getCodec() {
return codec;
}

/**
* Returns the aliases of the Codec
*/
public Set<String> getAliases() {
return aliases;
}
}

private final StoredFieldsFormat storedFieldsFormat;
Expand All @@ -51,12 +90,22 @@ public Lucene95CustomCodec(Mode mode) {
* @param compressionLevel The compression level.
*/
public Lucene95CustomCodec(Mode mode, int compressionLevel) {
super("Lucene95CustomCodec", new Lucene95Codec());
super(mode.getCodec(), new Lucene95Codec());
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
}

/**
* Creates a new compression codec with the given compression level. We use
* lowercase letters when registering the codec so that we remain consistent with
* the other compression codecs: default, lucene_default, and best_compression.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
* @param mapperService The mapper service.
* @param logger The logger.
*/
public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
super("Lucene95CustomCodec", new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger));
super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger));
this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
}

Expand Down
Loading

0 comments on commit 76f1b52

Please sign in to comment.