Skip to content

Commit

Permalink
Add logic for commit cluster state
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Aug 21, 2023
1 parent 3946eea commit 50d6db3
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ClusterMetadataMarker implements Writeable, ToXContentFragment {
private static final ParseField VERSION_FIELD = new ParseField("version");
private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid");
private static final ParseField COMMITTED_FIELD = new ParseField("committed");

private static Map<String, UploadedIndexMetadata> indices(Object[] fields) {
return (Map<String, UploadedIndexMetadata>) fields[0];
Expand All @@ -58,9 +59,20 @@ private static String stateUUID(Object[] fields) {
return (String) fields[4];
}

private static boolean committed(Object[] fields) {
return (boolean) fields[5];
}

private static final ConstructingObjectParser<ClusterMetadataMarker, Void> PARSER = new ConstructingObjectParser<>(
"cluster_metadata_marker",
fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields))
fields -> new ClusterMetadataMarker(
indices(fields),
term(fields),
version(fields),
clusterUUID(fields),
stateUUID(fields),
committed(fields)
)
);

static {
Expand All @@ -76,13 +88,15 @@ private static String stateUUID(Object[] fields) {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD);
}

private final Map<String, UploadedIndexMetadata> indices;
private final long term;
private final long version;
private final String clusterUUID;
private final String stateUUID;
private final boolean committed;

public Map<String, UploadedIndexMetadata> getIndices() {
return indices;
Expand All @@ -104,18 +118,24 @@ public String getStateUUID() {
return stateUUID;
}

public boolean isCommitted() {
return committed;
}

public ClusterMetadataMarker(
Map<String, UploadedIndexMetadata> indices,
long term,
long version,
String clusterUUID,
String stateUUID
String stateUUID,
boolean committed
) {
this.indices = Collections.unmodifiableMap(indices);
this.term = term;
this.version = version;
this.clusterUUID = clusterUUID;
this.stateUUID = stateUUID;
this.committed = committed;
}

public static Builder builder() {
Expand All @@ -134,7 +154,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(TERM_FIELD.getPreferredName(), getTerm())
.field(VERSION_FIELD.getPreferredName(), getVersion())
.field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID())
.field(STATE_UUID_FIELD.getPreferredName(), getStateUUID());
.field(STATE_UUID_FIELD.getPreferredName(), getStateUUID())
.field(COMMITTED_FIELD.getPreferredName(), isCommitted());
return builder;
}

Expand All @@ -145,6 +166,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(version);
out.writeString(clusterUUID);
out.writeString(stateUUID);
out.writeBoolean(committed);
}

@Override
Expand All @@ -160,12 +182,13 @@ public boolean equals(Object o) {
&& term == that.term
&& version == that.version
&& Objects.equals(clusterUUID, that.clusterUUID)
&& Objects.equals(stateUUID, that.stateUUID);
&& Objects.equals(stateUUID, that.stateUUID)
&& Objects.equals(committed, that.committed);
}

@Override
public int hashCode() {
return Objects.hash(indices, term, version, clusterUUID, stateUUID);
return Objects.hash(indices, term, version, clusterUUID, stateUUID, committed);
}

@Override
Expand All @@ -189,6 +212,7 @@ public static class Builder {
private long version;
private String clusterUUID;
private String stateUUID;
private boolean committed;

public Builder indices(Map<String, UploadedIndexMetadata> indices) {
this.indices = indices;
Expand All @@ -215,6 +239,11 @@ public Builder stateUUID(String stateUUID) {
return this;
}

public Builder committed(boolean committed) {
this.committed = committed;
return this;
}

public Map<String, UploadedIndexMetadata> getIndices() {
return indices;
}
Expand All @@ -224,7 +253,7 @@ public Builder() {
}

public ClusterMetadataMarker build() {
return new ClusterMetadataMarker(indices, term, version, clusterUUID, stateUUID);
return new ClusterMetadataMarker(indices, term, version, clusterUUID, stateUUID, committed);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -121,7 +120,7 @@ public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws
);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
}
return uploadMarker(clusterState, allUploadedIndexMetadata);
return uploadMarker(clusterState, allUploadedIndexMetadata, false);
}

/**
Expand Down Expand Up @@ -187,7 +186,19 @@ public ClusterMetadataMarker writeIncrementalMetadata(
for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
return uploadMarker(clusterState, allUploadedIndexMetadata);
return uploadMarker(clusterState, allUploadedIndexMetadata, false);
}

public ClusterMetadataMarker markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataMarker previousMarker)
throws IOException {
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
assert clusterState != null : "Last accepted cluster state is not set";
assert previousMarker != null : "Last cluster metadata marker is not set";
assert CLUSTER_REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
return uploadMarker(clusterState, previousMarker.getIndices(), true);
}

public ClusterState getLatestClusterState(String clusterUUID) {
Expand All @@ -209,7 +220,8 @@ void initializeRepository() {

private ClusterMetadataMarker uploadMarker(
ClusterState clusterState,
Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata
Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata,
boolean committed
) throws IOException {
synchronized (this) {
final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
Expand All @@ -218,7 +230,8 @@ private ClusterMetadataMarker uploadMarker(
clusterState.term(),
clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID()
clusterState.stateUUID(),
committed
);
writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
return marker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,12 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState) {

@Override
public void markLastAcceptedStateAsCommitted() {
// TODO
try {
final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted(lastAcceptedState, lastAcceptedMarker);
lastAcceptedMarker = committedMarker;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public void testXContent() throws IOException {
1L,
1L,
"test-cluster-uuid",
"test-state-uuid"
"test-state-uuid",
false
);
final XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
Expand Down
Loading

0 comments on commit 50d6db3

Please sign in to comment.