From 89a10ace1e2a97de922eee39074cd90ff933d72a Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Thu, 23 May 2024 13:10:06 +0530 Subject: [PATCH] Async write flow for routing table --- .../remote/RemoteRoutingTableService.java | 187 ++++++++++++------ .../remote/ClusterMetadataManifest.java | 33 +++- .../remote/RemoteClusterStateService.java | 69 ++++--- .../remote/RemoteClusterStateUtils.java | 6 +- .../index/remote/RemoteStoreEnums.java | 30 +-- .../index/remote/RemoteStorePathStrategy.java | 9 +- 6 files changed, 223 insertions(+), 111 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 858583d2ddcdd..499296a15e6b6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,41 +10,39 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.Version; +import org.apache.lucene.store.IndexInput; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; -import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.Index; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream; import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.threadpool.ThreadPool; -import java.io.*; import java.io.Closeable; import java.io.IOException; @@ -53,11 +51,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -78,100 +78,157 @@ public class RemoteRoutingTableService implements Closeable { Setting.Property.Final ); public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; + public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; public static final String DELIMITER = "__"; - + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; - private final ClusterSettings clusterSettings; private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; public RemoteRoutingTableService(Supplier repositoriesService, Settings settings, - ClusterSettings clusterSettings, ThreadPool threadPool) { + ThreadPool threadPool) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; - this.clusterSettings = clusterSettings; this.threadPool = threadPool; } - public List writeFullRoutingTable(ClusterState clusterState, String previousClusterUUID) { + public List getChangedIndicesRouting( ClusterState previousClusterState, + ClusterState clusterState) { + Map previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting(); + List changedIndicesRouting = new ArrayList<>(); + for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) { + if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) { + changedIndicesRouting.add(indexRouting); + logger.info("changedIndicesRouting {}", indexRouting.prettyPrint()); + } + } + return changedIndicesRouting; + } + + public CheckedRunnable getIndexRoutingAsyncAction( + ClusterState clusterState, + IndexRoutingTable indexRouting, + LatchedActionListener latchedActionListener + ) throws IOException { - //batch index count and parallelize - RoutingTable currentRoutingTable = clusterState.getRoutingTable(); - List uploadedIndices = new ArrayList<>(); BlobPath custerMetadataBasePath = getCusterMetadataBasePath(blobStoreRepository, clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID()); - for (IndexRoutingTable indexRouting : currentRoutingTable.getIndicesRouting().values()) { - uploadedIndices.add(uploadIndex(indexRouting, custerMetadataBasePath)); + clusterState.metadata().clusterUUID()).add(INDEX_ROUTING_PATH_TOKEN); + logger.info("custerMetadataBasePath {}", custerMetadataBasePath); + + BlobPath path = RemoteStoreEnums.PathType.HASHED_PREFIX.path(RemoteStorePathStrategy.PathInput.builder() + .basePath(custerMetadataBasePath) + .indexUUID(indexRouting.getIndex().getUUID()) + .build(), + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64); + logger.info("path from prefix hasd {}", path); + final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path); + + final String fileName = getIndexRoutingFileName(); + logger.info("fileName {}", fileName); + + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse( + new ClusterMetadataManifest.UploadedIndexMetadata( + + indexRouting.getIndex().getName(), + indexRouting.getIndex().getUUID(), + path.buildAsString() + fileName, + INDEX_ROUTING_METADATA_PREFIX + ) + ), + ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(indexRouting.getIndex().toString(), ex)) + ); + + if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { + logger.info("TRYING FILE UPLOAD"); + + return () -> { + logger.info("Going to upload {}", indexRouting.prettyPrint()); + + uploadIndex(indexRouting, fileName , blobContainer); + logger.info("upload done {}", indexRouting.prettyPrint()); + + completionListener.onResponse(null); + logger.info("response done {}", indexRouting.prettyPrint()); + + }; } - logger.info("uploadedIndices {}", uploadedIndices); - return uploadedIndices; + logger.info("TRYING S3 UPLOAD"); + + //TODO: Integrate with S3AsyncCrtClient for using buffered stream directly with putObject. + try ( + InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting); + IndexInput input = new ByteArrayIndexInput("indexrouting", indexRoutingStream.readAllBytes())) { + long expectedChecksum; + try { + expectedChecksum = checksumOfChecksum(input.clone(), 8); + } catch (Exception e) { + throw e; + } + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + fileName, + fileName, + input.length(), + true, + WritePriority.URGENT, + (size, position) -> new OffsetRangeIndexInputStream(input, size, position), + expectedChecksum, + ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported() + ) + ) { + return () -> ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } } - public List writeIncrementalRoutingTable( - ClusterState previousClusterState, - ClusterState clusterState, - ClusterMetadataManifest previousManifest) { + public List getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List indicesRoutingToUpload, Set indicesRoutingToDelete) { final Map allUploadedIndicesRouting = previousManifest.getIndicesRouting() .stream() .collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity())); + + indicesRoutingToUpload.forEach( + uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting) + ); + + indicesRoutingToDelete.forEach(index -> allUploadedIndicesRouting.remove(index)); + logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting); - Map previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting(); - List uploadedIndices = new ArrayList<>(); - BlobPath custerMetadataBasePath = getCusterMetadataBasePath(blobStoreRepository, clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID()); - for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) { - if (previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName()))) { - logger.info("index exists {}", indexRouting.getIndex().getName()); - //existing index with no shard change. - uploadedIndices.add(allUploadedIndicesRouting.get(indexRouting.getIndex().getName())); - } else { - // new index or shards changed, in both cases we upload new index file. - uploadedIndices.add(uploadIndex(indexRouting, custerMetadataBasePath)); - } - } - return uploadedIndices; + return new ArrayList<>(allUploadedIndicesRouting.values()); } - private ClusterMetadataManifest.UploadedIndexMetadata uploadIndex(IndexRoutingTable indexRouting, BlobPath custerMetadataBasePath) { + private void uploadIndex(IndexRoutingTable indexRouting, String fileName, BlobContainer container) { + logger.info("Starting write"); + try { InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting); - BlobContainer container = blobStoreRepository.blobStore().blobContainer(custerMetadataBasePath.add(INDEX_ROUTING_PATH_TOKEN).add(indexRouting.getIndex().getUUID())); - String indexRoutingFileName = getIndexRoutingFileName(); - container.writeBlob(indexRoutingFileName, indexRoutingStream, 4096, true); - return new ClusterMetadataManifest.UploadedIndexMetadata(indexRouting.getIndex().getName(), indexRouting.getIndex().getUUID(), container.path().buildAsString() + indexRoutingFileName); - + container.writeBlob(fileName, indexRoutingStream, 4096, true); + logger.info("SUccessful write"); } catch (IOException e) { logger.error("Failed to write {}", e); } - logger.info("SUccessful write"); - return null; } private String getIndexRoutingFileName() { return String.join( DELIMITER, - //RemoteStoreUtils.invertLong(indexMetadata.getVersion()), - RemoteStoreUtils.invertLong(System.currentTimeMillis()), - String.valueOf("CODEC1") // Keep the codec version at last place only, during read we reads last - // place to determine codec version. + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(System.currentTimeMillis()) ); } - public RoutingTable getLatestRoutingTable(String clusterName, String clusterUUID) { - return null; - } - public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState, ClusterMetadataManifest previousManifest, String clusterName, String clusterUUID) { - return null; - } public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState, ClusterMetadataManifest manifest){ List indicesRoutingDeleted = manifest.getDiffManifest().getIndicesRoutingDeleted(); @@ -232,9 +289,10 @@ public CheckedRunnable getAsyncIndexMetadataReadAction( String uploadedFilename, Index index, LatchedActionListener latchedActionListener) { - BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID).add(INDEX_ROUTING_PATH_TOKEN).add(index.getUUID())); - String[] fileNameTokens = uploadedFilename.split("/"); - String blobFileName = fileNameTokens[fileNameTokens.length -1]; + int idx = uploadedFilename.lastIndexOf("/"); + String blobFileName = uploadedFilename.substring(idx+1); + BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx))); + return () -> readAsync( blobContainer, blobFileName, @@ -262,8 +320,6 @@ public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, Stri } return null; } - private void deleteStaleRoutingTable(String clusterName, String clusterUUID, int manifestsToRetain) { - } @Override public void close() throws IOException { @@ -300,4 +356,5 @@ public IndexRoutingTable getIndexRoutingTable() { return indexRoutingTable; } } + } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 1ace3a7b3b1f5..00b58335b640e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -947,6 +947,7 @@ public static class UploadedIndexMetadata implements UploadedMetadata, Writeable private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + private static final ParseField COMPONENT_PREFIX_FIELD = new ParseField("component_prefix"); private static String indexName(Object[] fields) { return (String) fields[0]; @@ -960,23 +961,35 @@ private static String uploadedFilename(Object[] fields) { return (String) fields[2]; } + private static String componentPrefix(Object[] fields) { + return (String) fields[3]; + } + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "uploaded_index_metadata", - fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)) + fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields), componentPrefix(fields)) ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPONENT_PREFIX_FIELD); } static final String COMPONENT_PREFIX = "index--"; + private final String componentPrefix; private final String indexName; private final String indexUUID; private final String uploadedFilename; public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) { + this( indexName,indexUUID,uploadedFileName, COMPONENT_PREFIX); + } + + public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName, String componentPrefix) { + logger.info("creating UploadedIndexMetadata {}", componentPrefix); + this.componentPrefix = componentPrefix; this.indexName = indexName; this.indexUUID = indexUUID; this.uploadedFilename = uploadedFileName; @@ -986,6 +999,7 @@ public UploadedIndexMetadata(StreamInput in) throws IOException { this.indexName = in.readString(); this.indexUUID = in.readString(); this.uploadedFilename = in.readString(); + this.componentPrefix = in.readString(); } public String getUploadedFilePath() { @@ -994,7 +1008,7 @@ public String getUploadedFilePath() { @Override public String getComponent() { - return COMPONENT_PREFIX + getIndexName(); + return componentPrefix + getIndexName(); } public String getUploadedFilename() { @@ -1010,12 +1024,18 @@ public String getIndexUUID() { return indexUUID; } + public String getComponentPrefix() { + return componentPrefix; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()); + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) + .field(COMPONENT_PREFIX_FIELD.getPreferredName(), getComponentPrefix()); + } @Override @@ -1023,6 +1043,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(indexName); out.writeString(indexUUID); out.writeString(uploadedFilename); + out.writeString(componentPrefix); } @Override @@ -1036,12 +1057,14 @@ public boolean equals(Object o) { final UploadedIndexMetadata that = (UploadedIndexMetadata) o; return Objects.equals(indexName, that.indexName) && Objects.equals(indexUUID, that.indexUUID) - && Objects.equals(uploadedFilename, that.uploadedFilename); + && Objects.equals(uploadedFilename, that.uploadedFilename) + && Objects.equals(componentPrefix, that.componentPrefix); + } @Override public int hashCode() { - return Objects.hash(indexName, indexUUID, uploadedFilename); + return Objects.hash(indexName, indexUUID, uploadedFilename, componentPrefix); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index c792a78d1477e..98717c3f7b9fe 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -60,6 +60,9 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -186,7 +189,7 @@ public RemoteClusterStateService( if(isRemoteRoutingTableEnabled(settings)) { this.remoteRoutingTableService = new RemoteRoutingTableService(repositoriesService, - settings, clusterSettings, threadPool); + settings, threadPool); logger.info("REMOTE ROUTING ENABLED"); } else { logger.info("REMOTE ROUTING DISABLED"); @@ -212,7 +215,6 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri return null; } - UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel( clusterState, new ArrayList<>(clusterState.metadata().indices().values()), @@ -222,14 +224,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri true, true, true, - true - ); - - List routingIndexMetadata = new ArrayList<>(); - if(remoteRoutingTableService!=null) { - routingIndexMetadata = remoteRoutingTableService.writeFullRoutingTable(clusterState, previousClusterUUID); - logger.info("routingIndexMetadata {}", routingIndexMetadata); - } + true, + new ArrayList<>(clusterState.getRoutingTable().indicesRouting().values())); final ClusterMetadataManifest manifest = remoteManifestManager.uploadManifest( clusterState, @@ -242,7 +238,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri uploadedMetadataResults.uploadedDiscoveryNodes, uploadedMetadataResults.uploadedClusterBlocks, new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE), - routingIndexMetadata, + uploadedMetadataResults.uploadedIndicesRoutingMetadata, false ); @@ -326,6 +322,11 @@ public ClusterMetadataManifest writeIncrementalMetadata( // index present in current cluster state indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName()); } + + List indicesRoutingToUpload = new ArrayList<>(); + if(remoteRoutingTableService!=null) { + indicesRoutingToUpload = remoteRoutingTableService.getChangedIndicesRouting(previousClusterState, clusterState); + } UploadedMetadataResults uploadedMetadataResults; // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, // If file is empty and codec is 1 then write global metadata. @@ -362,14 +363,10 @@ public ClusterMetadataManifest writeIncrementalMetadata( updateSettingsMetadata, updateTemplatesMetadata, updateDiscoveryNodes, - updateClusterBlocks - ); + updateClusterBlocks, + indicesRoutingToUpload + ); - List routingIndexMetadata = new ArrayList<>(); - if(remoteRoutingTableService!=null) { - routingIndexMetadata = remoteRoutingTableService.writeIncrementalRoutingTable(previousClusterState, clusterState, previousManifest); - logger.info("routingIndexMetadata incremental {}", routingIndexMetadata); - } // update the map if the metadata was uploaded uploadedMetadataResults.uploadedIndexMetadata.forEach( @@ -380,6 +377,11 @@ public ClusterMetadataManifest writeIncrementalMetadata( customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); + List allUploadedIndicesRouting = new ArrayList<>(); + if(remoteRoutingTableService!=null) { + allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting(previousManifest, uploadedMetadataResults.uploadedIndicesRoutingMetadata, indicesToBeDeletedFromRemote.keySet()); + } + final ClusterMetadataManifest manifest = remoteManifestManager.uploadManifest( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), @@ -393,8 +395,11 @@ public ClusterMetadataManifest writeIncrementalMetadata( firstUploadForSplitGlobalMetadata || updateDiscoveryNodes ? uploadedMetadataResults.uploadedDiscoveryNodes : previousManifest.getDiscoveryNodesMetadata(), firstUploadForSplitGlobalMetadata || updateClusterBlocks ? uploadedMetadataResults.uploadedClusterBlocks : previousManifest.getClusterBlocksMetadata(), new ClusterStateDiffManifest(clusterState, previousClusterState), - routingIndexMetadata, false + allUploadedIndicesRouting, false ); + + logger.info("MANIFEST IN INC STATE {}", manifest); + this.latestClusterName = clusterState.getClusterName().value(); this.latestClusterUUID = clusterState.metadata().clusterUUID(); @@ -459,11 +464,10 @@ private UploadedMetadataResults writeMetadataInParallel( boolean uploadSettingsMetadata, boolean uploadTemplateMetadata, boolean uploadDiscoveryNodes, - boolean uploadClusterBlock - ) throws IOException { - assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; + boolean uploadClusterBlock, + List indicesRoutingToUpload) throws IOException { int totalUploadTasks = indexToUpload.size() + customToUpload.size() + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata - ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0); + ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + indicesRoutingToUpload.size(); CountDownLatch latch = new CountDownLatch(totalUploadTasks); Map> uploadTasks = new HashMap<>(totalUploadTasks); Map results = new HashMap<>(totalUploadTasks); @@ -471,7 +475,7 @@ private UploadedMetadataResults writeMetadataInParallel( LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap((ClusterMetadataManifest.UploadedMetadata uploadedMetadata) -> { - logger.trace(String.format(Locale.ROOT, "Metadata component %s uploaded successfully.", uploadedMetadata.getComponent())); + logger.info(String.format(Locale.ROOT, "Metadata component %s uploaded successfully.", uploadedMetadata.getComponent())); results.put(uploadedMetadata.getComponent(), uploadedMetadata); }, ex -> { logger.error( @@ -563,6 +567,17 @@ private UploadedMetadataResults writeMetadataInParallel( ); }); + indicesRoutingToUpload.forEach(indexRoutingTable -> { + try { + uploadTasks.put( + indexRoutingTable.getIndex().getName() + "--indexRouting", + remoteRoutingTableService.getIndexRoutingAsyncAction(clusterState, indexRoutingTable, listener) + ); + } catch (IOException e) { + e.printStackTrace(); + } + }); + // start async upload of all required metadata files for (CheckedRunnable uploadTask : uploadTasks.values()) { uploadTask.run(); @@ -608,7 +623,10 @@ private UploadedMetadataResults writeMetadataInParallel( } UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { - if (name.contains(CUSTOM_METADATA)) { + if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) && + uploadedMetadata.getComponent().contains(RemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) { + response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); + } else if (name.contains(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- String custom = name.split(DELIMITER)[0].split(CUSTOM_DELIMITER)[1]; response.uploadedCustomMetadataMap.put( @@ -631,6 +649,7 @@ private UploadedMetadataResults writeMetadataInParallel( throw new IllegalStateException("Unexpected metadata component " + uploadedMetadata.getComponent()); } }); + logger.info("response {}", response.uploadedIndicesRoutingMetadata.toString()); return response; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java index abedd23c1e742..1574fc935058a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -80,6 +80,7 @@ public static class UploadedMetadataResults { ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata; ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes; ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks; + List uploadedIndicesRoutingMetadata; public UploadedMetadataResults( List uploadedIndexMetadata, @@ -88,7 +89,8 @@ public UploadedMetadataResults( ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata, ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata, ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes, - ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks + ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks, + List uploadedIndicesRoutingMetadata ) { this.uploadedIndexMetadata = uploadedIndexMetadata; this.uploadedCustomMetadataMap = uploadedCustomMetadataMap; @@ -97,6 +99,7 @@ public UploadedMetadataResults( this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; this.uploadedDiscoveryNodes = uploadedDiscoveryNodes; this.uploadedClusterBlocks = uploadedClusterBlocks; + this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata; } public UploadedMetadataResults() { @@ -107,6 +110,7 @@ public UploadedMetadataResults() { this.uploadedTemplatesMetadata = null; this.uploadedDiscoveryNodes = null; this.uploadedClusterBlocks = null; + this.uploadedIndicesRoutingMetadata = new ArrayList<>(); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java index c1ac74724e405..78361079c9176 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java @@ -111,13 +111,20 @@ boolean requiresHashAlgorithm() { @Override public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null"; - return BlobPath.cleanPath() + BlobPath path = BlobPath.cleanPath() .add(hashAlgorithm.hash(pathInput)) .add(pathInput.basePath()) - .add(pathInput.indexUUID()) - .add(pathInput.shardId()) - .add(pathInput.dataCategory().getName()) - .add(pathInput.dataType().getName()); + .add(pathInput.indexUUID()); + if (pathInput.shardId() != null) { + path.add(pathInput.shardId()); + } + if(pathInput.dataCategory() != null){ + path.add(pathInput.dataCategory().getName()); + } + if(pathInput.dataType() != null ) { + path.add(pathInput.dataType().getName()); + } + return path; } @Override @@ -188,11 +195,11 @@ public static PathType fromCode(int code) { public BlobPath path(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { DataCategory dataCategory = pathInput.dataCategory(); DataType dataType = pathInput.dataType(); - assert dataCategory.isSupportedDataType(dataType) : "category:" - + dataCategory - + " type:" - + dataType - + " are not supported together"; +// assert dataCategory.isSupportedDataType(dataType) : "category:" +// + dataCategory +// + " type:" +// + dataType +// + " are not supported together"; return generatePath(pathInput, hashAlgorithm); } @@ -227,8 +234,7 @@ public enum PathHashAlgorithm { FNV_1A_BASE64(0) { @Override String hash(PathInput pathInput) { - String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType() - .getName(); + String input = pathInput.indexUUID() + pathInput.shardId() + (pathInput.dataCategory() != null ? pathInput.dataCategory().getName(): "" )+ (pathInput.dataType()!= null ? pathInput.dataType().getName(): ""); long hash = FNV1a.hash64(input); return longToUrlBase64(hash); } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java index c58f6c3faac84..b5f28e4b6fdee 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java @@ -87,9 +87,12 @@ public static class PathInput { public PathInput(BlobPath basePath, String indexUUID, String shardId, DataCategory dataCategory, DataType dataType) { this.basePath = Objects.requireNonNull(basePath); this.indexUUID = Objects.requireNonNull(indexUUID); - this.shardId = Objects.requireNonNull(shardId); - this.dataCategory = Objects.requireNonNull(dataCategory); - this.dataType = Objects.requireNonNull(dataType); +// this.shardId = Objects.requireNonNull(shardId); +// this.dataCategory = Objects.requireNonNull(dataCategory); +// this.dataType = Objects.requireNonNull(dataType); + this.shardId =(shardId); + this.dataCategory = (dataCategory); + this.dataType = (dataType); } BlobPath basePath() {