Skip to content

Commit

Permalink
Add a new index setting to skip recovery source when synthetic source…
Browse files Browse the repository at this point in the history
… is enabled (#114618)

This change adds a new undocumented index setting that allows synthetic source to be used for recovery and CCR without storing a recovery source.
  • Loading branch information
jimczi authored Dec 10, 2024
1 parent 16c4e14 commit d213efd
Show file tree
Hide file tree
Showing 46 changed files with 2,032 additions and 897 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/114618.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114618
summary: Add a new index setting to skip recovery source when synthetic source is enabled
area: Logs
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -26,6 +27,7 @@
import static org.elasticsearch.action.admin.indices.create.ShrinkIndexIT.assertNoResizeSourceIndexSettings;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -143,6 +145,51 @@ public void testResizeChangeSyntheticSource() {
assertThat(error.getMessage(), containsString("can't change setting [index.mapping.source.mode] during resize"));
}

/**
 * Checks that a clone request is rejected when it tries to set
 * {@code index.recovery.use_synthetic_source} (together with {@code index.mode}) on the target index.
 */
public void testResizeChangeRecoveryUseSyntheticSource() {
    // Source index: logsdb mode, with a creation version picked at or after USE_SYNTHETIC_SOURCE_FOR_RECOVERY.
    var createSource = prepareCreate("source");
    var sourceSettings = indexSettings(between(1, 5), 0).put("index.mode", "logsdb")
        .put(
            "index.version.created",
            IndexVersionUtils.randomVersionBetween(random(), IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY, IndexVersion.current())
        );
    createSource.setSettings(sourceSettings).setMapping("@timestamp", "type=date", "host.name", "type=keyword").get();

    // Block writes on the source before attempting the clone.
    updateIndexSettings(Settings.builder().put("index.blocks.write", true), "source");

    IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> {
        var cloneRequest = indicesAdmin().prepareResizeIndex("source", "target").setResizeType(ResizeType.CLONE);
        var targetSettings = Settings.builder()
            .put(
                "index.version.created",
                IndexVersionUtils.randomVersionBetween(random(), IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY, IndexVersion.current())
            )
            .put("index.recovery.use_synthetic_source", true)
            .put("index.mode", "logsdb")
            .putNull("index.blocks.write")
            .build();
        cloneRequest.setSettings(targetSettings).get();
    });

    // index.recovery.use_synthetic_source can only be validated when index.mode or index.mapping.source.mode
    // accompanies it in the request settings. All of these are unmodifiable during a resize and they are not
    // checked in a deterministic order, so the rejection may name either of the settings sent here.
    String message = error.getMessage();
    assertThat(
        message,
        anyOf(
            containsString("can't change setting [index.mode] during resize"),
            containsString("can't change setting [index.recovery.use_synthetic_source] during resize")
        )
    );
}

public void testResizeChangeIndexSorts() {
prepareCreate("source").setSettings(indexSettings(between(1, 5), 0))
.setMapping("@timestamp", "type=date", "host.name", "type=keyword")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,15 @@ public void testShardChangesWithDefaultDocType() throws Exception {
}
IndexShard shard = indexService.getShard(0);
try (
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean(), randomBoolean());
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot(
"test",
0,
numOps - 1,
true,
randomBoolean(),
randomBoolean(),
randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())
);
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()
) {
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.indices.IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING;
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -257,7 +256,7 @@ private void assertOnGoingRecoveryState(
public Settings.Builder createRecoverySettingsChunkPerSecond(long chunkSizeBytes) {
return Settings.builder()
// Set the chunk size in bytes
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSizeBytes, ByteSizeUnit.BYTES))
.put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE.getKey(), new ByteSizeValue(chunkSizeBytes, ByteSizeUnit.BYTES))
// Set one chunk of bytes per second.
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSizeBytes, ByteSizeUnit.BYTES);
}
Expand All @@ -280,7 +279,7 @@ private void unthrottleRecovery() {
Settings.builder()
// 200mb is an arbitrary number intended to be large enough to avoid more throttling.
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "200mb")
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
.put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
Expand All @@ -41,7 +41,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -52,7 +51,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class);
}

/**
Expand All @@ -63,7 +62,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
*/
public void testCancelRecoveryAndResume() throws Exception {
updateClusterSettings(
Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
Settings.builder()
.put(
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE.getKey(),
new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)
)
);

NodesStatsResponse nodeStats = clusterAdmin().prepareNodesStats().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,24 @@ public void testRestoreChangeSyntheticSource() {
assertThat(error.getMessage(), containsString("cannot modify setting [index.mapping.source.mode] on restore"));
}

/**
 * Checks that restoring a snapshot fails when the restore request tries to set
 * {@code index.recovery.use_synthetic_source} through its index settings.
 */
public void testRestoreChangeRecoveryUseSyntheticSource() {
    Client client = client();
    createRepository("test-repo", "fs");

    // Create the index, snapshot it, then wipe it so that it can be restored.
    String indexName = "test-idx";
    var createRequest = client.admin().indices().prepareCreate(indexName).setSettings(Settings.builder().put(indexSettings()));
    assertAcked(createRequest);
    createSnapshot("test-repo", "test-snap", Collections.singletonList(indexName));
    cluster().wipeIndices(indexName);

    SnapshotRestoreException error = expectThrows(
        SnapshotRestoreException.class,
        () -> client.admin()
            .cluster()
            .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap")
            .setIndexSettings(Settings.builder().put("index.recovery.use_synthetic_source", true))
            .setWaitForCompletion(true)
            .get()
    );

    String message = error.getMessage();
    assertThat(message, containsString("cannot modify setting [index.recovery.use_synthetic_source] on restore"));
}

public void testRestoreChangeIndexSorts() {
Client client = client();
createRepository("test-repo", "fs");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,7 @@ static void validateCloneIndex(
private static final Set<String> UNMODIFIABLE_SETTINGS_DURING_RESIZE = Set.of(
IndexSettings.MODE.getKey(),
SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(),
IndexSettings.RECOVERY_USE_SYNTHETIC_SOURCE_SETTING.getKey(),
IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(),
IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey(),
IndexSortConfig.INDEX_SORT_MODE_SETTING.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_WRITE_SETTING,
IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING,
SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING,
IndexSettings.RECOVERY_USE_SYNTHETIC_SOURCE_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
69 changes: 68 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -51,6 +52,7 @@
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.elasticsearch.index.mapper.SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING;

/**
* This class encapsulates all index level settings and handles settings updates.
Expand Down Expand Up @@ -653,6 +655,62 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

/**
 * Boolean setting {@code index.recovery.use_synthetic_source}, declared with {@code Property.IndexScope} and
 * {@code Property.Final} and defaulting to {@code false}.
 * <p>
 * When the value is {@code true} the validator rejects the setting unless
 * <ul>
 *   <li>the index mode's default source mode is synthetic, or {@code INDEX_MAPPER_SOURCE_MODE_SETTING} is
 *       synthetic, and</li>
 *   <li>the index creation version is not before {@link IndexVersions#USE_SYNTHETIC_SOURCE_FOR_RECOVERY}.</li>
 * </ul>
 */
public static final Setting<Boolean> RECOVERY_USE_SYNTHETIC_SOURCE_SETTING = Setting.boolSetting(
    "index.recovery.use_synthetic_source",
    false,
    new Setting.Validator<>() {
        @Override
        public void validate(Boolean value) {
            // Nothing to check here: every check needs the values of the dependent settings.
        }

        @Override
        public void validate(Boolean enabled, Map<Setting<?>, Object> settings) {
            if (enabled) {
                ensureSyntheticSource(settings);
                ensureSupportedIndexVersion(settings);
            }
        }

        /** Fails unless synthetic source is in effect, via the index mode default or the source mode setting. */
        private void ensureSyntheticSource(Map<Setting<?>, Object> settings) {
            IndexMode indexMode = (IndexMode) settings.get(MODE);
            if (indexMode.defaultSourceMode() == SourceFieldMapper.Mode.SYNTHETIC) {
                return;
            }
            // The source mode setting is only consulted when the index mode does not default to synthetic.
            SourceFieldMapper.Mode sourceMode = (SourceFieldMapper.Mode) settings.get(INDEX_MAPPER_SOURCE_MODE_SETTING);
            if (sourceMode == SourceFieldMapper.Mode.SYNTHETIC) {
                return;
            }
            String message = String.format(
                Locale.ROOT,
                "The setting [%s] is only permitted when [%s] is set to [%s]. Current mode: [%s].",
                RECOVERY_USE_SYNTHETIC_SOURCE_SETTING.getKey(),
                INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(),
                SourceFieldMapper.Mode.SYNTHETIC.name(),
                sourceMode.name()
            );
            throw new IllegalArgumentException(message);
        }

        /** Fails when the index creation version predates USE_SYNTHETIC_SOURCE_FOR_RECOVERY. */
        private void ensureSupportedIndexVersion(Map<Setting<?>, Object> settings) {
            IndexVersion createdVersion = (IndexVersion) settings.get(SETTING_INDEX_VERSION_CREATED);
            if (createdVersion.before(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY)) {
                String message = String.format(
                    Locale.ROOT,
                    "The setting [%s] is unavailable on this cluster because some nodes are running older versions "
                        + "that do not support it. Please upgrade all nodes to the latest version and try again.",
                    RECOVERY_USE_SYNTHETIC_SOURCE_SETTING.getKey()
                );
                throw new IllegalArgumentException(message);
            }
        }

        @Override
        public Iterator<Setting<?>> settings() {
            // Settings whose values are handed to validate(Boolean, Map).
            return List.<Setting<?>>of(INDEX_MAPPER_SOURCE_MODE_SETTING, SETTING_INDEX_VERSION_CREATED, MODE).iterator();
        }
    },
    Property.IndexScope,
    Property.Final
);

/**
* Returns <code>true</code> if TSDB encoding is enabled. The default is <code>true</code>
*/
Expand Down Expand Up @@ -824,6 +882,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile boolean skipIgnoredSourceRead;
private final SourceFieldMapper.Mode indexMappingSourceMode;
private final boolean recoverySourceEnabled;
private final boolean recoverySourceSyntheticEnabled;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -984,8 +1043,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
es87TSDBCodecEnabled = scopedSettings.get(TIME_SERIES_ES87TSDB_CODEC_ENABLED_SETTING);
skipIgnoredSourceWrite = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_WRITE_SETTING);
skipIgnoredSourceRead = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING);
indexMappingSourceMode = scopedSettings.get(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING);
indexMappingSourceMode = scopedSettings.get(INDEX_MAPPER_SOURCE_MODE_SETTING);
recoverySourceEnabled = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(nodeSettings);
recoverySourceSyntheticEnabled = scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
Expand Down Expand Up @@ -1677,6 +1737,13 @@ public boolean isRecoverySourceEnabled() {
return recoverySourceEnabled;
}

/**
 * Reports the value of {@code index.recovery.use_synthetic_source} that was read when these index settings
 * were constructed.
 *
 * @return {@code true} if recovery source should always be bypassed in favor of using synthetic source
 */
public boolean isRecoverySourceSyntheticEnabled() {
    return this.recoverySourceSyntheticEnabled;
}

/**
* The bounds for {@code @timestamp} on this index or
* {@code null} if there are no bounds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private static Version parseUnchecked(String version) {
public static final IndexVersion LOGSDB_DEFAULT_IGNORE_DYNAMIC_BEYOND_LIMIT = def(9_001_00_0, Version.LUCENE_10_0_0);
public static final IndexVersion TIME_BASED_K_ORDERED_DOC_ID = def(9_002_00_0, Version.LUCENE_10_0_0);
public static final IndexVersion DEPRECATE_SOURCE_MODE_MAPPER = def(9_003_00_0, Version.LUCENE_10_0_0);
public static final IndexVersion USE_SYNTHETIC_SOURCE_FOR_RECOVERY = def(9_004_00_0, Version.LUCENE_10_0_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ final class CombinedDocValues {
private final NumericDocValues primaryTermDV;
private final NumericDocValues tombstoneDV;
private final NumericDocValues recoverySource;
private final NumericDocValues recoverySourceSize;

CombinedDocValues(LeafReader leafReader) throws IOException {
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
Expand All @@ -34,6 +35,7 @@ final class CombinedDocValues {
);
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
this.recoverySourceSize = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_SIZE_NAME);
}

long docVersion(int segmentDocId) throws IOException {
Expand Down Expand Up @@ -79,4 +81,12 @@ boolean hasRecoverySource(int segmentDocId) throws IOException {
assert recoverySource.docID() < segmentDocId;
return recoverySource.advanceExact(segmentDocId);
}

/**
 * Looks up the recovery source size doc value of a document.
 * <p>
 * The underlying iterator must still be positioned before {@code segmentDocId} (asserted), so callers have to
 * ask for documents in increasing doc id order.
 *
 * @param segmentDocId the segment-local doc id to look up
 * @return the stored value, or {@code -1} when the segment has no such doc values or the document has no value
 * @throws IOException if advancing the doc values iterator fails
 */
long recoverySourceSize(int segmentDocId) throws IOException {
    if (recoverySourceSize != null) {
        assert recoverySourceSize.docID() < segmentDocId;
        if (recoverySourceSize.advanceExact(segmentDocId)) {
            return recoverySourceSize.longValue();
        }
    }
    // Either the field is absent from this segment or this document carries no value.
    return -1;
}
}
Loading

0 comments on commit d213efd

Please sign in to comment.