Skip to content

Commit

Permalink
Read write ephemeral objects for remote publication of cluster state (#…
Browse files Browse the repository at this point in the history
…14089)

* Read and write ephemeral objects for remote publication

Co-authored-by: Sooraj Sinha <[email protected]>
Co-authored-by: Arpit Bandejiya <[email protected]>

Signed-off-by: Shivansh Arora <[email protected]>

* Add serde logic for hashes of consistent settings

Signed-off-by: Sooraj Sinha <[email protected]>

* Add formattedName to readAsync for IndexMetadata

Signed-off-by: Shivansh Arora <[email protected]>

(cherry picked from commit 990ddc3)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jun 11, 2024
1 parent c38dfef commit baff192
Show file tree
Hide file tree
Showing 29 changed files with 1,347 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* Information passed during repository cleanup
Expand Down Expand Up @@ -118,6 +119,24 @@ public Version getMinimalSupportedVersion() {
return LegacyESVersion.V_7_4_0;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;

Check warning on line 128 in server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java#L128

Added line #L128 was not covered by tests
}

RepositoryCleanupInProgress that = (RepositoryCleanupInProgress) o;
return entries.equals(that.entries);
}

@Override
public int hashCode() {
return 31 + entries.hashCode();

Check warning on line 137 in server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java#L137

Added line #L137 was not covered by tests
}

/**
* Entry in the collection.
*
Expand Down Expand Up @@ -155,6 +174,23 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(repositoryStateId);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;

Check warning on line 180 in server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java#L180

Added line #L180 was not covered by tests
}
if (o == null || getClass() != o.getClass()) {
return false;

Check warning on line 183 in server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java#L183

Added line #L183 was not covered by tests
}
RepositoryCleanupInProgress.Entry that = (RepositoryCleanupInProgress.Entry) o;

Check warning on line 185 in server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java#L185

Added line #L185 was not covered by tests
return repository.equals(that.repository) && repositoryStateId == that.repositoryStateId;
}

@Override
public int hashCode() {
return Objects.hash(repository, repositoryStateId);

Check warning on line 191 in server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java#L191

Added line #L191 was not covered by tests
}

@Override
public String toString() {
return "{" + repository + '}' + '{' + repositoryStateId + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,10 @@ public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata meta
return metadata1.templates.equals(metadata2.templates);
}

public static boolean isHashesOfConsistentSettingsEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.hashesOfConsistentSettings.equals(metadata2.hashesOfConsistentSettings);
}

public static boolean isCustomMetadataEqual(Metadata metadata1, Metadata metadata2) {
int customCount1 = 0;
for (Map.Entry<String, Custom> cursor : metadata1.customs.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
// index to IndexRoutingTable map
private final Map<String, IndexRoutingTable> indicesRouting;

private RoutingTable(long version, final Map<String, IndexRoutingTable> indicesRouting) {
public RoutingTable(long version, final Map<String, IndexRoutingTable> indicesRouting) {
this.version = version;
this.indicesRouting = Collections.unmodifiableMap(indicesRouting);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
Expand All @@ -44,11 +45,14 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -97,11 +101,13 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
private BlobStoreRepository blobStoreRepository;
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;
private ThreadPool threadPool;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
ThreadPool threadpool
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
Expand All @@ -110,6 +116,7 @@ public InternalRemoteRoutingTableService(
this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
this.threadPool = threadpool;
}

private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
Expand Down Expand Up @@ -266,6 +273,68 @@ private void uploadIndex(
}
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx + 1);
BlobContainer blobContainer = blobStoreRepository.blobStore()
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

return () -> readAsync(
blobContainer,
blobFileName,
index,
threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ),
ActionListener.wrap(
response -> latchedActionListener.onResponse(response.getIndexRoutingTable()),
latchedActionListener::onFailure
)
);
}

private void readAsync(
BlobContainer blobContainer,
String name,
Index index,
ExecutorService executorService,
ActionListener<RemoteIndexRoutingTable> listener
) {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name, index));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
try {
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
} catch (IOException | AssertionError e) {
logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e);
throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
}
}

@Override
public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
List<String> updatedIndicesRouting,
List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting
) {
return updatedIndicesRouting.stream().map(idx -> {
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream()
.filter(idx2 -> idx2.getIndexName().equals(idx))
.findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
}

private String getIndexRoutingFileName(long term, long version) {
return String.join(
DELIMITER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
Expand Down Expand Up @@ -57,6 +58,26 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingUploaded,
List<String> indicesRoutingToDelete
) {
// noop
return List.of();
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
return () -> {};

Check warning on line 72 in server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java#L72

Added line #L72 was not covered by tests
}

@Override
public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
List<String> updatedIndicesRouting,
List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting
) {
// noop
return List.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Interface for RemoteRoutingTableService. Exposes methods to orchestrate upload and download of routing table from remote store.
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public interface RemoteRoutingTableService extends LifecycleComponent {
public static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
Expand All @@ -43,6 +46,17 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {

List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);

CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
);

List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
List<String> updatedIndicesRouting,
List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting
);

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
RoutingTable before,
RoutingTable after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.threadpool.ThreadPool;

import java.util.function.Supplier;

Expand All @@ -26,15 +27,17 @@ public class RemoteRoutingTableServiceFactory {
* @param repositoriesService repositoriesService
* @param settings settings
* @param clusterSettings clusterSettings
* @param threadPool threadPool
* @return RemoteRoutingTableService
*/
public static RemoteRoutingTableService getService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
ThreadPool threadPool
) {
if (isRemoteRoutingTableEnabled(settings)) {
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings);
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool);
}
return new NoopRemoteRoutingTableService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ public void apply(Settings value, Settings current, Settings previous) {
INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,

Expand Down
Loading

0 comments on commit baff192

Please sign in to comment.