Skip to content

Commit

Permalink
Fix NPE on restore searchable snapshot (opensearch-project#13911)
Browse files Browse the repository at this point in the history
Signed-off-by: panguixin <[email protected]>
Signed-off-by: Andrew Ross <[email protected]>
Co-authored-by: Andrew Ross <[email protected]>
  • Loading branch information
bugmakerrrrrr and andrross authored May 31, 2024
1 parent bb84ca7 commit 9a87624
Show file tree
Hide file tree
Showing 17 changed files with 145 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
- Don't return negative scores from `multi_match` query with `cross_fields` type ([#13829](https://github.com/opensearch-project/OpenSearch/pull/13829))
- Pass parent filter to inner hit query ([#13903](https://github.com/opensearch-project/OpenSearch/pull/13903))
- Fix NPE on restore searchable snapshot ([#13911](https://github.com/opensearch-project/OpenSearch/pull/13911))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.node.Node;
import org.opensearch.repositories.fs.FsRepository;
import org.hamcrest.MatcherAssert;
import org.junit.After;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -62,6 +63,10 @@

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -939,6 +944,52 @@ public void testRelocateSearchableSnapshotIndex() throws Exception {
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
}

public void testCreateSearchableSnapshotWithSpecifiedRemoteDataRatio() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName1 = "test-idx-1";
final String restoredIndexName1 = indexName1 + "-copy";
final String indexName2 = "test-idx-2";
final String restoredIndexName2 = indexName2 + "-copy";
final int numReplicasIndex1 = 1;
final int numReplicasIndex2 = 1;

Settings clusterManagerNodeSettings = clusterManagerOnlyNode();
internalCluster().startNodes(2, clusterManagerNodeSettings);
Settings dateNodeSettings = dataNode();
internalCluster().startNodes(2, dateNodeSettings);
createIndexWithDocsAndEnsureGreen(numReplicasIndex1, 100, indexName1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex2, 100, indexName2);

final Client client = client();
assertAcked(
client.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5))
);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName1, indexName2);

internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);

assertDocCount(restoredIndexName1, 100L);
assertDocCount(restoredIndexName2, 100L);
assertIndexDirectoryDoesNotExist(restoredIndexName1, restoredIndexName2);
}

@After
public void cleanup() throws Exception {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey()))
);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -59,8 +58,6 @@
import java.util.Set;
import java.util.stream.Stream;

import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;

/**
* Transport action for updating index settings
*
Expand Down Expand Up @@ -133,9 +130,7 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste
for (Index index : requestIndices) {
if (state.blocks().indexBlocked(ClusterBlockLevel.METADATA_WRITE, index.getName())) {
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
&& IndexModule.Type.REMOTE_SNAPSHOT.match(
state.getMetadata().getIndexSafe(index).getSettings().get(INDEX_STORE_TYPE_SETTING.getKey())
);
&& state.getMetadata().getIndexSafe(index).isRemoteSnapshot();
}
}
// check if all settings in the request are in the allow list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexModule;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -399,7 +398,7 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
if (IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.get(indexMetadata.getSettings())) {
addIndexBlock(indexName, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
}
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (indexMetadata.isRemoteSnapshot()) {
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -683,6 +684,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final ActiveShardCount waitForActiveShards;
private final Map<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;
private final boolean isRemoteSnapshot;

private IndexMetadata(
final Index index,
Expand Down Expand Up @@ -743,6 +745,7 @@ private IndexMetadata(
this.waitForActiveShards = waitForActiveShards;
this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos);
this.isSystem = isSystem;
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -1204,6 +1207,10 @@ public boolean isSystem() {
return isSystem;
}

public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
}

public static Builder builder(String index) {
return new Builder(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.ResponseCollectorService;

Expand Down Expand Up @@ -242,9 +241,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
if (IndexModule.Type.REMOTE_SNAPSHOT.match(
indexMetadataForShard.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
) && (preference == null || preference.isEmpty())) {
if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
preference = Preference.PRIMARY.type();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
Expand Down Expand Up @@ -60,10 +58,6 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

Expand All @@ -68,7 +69,6 @@
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -109,11 +109,13 @@ public class DiskThresholdDecider extends AllocationDecider {

private final DiskThresholdSettings diskThresholdSettings;
private final boolean enableForSingleDataNode;
private final FileCacheSettings fileCacheSettings;

public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) {
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
assert Version.CURRENT.major < 9 : "remove enable_for_single_data_node in 9";
this.enableForSingleDataNode = ENABLE_FOR_SINGLE_DATA_NODE.get(settings);
this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
}

/**
Expand Down Expand Up @@ -179,6 +181,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
// we don't need to check the ratio
if (dataToFileCacheSizeRatio <= 0.1f) {
return Decision.YES;
}

final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
.collect(Collectors.toList());
Expand All @@ -199,7 +207,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
Expand All @@ -208,6 +215,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
);
}
return Decision.YES;
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.NO;
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.remote.RemoteStorePressureSettings;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -689,7 +689,7 @@ public void apply(Settings value, Settings current, Settings previous) {

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
Expand Down
6 changes: 2 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,6 @@ public static IndexMergePolicy fromString(String text) {
private volatile TimeValue remoteTranslogUploadBufferInterval;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private int remoteTranslogKeepExtraGen;
private Version extendedCompatibilitySnapshotVersion;

Expand Down Expand Up @@ -919,9 +918,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
if (isRemoteSnapshot() && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
extendedCompatibilitySnapshotVersion = SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
} else {
extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
Expand Down Expand Up @@ -1278,7 +1276,7 @@ public String getRemoteStoreTranslogRepository() {
* Returns true if this is remote/searchable snapshot
*/
public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
return indexMetadata.isRemoteSnapshot();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
Expand Down Expand Up @@ -52,21 +51,6 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

/**
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
* Settings relate to file cache
*
* @opensearch.internal
*/
public class FileCacheSettings {
/**
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile double remoteDataRatio;

public FileCacheSettings(Settings settings, ClusterSettings clusterSettings) {
setRemoteDataRatio(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING, this::setRemoteDataRatio);
}

public void setRemoteDataRatio(double remoteDataRatio) {
this.remoteDataRatio = remoteDataRatio;
}

public double getRemoteDataRatio() {
return remoteDataRatio;
}
}
Loading

0 comments on commit 9a87624

Please sign in to comment.