Skip to content

Commit

Permalink
Add implementation for remote store path types (#13103)
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
ashking94 authored Apr 9, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent d202d90 commit bad49ef
Showing 14 changed files with 623 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
@@ -56,6 +57,10 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@@ -279,6 +284,11 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
String restoredIndexName1version1 = indexName1 + "-restored-1";
String restoredIndexName1version2 = indexName1 + "-restored-2";

client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
.get();
createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));
Client client = client();
Settings indexSettings = getIndexSettings(1, 0).build();
@@ -476,12 +486,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
}

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
String indexUUID = client().admin().indices().prepareGetSettings(idx).get().getSetting(idx, IndexMetadata.SETTING_INDEX_UUID);

Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");
Path segmentMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/metadata");
Path segmentDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/data");
Client client = client();
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);

try (
Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
@@ -57,7 +58,11 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -182,13 +187,9 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");

String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles();
// Delete is async.
@@ -212,12 +213,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
@@ -231,12 +228,8 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
@@ -250,12 +243,9 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Excepti
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1)));
@@ -589,12 +579,8 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
flushAndRefresh(INDEX_NAME);

// 3. Delete data from remote segment store
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(segmentDataPath)) {
files.forEach(p -> {
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -22,7 +23,10 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
@@ -45,8 +49,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
assertEquals(1, response.getShards().length);

String indexName = response.getShards()[0].getShardRouting().index().getName();
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataRepoPath = location.resolve(shardPath);
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

logger.info("--> Verify that the segment files are same on local and repository eventually");
Original file line number Diff line number Diff line change
@@ -14,9 +14,14 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
@@ -27,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.LOCK_FILES;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -307,7 +314,21 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
SnapshotInfo snapshotInfo1 = createFullSnapshot(snapshotRepoName, "snap1");
SnapshotInfo snapshotInfo2 = createFullSnapshot(snapshotRepoName, "snap2");

String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME);
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
client(),
remoteStoreEnabledIndexName,
remoteStoreRepository.basePath(),
"0",
SEGMENTS,
LOCK_FILES
);
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
String[] lockFiles;
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
lockFiles = lockDirectory.listAll();
}
assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);

// delete remote store index
@@ -320,7 +341,9 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
.get();
assertAcked(deleteSnapshotResponse);

lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID);
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
lockFiles = lockDirectory.listAll();
}
assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);
assertTrue(lockFiles[0].contains(snapshotInfo2.snapshotId().getUUID()));

Original file line number Diff line number Diff line change
@@ -79,6 +79,15 @@ public BlobPath add(String path) {
return new BlobPath(Collections.unmodifiableList(paths));
}

/**
 * Returns a new {@link BlobPath} consisting of this path's elements followed by every element of
 * {@code paths}, in iteration order. This instance is left untouched; the returned path wraps an
 * unmodifiable list.
 *
 * @param paths the additional path elements to append
 * @return a new {@link BlobPath} with the appended elements
 */
public BlobPath add(Iterable<String> paths) {
    List<String> extended = new ArrayList<>(this.paths);
    for (String segment : paths) {
        extended.add(segment);
    }
    return new BlobPath(Collections.unmodifiableList(extended));
}

public String buildAsString() {
String p = String.join(SEPARATOR, paths);
if (p.isEmpty() || p.endsWith(SEPARATOR)) {
Original file line number Diff line number Diff line change
@@ -103,9 +103,27 @@ boolean requiresHashAlgorithm() {
HASHED_PREFIX(1) {
    /**
     * Builds the path with the hash placed first. Starting from {@link BlobPath#cleanPath()}, the
     * following are appended in order: the hash of {@code pathInput}, the base path, the index UUID,
     * the shard id, the data category name and the data type name.
     *
     * @param pathInput     supplies the base path, index UUID, shard id, data category and data type
     * @param hashAlgorithm algorithm used to hash {@code pathInput}; asserted to be non-null
     * @return the generated {@link BlobPath}
     */
    @Override
    public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
        assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
        // The hash goes in front of the base path; everything else follows it.
        BlobPath hashedRoot = BlobPath.cleanPath().add(hashAlgorithm.hash(pathInput)).add(pathInput.basePath());
        BlobPath shardRoot = hashedRoot.add(pathInput.indexUUID()).add(pathInput.shardId());
        BlobPath categoryRoot = shardRoot.add(pathInput.dataCategory().getName());
        return categoryRoot.add(pathInput.dataType().getName());
    }

    /** This path type always needs a hash algorithm. */
    @Override
    boolean requiresHashAlgorithm() {
        return true;
    }
},
HASHED_INFIX(2) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return pathInput.basePath()
.add(hashAlgorithm.hash(pathInput))
.add(pathInput.indexUUID())
.add(pathInput.shardId())
.add(pathInput.dataCategory().getName())
@@ -200,10 +218,11 @@ public enum PathHashAlgorithm {

FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
.getName();
return FNV1a.hash32(input);
long hash = FNV1a.hash64(input);
return RemoteStoreUtils.longToUrlBase64(hash);
}
};

@@ -218,6 +237,7 @@ public int getCode() {
}

private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;

static {
PathHashAlgorithm[] values = values();
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
@@ -240,7 +260,7 @@ public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

abstract long hash(PathInput pathInput);
abstract String hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
try {
Original file line number Diff line number Diff line change
@@ -10,7 +10,9 @@

import org.opensearch.common.collect.Tuple;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -101,4 +103,17 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
});
}

/**
 * Encodes a 64-bit hash as an 11-character URL-safe Base64 string (6 bits per character).
 * <p>
 * The value is written as 8 big-endian bytes and encoded with the URL-safe alphabet, so {@code -}
 * and {@code _} appear in place of {@code +} and {@code /}, and no trailing {@code =} padding is
 * emitted. The result therefore only contains characters from {@code [A-Za-z0-9_-]}, avoiding
 * characters that act as delimiters or need special handling in some vendors.
 * <p>
 * The output format must not change: it is used to create the path under which remote store data
 * is stored on the remote store.
 *
 * @param value the 64-bit hash to encode
 * @return the unpadded URL-safe Base64 representation of {@code value}
 */
static String longToUrlBase64(long value) {
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
    buffer.putLong(value);
    // 8 bytes always encode to 11 characters plus a single '=', so skipping padding is all that is needed.
    return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
}
}
Loading

0 comments on commit bad49ef

Please sign in to comment.