From a7fb428458dcca0abb5c7b98ec80c958fd275379 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 3 Jun 2024 16:26:06 +0530 Subject: [PATCH] Address comments Signed-off-by: Arpit Bandejiya --- .../remote/RemoteRoutingTableService.java | 148 +++++++----------- 1 file changed, 56 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 517af479c6c24..a154f8746f322 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,28 +10,21 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.IndexInput; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; -import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; - -import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; -import org.opensearch.index.remote.RemoteStoreEnums; -import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -39,20 +32,19 @@ import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; import java.io.Closeable; import java.io.IOException; - -import java.io.InputStream; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; + /** * A Service which provides APIs to upload and download routing table from remote store. * @@ -80,108 +72,99 @@ public class RemoteRoutingTableService implements Closeable { private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; - public RemoteRoutingTableService(Supplier repositoriesService, - Settings settings, - ThreadPool threadPool) { + private static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer() { + @Override + public void write(IndexRoutingTable value, StreamOutput out) throws IOException { + value.writeTo(out); + } + + @Override + public IndexRoutingTable read(StreamInput in, String key) throws IOException { + return IndexRoutingTable.readFrom(in); + } + }; + + public RemoteRoutingTableService(Supplier repositoriesService, Settings settings, ThreadPool threadPool) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; this.threadPool = threadPool; } - public List getChangedIndicesRouting( ClusterState previousClusterState, - ClusterState clusterState) { - Map previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting(); - List changedIndicesRouting = new ArrayList<>(); - for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) { - if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) { - changedIndicesRouting.add(indexRouting); - logger.info("changedIndicesRouting {}", indexRouting.prettyPrint()); - } - } - - return changedIndicesRouting; - } - private String getIndexRoutingFileName() { - return String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); + return String.join(DELIMITER, INDEX_ROUTING_FILE_PREFIX, RemoteStoreUtils.invertLong(System.currentTimeMillis())); } public CheckedRunnable getAsyncIndexMetadataReadAction( String uploadedFilename, Index index, - LatchedActionListener latchedActionListener) { + LatchedActionListener latchedActionListener + ) { int idx = uploadedFilename.lastIndexOf("/"); - String blobFileName = uploadedFilename.substring(idx+1); - BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx))); + String blobFileName = uploadedFilename.substring(idx + 1); + BlobContainer blobContainer = blobStoreRepository.blobStore() + .blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx))); return () -> readAsync( blobContainer, blobFileName, threadPool.executor(ThreadPool.Names.GENERIC), - ActionListener.wrap(response -> latchedActionListener.onResponse(new RemoteIndexRoutingResult(index.getName(), response.readIndexRoutingTable(index))), latchedActionListener::onFailure) + ActionListener.wrap( + response -> latchedActionListener.onResponse(response.readIndexRoutingTable(index)), + latchedActionListener::onFailure + ) ); } - public void readAsync(BlobContainer blobContainer, String name, ExecutorService executorService, ActionListener listener) throws IOException { + public void readAsync( + BlobContainer blobContainer, + String name, + ExecutorService executorService, + ActionListener listener + ) throws IOException { executorService.execute(() -> { try { listener.onResponse(read(blobContainer, name)); } catch (Exception e) { - logger.error("routing table download failed : ", e); listener.onFailure(e); } }); } - public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) { + public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) throws IOException { try { return new IndexRoutingTableInputStreamReader(blobContainer.readBlob(path)); } catch (IOException e) { - logger.info("RoutingTable read failed with error: {}", e.toString()); + throw new RemoteTransportException("Failed to read RemoteRoutingTable from Manifest with error ", e); } return null; } - public List getUpdatedIndexRoutingTableMetadata(List updatedIndicesRouting, List allIndicesRouting) { + public List getUpdatedIndexRoutingTableMetadata( + List updatedIndicesRouting, + List allIndicesRouting + ) { return updatedIndicesRouting.stream().map(idx -> { - Optional uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst(); + Optional uploadedIndexMetadataOptional = allIndicesRouting.stream() + .filter(idx2 -> idx2.getIndexName().equals(idx)) + .findFirst(); assert uploadedIndexMetadataOptional.isPresent() == true; return uploadedIndexMetadataOptional.get(); }).collect(Collectors.toList()); } - public static List getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { - List deletedIndicesRouting = new ArrayList<>(); - for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) { - if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) { - // Latest Routing Table does not have entry for the index which means the index is deleted - deletedIndicesRouting.add(previousIndexRouting.getIndex().getName()); - } - } - return deletedIndicesRouting; - } - - public static List getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { - List updatedIndicesRouting = new ArrayList<>(); - for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) { - if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) { - // Latest Routing Table does not have entry for the index which means the index is created - updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName()); - } else { - if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) { - // if the latest routing table has the same routing table as the previous routing table, then the index is not updated - continue; - } - updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName()); - } - } - return updatedIndicesRouting; + public static DiffableUtils.MapDiff> getIndicesRoutingMapDiff( + RoutingTable before, + RoutingTable after + ) { + return DiffableUtils.diff( + before.getIndicesRouting(), + after.getIndicesRouting(), + DiffableUtils.getStringKeySerializer(), + CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER + ); } @Override @@ -201,23 +184,4 @@ public void start() { assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; } - - public static class RemoteIndexRoutingResult { - String indexName; - IndexRoutingTable indexRoutingTable; - - public RemoteIndexRoutingResult(String indexName, IndexRoutingTable indexRoutingTable) { - this.indexName = indexName; - this.indexRoutingTable = indexRoutingTable; - } - - public String getIndexName() { - return indexName; - } - - public IndexRoutingTable getIndexRoutingTable() { - return indexRoutingTable; - } - } - }