Skip to content

Commit

Permalink
minor fixes after rebase and testing
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Aug 14, 2023
1 parent 9efdaf6 commit 1680f3e
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;

public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy, PersistedState remotePersistedState) {
public CoordinationState(
DiscoveryNode localNode,
PersistedState persistedState,
ElectionStrategy electionStrategy,
PersistedState remotePersistedState
) {
this.localNode = localNode;

// persisted state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,9 @@ boolean publicationInProgress() {
protected void doStart() {
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get()));
coordinationState.set(
new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get())
);
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ private static String stateUUID(Object[] fields) {
);

static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> UploadedIndexMetadata.fromXContent(p), INDICES_FIELD);
PARSER.declareObjectArray(
ConstructingObjectParser.constructorArg(),
(p, c) -> UploadedIndexMetadata.fromXContent(p),
INDICES_FIELD
);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TERM_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
Expand Down Expand Up @@ -91,7 +95,13 @@ public String getStateUUID() {
return stateUUID;
}

public ClusterMetadataMarker(Map<String, UploadedIndexMetadata> indices, long term, long version, String clusterUUID, String stateUUID) {
public ClusterMetadataMarker(
Map<String, UploadedIndexMetadata> indices,
long term,
long version,
String clusterUUID,
String stateUUID
) {
this.indices = Collections.unmodifiableMap(indices);
this.term = term;
this.version = version;
Expand Down Expand Up @@ -141,7 +151,10 @@ public boolean equals(Object o) {
return false;
}
final ClusterMetadataMarker that = (ClusterMetadataMarker) o;
return Objects.equals(indices, that.indices) && term == that.term && version == that.version && Objects.equals(clusterUUID, that.clusterUUID)
return Objects.equals(indices, that.indices)
&& term == that.term
&& version == that.version
&& Objects.equals(clusterUUID, that.clusterUUID)
&& Objects.equals(stateUUID, that.stateUUID);
}

Expand All @@ -156,10 +169,15 @@ public static ClusterMetadataMarker fromXContent(XContentParser parser) throws I

private static Map<String, UploadedIndexMetadata> toMap(final Collection<UploadedIndexMetadata> uploadedIndexMetadataList) {
// use a linked hash map to preserve order
return uploadedIndexMetadataList.stream().collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity(), (left, right) -> {
assert left.getIndexName().equals(right.getIndexName()) : "expected [" + left.getIndexName() + "] to equal [" + right.getIndexName() + "]";
throw new IllegalStateException("duplicate index name [" + left.getIndexName() + "]");
}, LinkedHashMap::new));
return uploadedIndexMetadataList.stream()
.collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity(), (left, right) -> {
assert left.getIndexName().equals(right.getIndexName()) : "expected ["
+ left.getIndexName()
+ "] to equal ["
+ right.getIndexName()
+ "]";
throw new IllegalStateException("duplicate index name [" + left.getIndexName() + "]");
}, LinkedHashMap::new));
}

/**
Expand All @@ -175,7 +193,6 @@ public static class Builder {
private String clusterUUID;
private String stateUUID;


public void term(long term) {
this.term = term;
}
Expand Down Expand Up @@ -229,8 +246,10 @@ private static String uploadedFilename(Object[] fields) {
return (String) fields[2];
}

private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER = new ConstructingObjectParser<>("uploaded_index_metadata",
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)));
private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER = new ConstructingObjectParser<>(
"uploaded_index_metadata",
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields))
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD);
Expand Down Expand Up @@ -283,8 +302,9 @@ public boolean equals(Object o) {
return false;
}
final UploadedIndexMetadata that = (UploadedIndexMetadata) o;
return Objects.equals(indexName, that.indexName) && Objects.equals(indexUUID, that.indexUUID) && Objects.equals(uploadedFilename,
that.uploadedFilename);
return Objects.equals(indexName, that.indexName)
&& Objects.equals(indexUUID, that.indexUUID)
&& Objects.equals(uploadedFilename, that.uploadedFilename);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.cluster.store.ClusterMetadataMarker.UploadedIndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -70,14 +68,21 @@ public ClusterMetadataMarker writeFullMetadata(long currentTerm, ClusterState cl
}

final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>();
//todo parallel upload
// todo parallel upload
// any validations before/after upload ?
for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
indexMetadataKey);
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
final String indexMetadataKey = writeIndexMetadata(
clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID(),
indexMetadata,
indexMetadataFileName(indexMetadata)
);
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataKey
);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
}
return uploadMarker(clusterState, allUploadedIndexMetadata);
Expand All @@ -101,7 +106,12 @@ private void setRepository() {
}
}

public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataMarker previousMarker) throws IOException {
public ClusterMetadataMarker writeIncrementalMetadata(
long currentTerm,
ClusterState previousClusterState,
ClusterState clusterState,
ClusterMetadataMarker previousMarker
) throws IOException {
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();
final Map<String, Long> indexMetadataVersionByName = new HashMap<>();
for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
Expand All @@ -110,18 +120,31 @@ public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterS

int numIndicesUpdated = 0;
int numIndicesUnchanged = 0;
final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>(previousMarker.getIndices());
final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>(
previousMarker.getIndices()
);
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
// Is it okay to use indexName as key ?
final Long previousVersion = indexMetadataVersionByName.get(indexMetadata.getIndex().getName());
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", indexMetadata.getIndex(), previousVersion,
indexMetadata.getVersion());
logger.trace(
"updating metadata for [{}], changing version from [{}] to [{}]",
indexMetadata.getIndex(),
previousVersion,
indexMetadata.getVersion()
);
numIndicesUpdated++;
String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
indexMetadataKey);
String indexMetadataKey = writeIndexMetadata(
clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID(),
indexMetadata,
indexMetadataFileName(indexMetadata)
);
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataKey
);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
} else {
numIndicesUnchanged++;
Expand All @@ -136,53 +159,73 @@ public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterS
}

public ClusterState getLatestClusterState(String clusterUUID) {
//todo
// todo
return null;
}

//todo exception handling
public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata) throws IOException {
// todo exception handling
public ClusterMetadataMarker uploadMarker(
ClusterState clusterState,
Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata
) throws IOException {
synchronized (this) {
String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(),
ClusterMetadataMarker marker = new ClusterMetadataMarker(
uploadedIndexMetadata,
clusterState.term(),
clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID());
clusterState.stateUUID()
);
writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
return marker;
}
}

public String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata indexMetadata, String fileName) throws IOException {
public String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata indexMetadata, String fileName)
throws IOException {
BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, indexMetadata.getIndexUUID());
INDEX_METADATA_FORMAT.write(indexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor());
// returning full path
return indexMetadataContainer.path().buildAsString() + fileName;
}

public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker marker, String fileName) throws IOException {
public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker marker, String fileName)
throws IOException {
BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID);
RemoteClusterStateService.CLUSTER_METADATA_MARKER_FORMAT.write(marker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor());
RemoteClusterStateService.CLUSTER_METADATA_MARKER_FORMAT.write(
marker,
metadataMarkerContainer,
fileName,
blobStoreRepository.getCompressor()
);
}

private static String getMarkerFileName(long term, long version) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
return String.join(DELIMITER,"marker", String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()));
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
return String.join(
DELIMITER,
"marker",
String.valueOf(Long.MAX_VALUE - term),
String.valueOf(Long.MAX_VALUE - version),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis())
);
}


private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}

public BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
.blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("index").add(indexUUID));
.blobContainer(
blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("index").add(indexUUID)
);
}

public BlobContainer markerContainer(String clusterName, String clusterUUID) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker
return blobStoreRepository.blobStore()
.blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("marker"));
}
Expand All @@ -199,8 +242,6 @@ public ClusterMetadataMarker getLatestClusterMetadataMarker(String clusterUUID,

public ClusterMetadataMarker fetchRemoteClusterMetadataMarker(String filename, String clusterUUID, String clusterState) {
try {
// We would be creating a new repository for RemoteClusterState which would hold this logic. We would also not not need to have
// getNamedXContentRegistry method in blobStoreRepository
return RemoteClusterStateService.CLUSTER_METADATA_MARKER_FORMAT.read(
getMarkerBlobContainer(clusterUUID, clusterState),
filename,
Expand Down Expand Up @@ -231,9 +272,12 @@ public String getLatestMarkerFileName(String clusterUUID, String clusterState) {
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterUUID, String clusterName) throws IOException {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataMarker clusterMetadataMarker = getLatestClusterMetadataMarker(clusterUUID, clusterName);
for (Map.Entry<String, UploadedIndexMetadata> entry: clusterMetadataMarker.getIndices().entrySet()) {
IndexMetadata indexMetadata = IndexMetadata.readFrom(new InputStreamStreamInput(
getMarkerBlobContainer(clusterUUID, clusterName).readBlob(entry.getValue().getUploadedFilename())));
for (Map.Entry<String, UploadedIndexMetadata> entry : clusterMetadataMarker.getIndices().entrySet()) {
IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.read(
blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath()),
entry.getValue().getUploadedFilename(),
blobStoreRepository.getNamedXContentRegistry()
);
remoteIndexMetadata.put(entry.getKey(), indexMetadata);
}
return remoteIndexMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
package org.opensearch.common.blobstore;

import org.opensearch.core.action.ActionListener;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.LatchedActionListener;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -44,8 +42,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/**
* An interface for managing a repository of blob entries, where each blob entry is just a named group of bytes.
Expand Down Expand Up @@ -238,11 +234,8 @@ default void listBlobsByPrefixInSortedOrder(
}
}

default List<BlobMetadata> listBlobsByPrefixInSortedOrder(
String blobNamePrefix,
int limit,
BlobNameSortOrder blobNameSortOrder
) throws IOException {
default List<BlobMetadata> listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder)
throws IOException {
if (limit < 0) {
throw new IllegalArgumentException("limit should not be a negative value");
}
Expand Down
Loading

0 comments on commit 1680f3e

Please sign in to comment.