Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async write flow for routing table #11

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,39 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

import java.io.*;
import java.io.Closeable;
import java.io.IOException;

Expand All @@ -53,11 +51,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

Expand All @@ -78,100 +78,157 @@ public class RemoteRoutingTableService implements Closeable {
Setting.Property.Final
);
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";

public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

/**
 * Creates the service and stores the collaborators it is given.
 * <p>
 * Asserts that the remote routing table feature is enabled for {@code settings}
 * before any field is assigned.
 * <p>
 * NOTE(review): the two consecutive lines that both end the parameter list
 * ({@code ClusterSettings clusterSettings, ThreadPool threadPool)} and
 * {@code ThreadPool threadPool)}) look like the removed and added sides of a diff
 * rendered together; confirm which signature is current, and whether the
 * {@code clusterSettings} assignment below still applies.
 *
 * @param repositoriesService supplier of the repositories service
 * @param settings            node settings, checked by {@code isRemoteRoutingTableEnabled}
 * @param threadPool          thread pool kept for later use
 */
public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings, ThreadPool threadPool) {
ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> writeFullRoutingTable(ClusterState clusterState, String previousClusterUUID) {
/**
 * Returns the index routing tables of {@code clusterState} that are new or differ
 * from the entry of the same index name in {@code previousClusterState}.
 * <p>
 * An index counts as unchanged only when the previous state has an entry under
 * the same name and that entry {@code equals} the current one. Every changed
 * table is logged at info level via {@code prettyPrint()}.
 *
 * @param previousClusterState the state to compare against
 * @param clusterState         the current state
 * @return the changed or newly added index routing tables, in iteration order
 */
public List<IndexRoutingTable> getChangedIndicesRouting(ClusterState previousClusterState, ClusterState clusterState) {
    final Map<String, IndexRoutingTable> previousByName = previousClusterState.getRoutingTable().getIndicesRouting();
    final List<IndexRoutingTable> changed = new ArrayList<>();

    for (IndexRoutingTable current : clusterState.getRoutingTable().getIndicesRouting().values()) {
        final String indexName = current.getIndex().getName();
        final boolean unchanged = previousByName.containsKey(indexName) && current.equals(previousByName.get(indexName));
        if (unchanged) {
            continue;
        }
        changed.add(current);
        logger.info("changedIndicesRouting {}", current.prettyPrint());
    }

    return changed;
}

public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) throws IOException {

//batch index count and parallelize
RoutingTable currentRoutingTable = clusterState.getRoutingTable();
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndices = new ArrayList<>();
BlobPath custerMetadataBasePath = getCusterMetadataBasePath(blobStoreRepository, clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID());
for (IndexRoutingTable indexRouting : currentRoutingTable.getIndicesRouting().values()) {
uploadedIndices.add(uploadIndex(indexRouting, custerMetadataBasePath));
clusterState.metadata().clusterUUID()).add(INDEX_ROUTING_PATH_TOKEN);
logger.info("custerMetadataBasePath {}", custerMetadataBasePath);

BlobPath path = RemoteStoreEnums.PathType.HASHED_PREFIX.path(RemoteStorePathStrategy.PathInput.builder()
.basePath(custerMetadataBasePath)
.indexUUID(indexRouting.getIndex().getUUID())
.build(),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64);
logger.info("path from prefix hasd {}", path);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingFileName();
logger.info("fileName {}", fileName);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new ClusterMetadataManifest.UploadedIndexMetadata(

indexRouting.getIndex().getName(),
indexRouting.getIndex().getUUID(),
path.buildAsString() + fileName,
INDEX_ROUTING_METADATA_PREFIX
)
),
ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(indexRouting.getIndex().toString(), ex))
);

if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
logger.info("TRYING FILE UPLOAD");

return () -> {
logger.info("Going to upload {}", indexRouting.prettyPrint());

uploadIndex(indexRouting, fileName , blobContainer);
logger.info("upload done {}", indexRouting.prettyPrint());

completionListener.onResponse(null);
logger.info("response done {}", indexRouting.prettyPrint());

};
}
logger.info("uploadedIndices {}", uploadedIndices);

return uploadedIndices;
logger.info("TRYING S3 UPLOAD");

//TODO: Integrate with S3AsyncCrtClient for using buffered stream directly with putObject.
try (
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting);
IndexInput input = new ByteArrayIndexInput("indexrouting", indexRoutingStream.readAllBytes())) {
long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(input.clone(), 8);
} catch (Exception e) {
throw e;
}
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
fileName,
input.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
return () -> ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> writeIncrementalRoutingTable(
ClusterState previousClusterState,
ClusterState clusterState,
ClusterMetadataManifest previousManifest) {

/**
 * Computes the full set of uploaded index-routing entries for the next manifest.
 * <p>
 * Starts from the entries recorded in {@code previousManifest} (keyed by index name),
 * overwrites or adds every entry in {@code indicesRoutingToUpload}, then drops every
 * index name listed in {@code indicesRoutingToDelete}. Deletions are applied last, so
 * an index present in both collections ends up removed.
 * <p>
 * The stale loop that compared {@code previousClusterState} with {@code clusterState}
 * was removed: neither is a parameter of this method, and it left a second,
 * unreachable {@code return}.
 *
 * @param previousManifest       manifest whose routing entries form the baseline
 * @param indicesRoutingToUpload newly uploaded entries that replace or extend the baseline
 * @param indicesRoutingToDelete index names whose entries must not be carried forward
 * @return a new list containing the merged entries
 */
public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(
    ClusterMetadataManifest previousManifest,
    List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingToUpload,
    Set<String> indicesRoutingToDelete
) {
    final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting()
        .stream()
        .collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity()));

    indicesRoutingToUpload.forEach(
        uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting)
    );

    indicesRoutingToDelete.forEach(index -> allUploadedIndicesRouting.remove(index));

    logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting);

    return new ArrayList<>(allUploadedIndicesRouting.values());
}

private ClusterMetadataManifest.UploadedIndexMetadata uploadIndex(IndexRoutingTable indexRouting, BlobPath custerMetadataBasePath) {
/**
 * Serializes {@code indexRouting} and writes it synchronously to {@code container}
 * under {@code fileName}.
 * <p>
 * A failed write is logged and rethrown. Previously the {@link IOException} was
 * swallowed, so the caller went on to report a successful upload for a blob that
 * was never written. The input stream is now closed via try-with-resources.
 * <p>
 * NOTE(review): {@code 4096} is passed as the third argument of {@code writeBlob};
 * confirm whether that argument is a buffer hint or the exact blob size, since the
 * serialized routing table is not guaranteed to be 4096 bytes.
 *
 * @param indexRouting routing table to serialize
 * @param fileName     blob name to write
 * @param container    destination blob container
 * @throws IOException if serializing the routing table or writing the blob fails
 */
private void uploadIndex(IndexRoutingTable indexRouting, String fileName, BlobContainer container) throws IOException {
    logger.info("Starting write");

    try (InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting)) {
        container.writeBlob(fileName, indexRoutingStream, 4096, true);
        logger.info("SUccessful write");
    } catch (IOException e) {
        // Pass the exception as a trailing argument (not as the placeholder value)
        // so the stack trace is logged along with the file name.
        logger.error("Failed to write {}", fileName, e);
        throw e;
    }
}

private String getIndexRoutingFileName() {
return String.join(
DELIMITER,
//RemoteStoreUtils.invertLong(indexMetadata.getVersion()),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf("CODEC1") // Keep the codec version at last place only, during read we reads last
// place to determine codec version.
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);

}
/**
 * Placeholder for fetching the latest routing table of a cluster.
 * <p>
 * Not implemented: both arguments are ignored and the method always returns
 * {@code null}, so callers must null-check the result.
 *
 * @param clusterName name of the cluster (currently unused)
 * @param clusterUUID UUID of the cluster (currently unused)
 * @return always {@code null}
 */
public RoutingTable getLatestRoutingTable(String clusterName, String clusterUUID) {
return null;
}

/**
 * Placeholder overload for building a routing table incrementally from a previous
 * cluster state and manifest.
 * <p>
 * Not implemented: all arguments are ignored and the method always returns
 * {@code null}, so callers must null-check the result.
 *
 * @param previousClusterState previous cluster state (currently unused)
 * @param previousManifest     previous manifest (currently unused)
 * @param clusterName          name of the cluster (currently unused)
 * @param clusterUUID          UUID of the cluster (currently unused)
 * @return always {@code null}
 */
public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState, ClusterMetadataManifest previousManifest, String clusterName, String clusterUUID) {
return null;
}

public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState, ClusterMetadataManifest manifest){
List<String> indicesRoutingDeleted = manifest.getDiffManifest().getIndicesRoutingDeleted();
Expand Down Expand Up @@ -232,9 +289,10 @@ public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<RemoteIndexRoutingResult> latchedActionListener) {
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID).add(INDEX_ROUTING_PATH_TOKEN).add(index.getUUID()));
String[] fileNameTokens = uploadedFilename.split("/");
String blobFileName = fileNameTokens[fileNameTokens.length -1];
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx+1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));

return () -> readAsync(
blobContainer,
blobFileName,
Expand Down Expand Up @@ -262,8 +320,6 @@ public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, Stri
}
return null;
}
/**
 * Placeholder for cleaning up stale routing table blobs.
 * <p>
 * The body is empty: no blobs are listed or deleted and all arguments are ignored.
 *
 * @param clusterName       name of the cluster (currently unused)
 * @param clusterUUID       UUID of the cluster (currently unused)
 * @param manifestsToRetain number of manifests to keep (currently unused)
 */
private void deleteStaleRoutingTable(String clusterName, String clusterUUID, int manifestsToRetain) {
}

@Override
public void close() throws IOException {
Expand Down Expand Up @@ -300,4 +356,5 @@ public IndexRoutingTable getIndexRoutingTable() {
return indexRoutingTable;
}
}

}
Loading
Loading