Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jun 3, 2024
1 parent 933933c commit a7fb428
Showing 1 changed file with 56 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,41 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;

import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;

import java.io.Closeable;
import java.io.IOException;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
Expand Down Expand Up @@ -80,108 +72,99 @@ public class RemoteRoutingTableService implements Closeable {
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings,
ThreadPool threadPool) {
private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings, ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadPool;
}

public List<IndexRoutingTable> getChangedIndicesRouting( ClusterState previousClusterState,
ClusterState clusterState) {
Map<String, IndexRoutingTable> previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting();
List<IndexRoutingTable> changedIndicesRouting = new ArrayList<>();
for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) {
if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) {
changedIndicesRouting.add(indexRouting);
logger.info("changedIndicesRouting {}", indexRouting.prettyPrint());
}
}

return changedIndicesRouting;
}

private String getIndexRoutingFileName() {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
return String.join(DELIMITER, INDEX_ROUTING_FILE_PREFIX, RemoteStoreUtils.invertLong(System.currentTimeMillis()));

}

public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<RemoteIndexRoutingResult> latchedActionListener) {
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx+1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));
String blobFileName = uploadedFilename.substring(idx + 1);
BlobContainer blobContainer = blobStoreRepository.blobStore()
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

return () -> readAsync(
blobContainer,
blobFileName,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(new RemoteIndexRoutingResult(index.getName(), response.readIndexRoutingTable(index))), latchedActionListener::onFailure)
ActionListener.wrap(
response -> latchedActionListener.onResponse(response.readIndexRoutingTable(index)),
latchedActionListener::onFailure
)
);
}

public void readAsync(BlobContainer blobContainer, String name, ExecutorService executorService, ActionListener<IndexRoutingTableInputStreamReader> listener) throws IOException {
public void readAsync(
BlobContainer blobContainer,
String name,
ExecutorService executorService,
ActionListener<IndexRoutingTableInputStreamReader> listener
) throws IOException {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name));
} catch (Exception e) {
logger.error("routing table download failed : ", e);
listener.onFailure(e);
}
});
}

public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) {
public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) throws IOException {
try {
return new IndexRoutingTableInputStreamReader(blobContainer.readBlob(path));
} catch (IOException e) {
logger.info("RoutingTable read failed with error: {}", e.toString());
throw new RemoteTransportException("Failed to read RemoteRoutingTable from Manifest with error ", e);
}
return null;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(List<String> updatedIndicesRouting, List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting) {
public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
List<String> updatedIndicesRouting,
List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting
) {
return updatedIndicesRouting.stream().map(idx -> {
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream()
.filter(idx2 -> idx2.getIndexName().equals(idx))
.findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
}

public static List<String> getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> deletedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) {
if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is deleted
deletedIndicesRouting.add(previousIndexRouting.getIndex().getName());
}
}
return deletedIndicesRouting;
}

public static List<String> getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> updatedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) {
if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is created
updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName());
} else {
if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) {
// if the latest routing table has the same routing table as the previous routing table, then the index is not updated
continue;
}
updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName());
}
}
return updatedIndicesRouting;
public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
);
}

@Override
Expand All @@ -201,23 +184,4 @@ public void start() {
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}

public static class RemoteIndexRoutingResult {
String indexName;
IndexRoutingTable indexRoutingTable;

public RemoteIndexRoutingResult(String indexName, IndexRoutingTable indexRoutingTable) {
this.indexName = indexName;
this.indexRoutingTable = indexRoutingTable;
}

public String getIndexName() {
return indexName;
}

public IndexRoutingTable getIndexRoutingTable() {
return indexRoutingTable;
}
}

}

0 comments on commit a7fb428

Please sign in to comment.