diff --git a/CHANGELOG.md b/CHANGELOG.md index 1524e9fbe4e9b..a748bc8e6f042 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466)) +- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394)) ### Dependencies - Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 04c97a2f41aaa..746cbb46aaaca 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1023,6 +1023,14 @@ public boolean isSegRepEnabled() { return ReplicationType.SEGMENT.equals(replicationType); } + public boolean isSegRepLocalEnabled() { + return isSegRepEnabled() && !isSegRepWithRemoteEnabled(); + } + + public boolean isSegRepWithRemoteEnabled() { + return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL); + } + /** * Returns if remote store is enabled for this index. */ diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 66d095878d123..4254586f3d70e 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,10 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) { + if (didRefresh + && shard.state() == IndexShardState.STARTED + && shard.getReplicationTracker().isPrimaryMode() + && !shard.indexSettings.isSegRepWithRemoteEnabled()) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f2aa886aed374..a6225569d86b4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3546,9 +3546,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (isRemoteStoreEnabled()) { - internalRefreshListener.add(new RemoteStoreRefreshListener(this)); + internalRefreshListener.add( + new RemoteStoreRefreshListener( + this, + // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. + indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY + ) + ); } - if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { + + if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } /** diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 8672ba6c59a13..ac9c35aaee6b5 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -29,6 +29,8 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import java.io.IOException; import java.util.Collection; @@ -96,7 +98,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; - public RemoteStoreRefreshListener(IndexShard indexShard) { + private final SegmentReplicationCheckpointPublisher checkpointPublisher; + + public RemoteStoreRefreshListener(IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -111,6 +115,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { } } resetBackOffDelayIterator(); + this.checkpointPublisher = checkpointPublisher; } @Override @@ -151,6 +156,10 @@ private synchronized void syncSegments(boolean isRetry) { deleteStaleCommits(); } + // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can + // move. + ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); + String segmentInfoSnapshotFilename = null; try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); @@ -190,9 +199,11 @@ private synchronized void syncSegments(boolean isRetry) { .filter(file -> !localSegmentsPostRefresh.contains(file)) .collect(Collectors.toSet()) .forEach(localSegmentChecksumMap::remove); - OnSuccessfulSegmentsSync(); + onSuccessfulSegmentsSync(); final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + + checkpointPublisher.publish(indexShard, checkpoint); } else { shouldRetry = true; } @@ -229,7 +240,7 @@ private void beforeSegmentsSync(boolean isRetry) { } } - private void OnSuccessfulSegmentsSync() { + private void onSuccessfulSegmentsSync() { // Reset the backoffDelayIterator for the future failures resetBackOffDelayIterator(); // Cancel the scheduled cancellable retry if possible and set it to null diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 84848bb87d634..7ae18771ee0fa 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -24,6 +24,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -52,7 +53,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { indexDocs(1, numberOfDocs); indexShard.refresh("test"); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -316,7 +317,7 @@ private void mockIndexShardWithRetryAndScheduleRefresh( return indexShard.getEngine(); }).when(shard).getEngine(); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard); + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY); refreshListener.afterRefresh(false); }