Skip to content

Commit

Permalink
feature flagging of Zstandard
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Aug 23, 2023
1 parent 5d3633c commit 6e2e2f6
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
- Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177))
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
- Move support for ZStd compression behind a feature flag ([#9476](https://github.com/opensearch-project/OpenSearch/pull/9476))

### Deprecated

Expand Down
6 changes: 6 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,9 @@ ${path.logs}
# index searcher threadpool.
#
#opensearch.experimental.feature.concurrent_segment_search.enabled: false
#
#
# Gates the visibility of the ZStd compression algorithm features
# for indexing operations.
#
#opensearch.experimental.feature.compression.zstd.enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
Expand All @@ -40,6 +41,11 @@

public class MultiCodecReindexIT extends ReindexTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
internalCluster().ensureAtLeastNumDataNodes(1);
Map<String, String> codecMap = Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.concurrent.ExecutionException;
Expand All @@ -21,6 +22,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class CodecCompressionLevelIT extends OpenSearchIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

public void testLuceneCodecsCreateIndexWithCompressionLevel() {

internalCluster().ensureAtLeastNumDataNodes(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.engine.Segment;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -40,6 +41,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class MultiCodecMergeIT extends OpenSearchIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {

Map<String, String> codecMap = Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -112,6 +113,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends SegmentReplicationBaseIT {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

@Before
private void setup() {
internalCluster().startClusterManagerOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -123,6 +124,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(InternalSettingsPlugin.class);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
// Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -72,6 +73,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

/**
* This test tries to truncate some of larger files in the index to trigger leftovers on the recovery
* target. This happens during recovery when the last chunk of the file is transferred to the replica
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.FieldMemoryStats;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.Fuzziness;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.MapperParsingException;
Expand Down Expand Up @@ -106,6 +107,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(InternalSettingsPlugin.class);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

public void testTieBreak() throws Exception {
final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
mapping.indexAnalyzer("keyword");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.geo.GeoPoint;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.Fuzziness;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -74,6 +75,11 @@ public class ContextCompletionSuggestSearchIT extends OpenSearchIntegTestCase {
private final String INDEX = RandomStrings.randomAsciiOfLength(random(), 10).toLowerCase(Locale.ROOT);
private final String FIELD = RandomStrings.randomAsciiOfLength(random(), 10).toLowerCase(Locale.ROOT);

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
}

@Override
protected int numberOfReplicas() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ protected FeatureFlagSettings(
Arrays.asList(
FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING,
FeatureFlags.REMOTE_STORE_SETTING,
FeatureFlags.ZSTD_COMPRESSION_SETTING,
FeatureFlags.EXTENSIONS_SETTING,
FeatureFlags.IDENTITY_SETTING,
FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
*/
public class FeatureFlags {

/**
* Gates the visibility of the index settings that allows the utilization of ZStd compression algorithm features for indexing operations.
*/
public static final String ZSTD_COMPRESSION = "opensearch.experimental.feature.compression.zstd.enabled";

/**
* Gates the visibility of the segment replication experimental features that allows users to test unreleased beta features.
*/
Expand Down Expand Up @@ -96,6 +101,8 @@ public static boolean isEnabled(String featureFlagName) {
Property.NodeScope
);

public static final Setting<Boolean> ZSTD_COMPRESSION_SETTING = Setting.boolSetting(ZSTD_COMPRESSION, false, Property.NodeScope);

public static final Setting<Boolean> REMOTE_STORE_SETTING = Setting.boolSetting(REMOTE_STORE, false, Property.NodeScope);

public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);
Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/org/opensearch/index/codec/CodecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.customcodecs.ZstdCodec;
import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec;
Expand Down Expand Up @@ -67,7 +69,9 @@ public class CodecService {
* the raw unfiltered lucene default. useful for testing
*/
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
@ExperimentalApi
public static final String ZSTD_CODEC = "zstd";
@ExperimentalApi
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";

public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
Expand All @@ -79,15 +83,19 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
codecs.put(LZ4, new Lucene95Codec());
codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
codecs.put(ZLIB, new Lucene95Codec(Mode.BEST_COMPRESSION));
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION)) {
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
}
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION)) {
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
}
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
for (String codec : Codec.availableCodecs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.unit.MemorySizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -133,9 +134,17 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
case "lz4":
case "best_compression":
case "zlib":
case "lucene_default":
return s;
case "zstd":
case "zstd_no_dict":
case "lucene_default":
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION) == false) {
throw new IllegalArgumentException(
"ZStandard must be enabled via [opensearch.experimental.feature.compression.zstd.enabled] feature flag to set "
+ s
+ " codec."
);
}
return s;
default:
if (Codec.availableCodecs().contains(s) == false) { // we don't error message the not officially supported ones
Expand Down Expand Up @@ -183,6 +192,11 @@ private static void doValidateCodecSettings(final String codec) {
switch (codec) {
case "zstd":
case "zstd_no_dict":
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION) == false) {
throw new IllegalArgumentException(

Check warning on line 196 in server/src/main/java/org/opensearch/index/engine/EngineConfig.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/EngineConfig.java#L196

Added line #L196 was not covered by tests
"ZStandard must be enabled via [opensearch.experimental.feature.compression.zstd.enabled] feature flag to set compression level."
);
}
return;
case "best_compression":
case "zlib":
Expand Down
22 changes: 22 additions & 0 deletions server/src/test/java/org/opensearch/index/codec/CodecTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.IndexAnalyzers;
Expand All @@ -55,8 +56,10 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -67,6 +70,11 @@
@SuppressCodecs("*") // we test against default codec so never get a random one here!
public class CodecTests extends OpenSearchTestCase {

@Before
public void enableZstd() {
FeatureFlagSetter.set(FeatureFlags.ZSTD_COMPRESSION);
}

public void testResolveDefaultCodecs() throws Exception {
CodecService codecService = createCodecService(false);
assertThat(codecService.codec("default"), instanceOf(PerFieldMappingPostingFormatCodec.class));
Expand Down Expand Up @@ -148,6 +156,20 @@ public void testBestCompressionWithCompressionLevel() {
assertTrue(e.getMessage().startsWith("Compression level cannot be set"));
}

public void testZstdFeatureFlag() {
FeatureFlagSetter.clear();
final Settings zstdSettings = Settings.builder()
.put(EngineConfig.INDEX_CODEC_SETTING.getKey(), randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
.build();

IndexScopedSettings zstdIndexScopedSettings = new IndexScopedSettings(zstdSettings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> zstdIndexScopedSettings.validate(zstdSettings, true)
);
assertTrue(e.getMessage().startsWith("ZStandard must be enabled via"));
}

public void testLuceneCodecsWithCompressionLevel() {
String codecName = randomFrom(Codec.availableCodecs());
Codec codec = Codec.forName(codecName);
Expand Down

0 comments on commit 6e2e2f6

Please sign in to comment.