Add read flow for remote routing
Signed-off-by: Arpit Bandejiya <[email protected]>
Arpit-Bandejiya committed Jun 6, 2024
1 parent b9ca5a8 commit 768585e
Showing 3 changed files with 482 additions and 9 deletions.
RemoteRoutingTableService.java
@@ -10,17 +10,39 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

@@ -31,19 +53,130 @@
*/
public class RemoteRoutingTableService extends AbstractLifecycleComponent {

/**
* Cluster setting to specify if routing table should be published to remote store
*/
public static final Setting<Boolean> REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting(
"cluster.remote_store.routing.enabled",
true,
Setting.Property.NodeScope,
Setting.Property.Final
);
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String ROUTING_TABLE = "routing-table";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

- public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};


public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings, ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadPool;
}

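/**
 * Merges the indices routing entries of the previous manifest with the freshly uploaded entries
 * and drops the deleted indices, returning the complete list for the new manifest.
 */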
public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingToUpload, Set<String> indicesRoutingToDelete) {
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting()
.stream()
.collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity()));

indicesRoutingToUpload.forEach(
uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting)
);

indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove);

logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting);

return new ArrayList<>(allUploadedIndicesRouting.values());
}


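/**
 * Builds the blob name for an index routing upload: the fixed prefix joined with an inverted
 * timestamp so that more recent uploads sort first lexicographically.
 */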
private String getIndexRoutingFileName() {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);

}

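/**
 * Returns a runnable that asynchronously reads a single uploaded index routing blob and
 * completes the latched listener with the parsed IndexRoutingTable, or with the failure.
 */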
public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx + 1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

return () -> readAsync(
blobContainer,
blobFileName,
index,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), latchedActionListener::onFailure)
);
}

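/**
 * Schedules the blocking blob read on the given executor and relays the result or failure to the listener.
 */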
public void readAsync(BlobContainer blobContainer, String name, Index index, ExecutorService executorService, ActionListener<RemoteIndexRoutingTable> listener) throws IOException {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name, index));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

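/**
 * Downloads the routing blob at the given path and deserializes it into a RemoteIndexRoutingTable for the index.
 */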
public RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
try {
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
} catch (IOException | AssertionError e) {
logger.info("RoutingTable read failed with error: {}", e.toString());
throw new RemoteClusterStateService.RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
}
}

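/**
 * Resolves the uploaded routing metadata entries for the given updated index names from the
 * full indices routing list; every updated index is expected to be present.
 */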
public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(List<String> updatedIndicesRouting, List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting) {
return updatedIndicesRouting.stream().map(idx -> {
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
assert uploadedIndexMetadataOptional.isPresent();
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
}


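/**
 * Computes the per-index diff between two routing tables, keyed by index name, using the
 * full-value serializer above (index routing tables are written whole rather than as diffs).
 */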
public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
);
}


@Override
- protected void doClose() throws IOException {
+ public void doClose() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
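To make the new read API concrete, here is a minimal caller-side sketch that is not part of this commit. It assumes a started RemoteRoutingTableService already wired to a blob store repository; the blob path, index name, and index UUID are illustrative placeholders, and RemoteRoutingReadExample is a hypothetical helper class.

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;

public class RemoteRoutingReadExample {

    // Reads one uploaded index routing blob and blocks until it arrives or fails.
    static IndexRoutingTable readOneIndexRouting(RemoteRoutingTableService service) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        IndexRoutingTable[] result = new IndexRoutingTable[1];
        Exception[] failure = new Exception[1];

        LatchedActionListener<IndexRoutingTable> listener = new LatchedActionListener<>(
            ActionListener.wrap(routingTable -> result[0] = routingTable, e -> failure[0] = e),
            latch
        );

        // Illustrative blob path and index coordinates; real values come from the cluster metadata manifest.
        CheckedRunnable<IOException> readAction = service.getAsyncIndexMetadataReadAction(
            "cluster-uuid/routing-table/index_routing__1234",
            new Index("my-index", "my-index-uuid"),
            listener
        );

        readAction.run();   // schedules the blob read on the GENERIC thread pool
        latch.await();      // LatchedActionListener counts the latch down on response or failure
        if (failure[0] != null) {
            throw failure[0];
        }
        return result[0];
    }
}

The LatchedActionListener lets many such reads share one CountDownLatch, which is how the cluster state read path below fans out and joins its per-index reads.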
RemoteClusterStateService.java
@@ -14,10 +14,15 @@
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.DiffableStringMap;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedRunnable;
@@ -33,6 +38,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
@@ -816,6 +822,122 @@ private ClusterMetadataManifest uploadManifest(
}
}

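/**
 * Fans out asynchronous reads for the requested cluster state components, including the per-index
 * routing tables, waits for all of them on a shared latch, and rebuilds the cluster state on top
 * of the previous state with the refreshed routing entries.
 */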
private ClusterState readClusterStateInParallel(
ClusterState previousState,
ClusterMetadataManifest manifest,
String clusterName,
String clusterUUID,
String localNodeId,
List<UploadedIndexMetadata> indicesToRead,
Map<String, UploadedMetadataAttribute> customToRead,
boolean readCoordinationMetadata,
boolean readSettingsMetadata,
boolean readTransientSettingsMetadata,
boolean readTemplatesMetadata,
boolean readDiscoveryNodes,
boolean readClusterBlocks,
List<UploadedIndexMetadata> indicesRoutingToRead,
boolean readHashesOfConsistentSettings,
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead
) throws IOException {
int totalReadTasks = indicesToRead.size() + customToRead.size() + indicesRoutingToRead.size()
+ (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (readTemplatesMetadata ? 1 : 0)
+ (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0)
+ (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size();
CountDownLatch latch = new CountDownLatch(totalReadTasks);
List<CheckedRunnable<IOException>> asyncMetadataReadActions = new ArrayList<>();
List<RemoteReadResult> readResults = Collections.synchronizedList(new ArrayList<>());
List<IndexRoutingTable> readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>());
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));

LatchedActionListener<RemoteReadResult> listener = new LatchedActionListener<>(
ActionListener.wrap(
response -> {
logger.debug("Successfully read cluster state component from remote");
readResults.add(response);
},
ex -> {
logger.error("Failed to read cluster state from remote", ex);
exceptionList.add(ex);
}
),
latch
);

for (UploadedIndexMetadata indexMetadata : indicesToRead) {
asyncMetadataReadActions.add(
remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(
clusterUUID,
indexMetadata.getUploadedFilename(),
listener
)
);
}

LatchedActionListener<IndexRoutingTable> routingTableLatchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(
response -> {
logger.debug("Successfully read cluster state component from remote");
readIndexRoutingTableResults.add(response);
},
ex -> {
logger.error("Failed to read cluster state from remote", ex);
exceptionList.add(ex);
}
),
latch
);

for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) {
asyncMetadataReadActions.add(
remoteRoutingTableService.get().getAsyncIndexMetadataReadAction(
indexRouting.getUploadedFilename(),
new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()),
routingTableLatchedActionListener
)
);
}

for (CheckedRunnable<IOException> asyncMetadataReadAction : asyncMetadataReadActions) {
asyncMetadataReadAction.run();
}

try {
if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) {
RemoteStateTransferException exception = new RemoteStateTransferException(
"Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout
);
exceptionList.forEach(exception::addSuppressed);
throw exception;
}
} catch (InterruptedException e) {
exceptionList.forEach(e::addSuppressed);
RemoteStateTransferException ex = new RemoteStateTransferException("Interrupted while waiting to read cluster state from remote", e);
Thread.currentThread().interrupt();
throw ex;
}

if (!exceptionList.isEmpty()) {
RemoteStateTransferException exception = new RemoteStateTransferException("Exception during reading cluster state from remote");
exceptionList.forEach(exception::addSuppressed);
throw exception;
}

ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState);
Map<String, IndexRoutingTable> indicesRouting = new HashMap<>(previousState.routingTable().getIndicesRouting());


readIndexRoutingTableResults.forEach(indexRoutingTable -> {
indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable);
});

return clusterStateBuilder.metadata(metadataBuilder)
.version(manifest.getStateVersion())
.stateUUID(manifest.getStateUUID())
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting))
.build();
}

private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
throws IOException {
AtomicReference<String> result = new AtomicReference<String>();