Skip to content

Commit

Permalink
upload translog ckp file as object metadata to translog tlog file
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 13, 2024
1 parent c65f9eb commit b07035e
Show file tree
Hide file tree
Showing 23 changed files with 305 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MapperService.MergeReason;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.remote.RemoteStoreCustomMetadataResolver;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStorePathStrategyResolver;
import org.opensearch.index.shard.IndexSettingProvider;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndexCreationException;
Expand Down Expand Up @@ -177,7 +178,7 @@ public class MetadataCreateIndexService {
private AwarenessReplicaBalance awarenessReplicaBalance;

@Nullable
private final RemoteStorePathStrategyResolver remoteStorePathStrategyResolver;
private final RemoteStoreCustomMetadataResolver remoteStoreCustomMetadataResolver;

public MetadataCreateIndexService(
final Settings settings,
Expand Down Expand Up @@ -212,8 +213,8 @@ public MetadataCreateIndexService(
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(remoteStoreSettings, minNodeVersionSupplier)
remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings)
? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier)
: null;
}

Expand Down Expand Up @@ -562,7 +563,7 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
tmpImdBuilder.setRoutingNumShards(routingNumShards);
tmpImdBuilder.settings(indexSettings);
tmpImdBuilder.system(isSystem);
addRemoteStorePathStrategyInCustomData(tmpImdBuilder, true);
addRemoteStoreCustomMetadata(tmpImdBuilder, true);

// Set up everything, now locally create the index to see that things are ok, and apply
IndexMetadata tempMetadata = tmpImdBuilder.build();
Expand All @@ -572,23 +573,28 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
}

/**
* Adds the remote store path type information in custom data of index metadata.
* Adds 1) the remote store path type and 2) the ckp-as-translog-metadata flag to the custom data of index metadata.
*
* @param tmpImdBuilder index metadata builder.
* @param assertNullOldType flag to verify that the old remote store path type is null
*/
public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStorePathStrategyResolver == null) {
public void addRemoteStoreCustomMetadata(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStoreCustomMetadataResolver == null) {
return;
}
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert assertNullOldType == false || Objects.isNull(existingCustomData);

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
Map<String, String> remoteCustomData = new HashMap<>();

// Determine if the ckp would be stored as translog metadata
boolean isCkpAsTranslogMetadata = remoteStoreCustomMetadataResolver.isCkpAsTranslogMetadata();
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(isCkpAsTranslogMetadata));

// Determine the path type to use via the remoteStoreCustomMetadataResolver.
RemoteStorePathStrategy newPathStrategy = remoteStoreCustomMetadataResolver.getPathStrategy();
remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) {
remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable,
oldMetadata.settings(),
logger
);
migrationImdUpdater.maybeUpdateRemoteStorePathStrategy(indexMetadataBuilder, index.getName());
migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName());
migrationImdUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, index.getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ default Map<Metric, Map<String, Long>> extendedStats() {
*/
default void reload(RepositoryMetadata repositoryMetadata) {}

/**
* Returns whether this blob store supports object metadata upload.
* The default implementation returns {@code false}.
*
* @return {@code true} if object metadata upload is supported, {@code false} otherwise
*/
default boolean isBlobMetadataSupported() {
return false;
}

/**
* Metrics for BlobStore interactions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA
)
)
);
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean widenIndexSortType;
private final boolean assignedOnRemoteNode;
private final RemoteStorePathStrategy remoteStorePathStrategy;
private final boolean ckpAsTranslogMetadata;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -989,6 +990,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata);

ckpAsTranslogMetadata = RemoteStoreUtils.determineCkpAsTranslogMetadata(indexMetadata);

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));

Expand Down Expand Up @@ -1911,4 +1914,8 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo
/**
* Returns the remote store path strategy that was determined from the index metadata
* when these index settings were constructed.
*
* @return the remote store path strategy held by these index settings
*/
public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}

/**
* Returns the ckp-as-translog-metadata flag that was determined from the index metadata
* when these index settings were constructed.
*
* @return {@code true} if the translog ckp is to be handled as translog metadata for this index
*/
public boolean isCkpAsTranslogMetadata() {
return ckpAsTranslogMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategyDuringMigration;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomMetadataDuringMigration;
import static org.opensearch.index.remote.RemoteStoreUtils.getRemoteStoreRepoName;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -118,7 +118,7 @@ private boolean needsRemoteIndexSettingsUpdate(
}

/**
* Updates the remote store path strategy metadata for the index when it is migrating to remote.
* Updates the remote store custom metadata for the index when it is migrating to remote.
* This is run during state change of each shard copy when the cluster is in `MIXED` mode and the direction of migration is `REMOTE_STORE`
* Should not interfere with docrep functionality even if the index is in docrep nodes since this metadata
* is not used anywhere in the docrep flow
Expand All @@ -127,20 +127,20 @@ private boolean needsRemoteIndexSettingsUpdate(
* @param indexMetadataBuilder Mutated {@link IndexMetadata.Builder} having the previous state updates
* @param index index name
*/
public void maybeUpdateRemoteStorePathStrategy(IndexMetadata.Builder indexMetadataBuilder, String index) {
if (indexHasRemotePathMetadata(indexMetadata) == false) {
logger.info("Adding remote store path strategy for index [{}] during migration", index);
public void maybeUpdateRemoteStoreCustomMetadata(IndexMetadata.Builder indexMetadataBuilder, String index) {
if (indexHasRemoteCustomMetadata(indexMetadata) == false) {
logger.info("Adding remote store custom data for index [{}] during migration", index);
indexMetadataBuilder.putCustom(
REMOTE_STORE_CUSTOM_KEY,
determineRemoteStorePathStrategyDuringMigration(clusterSettings, discoveryNodes)
determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes)
);
} else {
logger.debug("Index {} already has remote store path strategy", index);
logger.debug("Index {} already has remote store custom data", index);
}
}

public static boolean indexHasAllRemoteStoreRelatedMetadata(IndexMetadata indexMetadata) {
return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemotePathMetadata(indexMetadata);
return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemoteCustomMetadata(indexMetadata);
}

/**
Expand All @@ -167,9 +167,11 @@ public static boolean indexHasRemoteStoreSettings(Settings indexSettings) {
* @param indexMetadata Current index metadata
* @return <code>true</code> if all above conditions match. <code>false</code> otherwise
*/
public static boolean indexHasRemotePathMetadata(IndexMetadata indexMetadata) {
public static boolean indexHasRemoteCustomMetadata(IndexMetadata indexMetadata) {
Map<String, String> customMetadata = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY);
return Objects.nonNull(customMetadata) && Objects.nonNull(customMetadata.get(PathType.NAME));
return Objects.nonNull(customMetadata)
&& Objects.nonNull(customMetadata.get(PathType.NAME))
&& Objects.nonNull(customMetadata.get(RemoteStoreEnums.CKP_AS_METADATA));
}

public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, String segmentRepository, String translogRepository) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@
* @opensearch.internal
*/
@ExperimentalApi
public class RemoteStorePathStrategyResolver {
public class RemoteStoreCustomMetadataResolver {

private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<Version> minNodeVersionSupplier;

public RemoteStorePathStrategyResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
public RemoteStoreCustomMetadataResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
this.remoteStoreSettings = remoteStoreSettings;
this.minNodeVersionSupplier = minNodeVersionSupplier;
}

public RemoteStorePathStrategy get() {
public RemoteStorePathStrategy getPathStrategy() {
PathType pathType;
PathHashAlgorithm pathHashAlgorithm;
// Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it.
Expand All @@ -41,4 +41,9 @@ public RemoteStorePathStrategy get() {
pathHashAlgorithm = pathType == PathType.FIXED ? null : remoteStoreSettings.getPathHashAlgorithm();
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

/**
 * Decides whether the translog ckp should be stored as translog metadata.
 *
 * @return {@code true} only when {@link Version#CURRENT} is not newer than the minimum node version
 *         supplied to this resolver and the remote store settings have the option enabled
 */
public boolean isCkpAsTranslogMetadata() {
    final Version minNodeVersion = minNodeVersionSupplier.get();
    // Min node version check: bail out before consulting the setting if some node is older than this one.
    if (Version.CURRENT.compareTo(minNodeVersion) > 0) {
        return false;
    }
    return remoteStoreSettings.isCkpAsTranslogMetadata();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
@ExperimentalApi
public class RemoteStoreEnums {

public static final String CKP_AS_METADATA = "ckp-as-metadata";

/**
* Categories of the data in Remote store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA;

/**
* Utils for remote store
Expand Down Expand Up @@ -181,25 +182,42 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta
return new RemoteStorePathStrategy(RemoteStoreEnums.PathType.FIXED);
}

/**
 * Reads the ckp-as-translog-metadata flag from the remote store custom data of the given index metadata.
 *
 * @param indexMetadata index metadata whose remote store custom data is inspected
 * @return the boolean parsed from the value stored under {@link RemoteStoreEnums#CKP_AS_METADATA};
 *         {@code false} when the remote store custom data, or that key, is absent
 */
public static boolean determineCkpAsTranslogMetadata(IndexMetadata indexMetadata) {
    final Map<String, String> customData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
    if (customData == null) {
        return false;
    }
    // When remote store custom data is present it is expected to carry the flag.
    assert customData.containsKey(RemoteStoreEnums.CKP_AS_METADATA);
    // With assertions disabled, a missing key yields null from get(), which parseBoolean maps to false.
    return Boolean.parseBoolean(customData.get(RemoteStoreEnums.CKP_AS_METADATA));
}

/**
* Generates the remote store custom data (ckp-as-translog-metadata flag and path type information) to be added to the custom data of index metadata during migration
*
* @param clusterSettings Current Cluster settings from {@link ClusterState}
* @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state
* @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state
* @return {@link Map} to be added as custom data in index metadata
*/
public static Map<String, String> determineRemoteStorePathStrategyDuringMigration(
public static Map<String, String> determineRemoteStoreCustomMetadataDuringMigration(
Settings clusterSettings,
DiscoveryNodes discoveryNodes
) {
Map<String, String> remoteCustomData = new HashMap<>();
Version minNodeVersion = discoveryNodes.getMinNodeVersion();

boolean ckpAsMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0
&& CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.get(clusterSettings);
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(ckpAsMetadata));

RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0
? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings)
: RemoteStoreEnums.PathType.FIXED;
RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm = pathType == RemoteStoreEnums.PathType.FIXED
? null
: CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.get(clusterSettings);
Map<String, String> remoteCustomData = new HashMap<>();
remoteCustomData.put(RemoteStoreEnums.PathType.NAME, pathType.name());
if (Objects.nonNull(pathHashAlgorithm)) {
remoteCustomData.put(RemoteStoreEnums.PathHashAlgorithm.NAME, pathHashAlgorithm.name());
Expand Down
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4976,7 +4976,14 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings);
RemoteFsTranslog.cleanup(
repository,
shardId,
getThreadPool(),
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
indexSettings().isCkpAsTranslogMetadata()
);
}

/*
Expand All @@ -5001,7 +5008,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
logger,
shouldSeedRemoteStore()
shouldSeedRemoteStore(),
indexSettings().isCkpAsTranslogMetadata()
);
}

Expand Down
Loading

0 comments on commit b07035e

Please sign in to comment.