Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Shallow Snapshot flows to support remote path type & hash algo #12988

Merged
merged 7 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -47,6 +48,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -284,7 +286,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {

indexDocuments(client, indexName1, randomIntBetween(5, 10));
ensureGreen(indexName1);
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1)));
Expand All @@ -301,7 +303,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
.get();
assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status());
ensureGreen(restoredIndexName1version1);
validatePathType(restoredIndexName1version1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(restoredIndexName1version1, PathType.FIXED);

client(clusterManagerNode).admin()
.cluster()
Expand All @@ -327,16 +329,50 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved

// Validating that custom data has not changed for indexes which were created before the cluster setting got updated
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved

// Create Snapshot of index 2
String snapshotName2 = "test-restore-snapshot2";
snapshotInfo = createSnapshot(snapshotRepoName, snapshotName2, new ArrayList<>(List.of(indexName2)));
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertTrue(snapshotInfo.successfulShards() > 0);
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());

// Update cluster settings to FIXED
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
.get();

// Close index 2
assertAcked(client().admin().indices().prepareClose(indexName2));
restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(false)
.setIndices(indexName2)
.get();
assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status());
ensureGreen(indexName2);

// Validating that custom data has not changed for testindex2 which was created before the cluster setting got updated
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);
}

private void validatePathType(String index, PathType pathType, PathHashAlgorithm pathHashAlgorithm) {
private void validatePathType(String index, PathType pathType) {
validatePathType(index, pathType, null);
}

private void validatePathType(String index, PathType pathType, @Nullable PathHashAlgorithm pathHashAlgorithm) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
// Validate that the remote_store custom data is present in index metadata for the created index.
Map<String, String> remoteCustomData = state.metadata().index(index).getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assertNotNull(remoteCustomData);
assertEquals(pathType.name(), remoteCustomData.get(PathType.NAME));
assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME));
if (Objects.nonNull(pathHashAlgorithm)) {
assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME));
}
}

public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ public MetadataCreateIndexService(

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings())
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings(), minNodeVersionSupplier)
: null;
}

Expand Down Expand Up @@ -572,28 +573,23 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
* @param assertNullOldType flag to verify that the old remote store path type is null
*/
public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStorePathStrategyResolver != null) {
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingRemoteCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
Map<String, String> remoteCustomData = existingRemoteCustomData == null
? new HashMap<>()
: new HashMap<>(existingRemoteCustomData);
// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
String oldPathType = remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
String oldHashAlgorithm = remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
assert !assertNullOldType || (Objects.isNull(oldPathType) && Objects.isNull(oldHashAlgorithm));
logger.trace(
() -> new ParameterizedMessage(
"Added newPathStrategy={}, replaced oldPathType={} oldHashAlgorithm={}",
newPathStrategy,
oldPathType,
oldHashAlgorithm
)
);
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
if (remoteStorePathStrategyResolver == null) {
return;
}
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert assertNullOldType == false || Objects.isNull(existingCustomData);

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
Map<String, String> remoteCustomData = new HashMap<>();
remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) {
remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
}
logger.trace(() -> new ParameterizedMessage("Added newStrategy={}, replaced oldStrategy={}", remoteCustomData, existingCustomData));
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
}

private ClusterState applyCreateIndexRequestWithV1Templates(
Expand Down
19 changes: 13 additions & 6 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -764,6 +765,7 @@ public static IndexMergePolicy fromString(String text) {
private volatile String defaultSearchPipeline;
private final boolean widenIndexSortType;
private final boolean assignedOnRemoteNode;
private final RemoteStorePathStrategy remoteStorePathStrategy;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -988,6 +990,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
*/
widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0);
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = determineRemoteStorePathStrategy();

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
Expand Down Expand Up @@ -1908,15 +1911,19 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
}

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
private RemoteStorePathStrategy determineRemoteStorePathStrategy() {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
if (remoteCustomData != null
&& remoteCustomData.containsKey(PathType.NAME)
&& remoteCustomData.containsKey(PathHashAlgorithm.NAME)) {
assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME);
if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) {
PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME));
PathHashAlgorithm pathHashAlgorithm = PathHashAlgorithm.parseString(remoteCustomData.get(PathHashAlgorithm.NAME));
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME);
PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null;
return new RemoteStorePathStrategy(pathType, hashAlgorithm);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
}
return new RemoteStorePathStrategy(PathType.FIXED);
}

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.hash.FNV1a;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;

Expand Down Expand Up @@ -78,9 +83,10 @@
*/
@PublicApi(since = "2.14.0")
public enum PathType {
FIXED {
FIXED(0) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.isNull(hashAlgorithm) : "hashAlgorithm is expected to be null with fixed remote store path type";
// Hash algorithm is not used in FIXED path type
return pathInput.basePath()
.add(pathInput.indexUUID())
Expand All @@ -94,7 +100,7 @@
return false;
}
},
HASHED_PREFIX {
HASHED_PREFIX(1) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
Expand All @@ -112,6 +118,40 @@
}
};

private final int code;

PathType(int code) {
this.code = code;
}

public int getCode() {
return code;
}

private static final Map<Integer, PathType> CODE_TO_ENUM;

static {
PathType[] values = values();
Map<Integer, PathType> codeToStatus = new HashMap<>(values.length);
for (PathType value : values) {
int code = value.code;
if (codeToStatus.containsKey(code)) {
throw new IllegalStateException(
new ParameterizedMessage("{} has same code as {}", codeToStatus.get(code), value).getFormattedMessage()

Check warning on line 140 in server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java#L139-L140

Added lines #L139 - L140 were not covered by tests
);
}
codeToStatus.put(code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathType}.
*/
public static PathType fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

ashking94 marked this conversation as resolved.
Show resolved Hide resolved
/**
* This method generates the path for the given path input which constitutes multiple fields and characteristics
* of the data.
Expand All @@ -131,7 +171,7 @@
return generatePath(pathInput, hashAlgorithm);
}

abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);
protected abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);

abstract boolean requiresHashAlgorithm();

Expand All @@ -158,7 +198,7 @@
@PublicApi(since = "2.14.0")
public enum PathHashAlgorithm {

FNV_1A {
FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
Expand All @@ -167,6 +207,39 @@
}
};

private final int code;
ashking94 marked this conversation as resolved.
Show resolved Hide resolved

PathHashAlgorithm(int code) {
this.code = code;
}

public int getCode() {
return code;
}

private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;
static {
PathHashAlgorithm[] values = values();
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
for (PathHashAlgorithm value : values) {
int code = value.code;
if (codeToStatus.containsKey(code)) {
throw new IllegalStateException(
new ParameterizedMessage("{} has same code as {}", codeToStatus.get(code), value).getFormattedMessage()

Check warning on line 228 in server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java#L227-L228

Added lines #L227 - L228 were not covered by tests
);
}
codeToStatus.put(code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathHashAlgorithm}.
*/
public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

abstract long hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -36,11 +37,21 @@
}

public RemoteStorePathStrategy(PathType type, PathHashAlgorithm hashAlgorithm) {
assert type.requiresHashAlgorithm() == false || Objects.nonNull(hashAlgorithm);
this.type = Objects.requireNonNull(type);
Objects.requireNonNull(type, "pathType can not be null");
if (isCompatible(type, hashAlgorithm) == false) {
throw new IllegalArgumentException(
new ParameterizedMessage("pathType={} pathHashAlgorithm={} are incompatible", type, hashAlgorithm).getFormattedMessage()

Check warning on line 43 in server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java#L42-L43

Added lines #L42 - L43 were not covered by tests
);
}
this.type = type;
this.hashAlgorithm = hashAlgorithm;
}

public static boolean isCompatible(PathType type, PathHashAlgorithm hashAlgorithm) {
return (type.requiresHashAlgorithm() == false && Objects.isNull(hashAlgorithm))
|| (type.requiresHashAlgorithm() && Objects.nonNull(hashAlgorithm));
}

public PathType getType() {
return type;
}
Expand All @@ -55,7 +66,7 @@
}

public BlobPath generatePath(PathInput pathInput) {
return type.generatePath(pathInput, hashAlgorithm);
return type.path(pathInput, hashAlgorithm);
}

/**
Expand Down
Loading
Loading