From 8432375013835e26f70e444ac1198684f9742cda Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sun, 14 Apr 2024 04:59:04 +0000 Subject: [PATCH] Fix flakiness with SegmentReplicationSuiteIT (#11977) * Fix SegmentReplicationSuiteIT This test fails because of a race during shard/node shutdown with node-node replication. Fixed by properly synchronizing creation of new replication events with cancellation and cancelling after shards are closed. Signed-off-by: Marc Handalian * Remove CopyState caching from OngoingSegmentReplications. This change removes the responsibility of caching CopyState inside of OngoingSegmentReplications. 1. CopyState was originally cached to prevent frequent disk reads while building segment metadata. This is now cached lower down in IndexShard and is not required here. 2. Change prepareForReplication method to return SegmentReplicationSourceHandler directly 3. Move responsibility of creating and clearing CopyState to the handler. Signed-off-by: Marc Handalian * Fix comment for afterIndexShardClosed method. Signed-off-by: Marc Handalian * Fix comment on beforeIndexShardClosed Signed-off-by: Marc Handalian * Remove unnecessary method from OngoingSegmentReplications Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian (cherry picked from commit e828c180b28fd96b3121e92613fc0879004e791f) Signed-off-by: github-actions[bot] --- .../SegmentReplicationSuiteIT.java | 3 +- .../replication/CheckpointInfoResponse.java | 6 + .../OngoingSegmentReplications.java | 185 ++++-------------- .../SegmentReplicationSourceHandler.java | 44 +++-- .../SegmentReplicationSourceService.java | 12 +- .../indices/replication/common/CopyState.java | 33 ++-- .../SegmentReplicationIndexShardTests.java | 16 +- .../OngoingSegmentReplicationsTests.java | 73 ++----- .../SegmentReplicationSourceHandlerTests.java | 26 +-- .../SegmentReplicationTargetServiceTests.java | 5 +- .../replication/common/CopyStateTests.java | 13 +- .../index/shard/IndexShardTestCase.java | 4 +- 12 files changed, 125 insertions(+), 295 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index 8c045c1560dd3..27b65432e0bac 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -16,7 +15,6 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9499") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { @@ -64,6 +62,7 @@ public void testDropRandomNodeDuringReplication() throws Exception { ensureYellow(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get(); internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index 9fd3b7f3afb80..24b744bebc53d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -40,6 +40,12 @@ public CheckpointInfoResponse( this.infosBytes = infosBytes; } + public CheckpointInfoResponse(final ReplicationCheckpoint checkpoint, final byte[] infosBytes) { + this.checkpoint = checkpoint; + this.infosBytes = infosBytes; + this.metadataMap = checkpoint.getMetadataMap(); + } + public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new); diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 33967c0203516..6b99b3c0b0696 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -21,12 +21,10 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,7 +34,6 @@ /** * Manages references to ongoing segrep events on a node. * Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication. - * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments. * * @opensearch.internal */ @@ -45,7 +42,6 @@ class OngoingSegmentReplications { private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class); private final RecoverySettings recoverySettings; private final IndicesService indicesService; - private final Map copyStateMap; private final Map allocationIdToHandlers; /** @@ -57,46 +53,9 @@ class OngoingSegmentReplications { OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { this.indicesService = indicesService; this.recoverySettings = recoverySettings; - this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap(); } - /* - Operations on the {@link #copyStateMap} member. - */ - - /** - * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key - * and returns the cached value if one is present. If the key is not present, a {@link CopyState} - * object is constructed and stored in the map before being returned. - */ - synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { - if (isInCopyStateMap(checkpoint)) { - final CopyState copyState = fetchFromCopyStateMap(checkpoint); - // we incref the copyState for every replica that is using this checkpoint. - // decref will happen when copy completes. - copyState.incRef(); - return copyState; - } else { - // From the checkpoint's shard ID, fetch the IndexShard - ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.id()); - // build the CopyState object and cache it before returning - final CopyState copyState = new CopyState(checkpoint, indexShard); - - /* - Use the checkpoint from the request as the key in the map, rather than - the checkpoint from the created CopyState. This maximizes cache hits - if replication targets make a request with an older checkpoint. - Replication targets are expected to fetch the checkpoint in the response - CopyState to bring themselves up to date. - */ - addToCopyStateMap(checkpoint, copyState); - return copyState; - } - } - /** * Start sending files to the replica. * @@ -114,12 +73,10 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId()); - if (sourceHandler != null) { - removeCopyState(sourceHandler.getCopyState()); - } - }); + final ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> allocationIdToHandlers.remove(request.getTargetAllocationId()) + ); handler.sendFiles(request, wrappedListener); } else { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -127,38 +84,32 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry"); - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()) == false; - allocationIdToHandlers.put(request.getTargetAllocationId(), newHandler); - } - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()); - return copyState; + SegmentReplicationSourceHandler prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) { + return allocationIdToHandlers.computeIfAbsent(request.getTargetAllocationId(), aId -> { + try { + // From the checkpoint's shard ID, fetch the IndexShard + final ShardId shardId = request.getCheckpoint().getShardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + return new SegmentReplicationSourceHandler( + request.getTargetNode(), + fileChunkWriter, + indexShard, + request.getTargetAllocationId(), + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } catch (IOException e) { + throw new UncheckedIOException("Error creating replication handler", e); + } + }); } /** @@ -167,8 +118,8 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f * @param shard {@link IndexShard} * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(IndexShard shard, String reason) { - cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + void cancel(IndexShard shard, String reason) { + cancelHandlers(handler -> handler.shardId().equals(shard.shardId()), reason); } /** @@ -177,11 +128,10 @@ synchronized void cancel(IndexShard shard, String reason) { * @param allocationId {@link String} - Allocation ID. * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(String allocationId, String reason) { + void cancel(String allocationId, String reason) { final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId); if (handler != null) { handler.cancel(reason); - removeCopyState(handler.getCopyState()); } } @@ -194,14 +144,6 @@ void cancelReplication(DiscoveryNode node) { cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left"); } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. - */ - boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); - } - int size() { return allocationIdToHandlers.size(); } @@ -211,58 +153,20 @@ Map getHandlers() { return allocationIdToHandlers; } - int cachedCopyStateSize() { - return copyStateMap.size(); - } - - private SegmentReplicationSourceHandler createTargetHandler( - DiscoveryNode node, - CopyState copyState, - String allocationId, - FileChunkWriter fileChunkWriter - ) { - return new SegmentReplicationSourceHandler( - node, - fileChunkWriter, - copyState.getShard().getThreadPool(), - copyState, - allocationId, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks() - ); - } - /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); - } - - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); - } - - /** - * Remove a CopyState. Intended to be called after a replication event completes. - * This method will remove a copyState from the copyStateMap only if its refCount hits 0. - * - * @param copyState {@link CopyState} + * Clear handlers for any allocationIds not in sync. + * @param shardId {@link ShardId} + * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. */ - private synchronized void removeCopyState(CopyState copyState) { - if (copyState.decRef() == true) { - copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); - } + void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { + cancelHandlers( + (handler) -> handler.shardId().equals(shardId) && inSyncAllocationIds.contains(handler.getAllocationId()) == false, + "Shard is no longer in-sync with the primary" + ); } /** * Remove handlers from allocationIdToHandlers map based on a filter predicate. - * This will also decref the handler's CopyState reference. */ private void cancelHandlers(Predicate predicate, String reason) { final List allocationIds = allocationIdToHandlers.values() @@ -278,17 +182,4 @@ private void cancelHandlers(Predicate p cancel(allocationId, reason); } } - - /** - * Clear copystate and target handlers for any non insync allocationIds. - * @param shardId {@link ShardId} - * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. - */ - public void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { - cancelHandlers( - (handler) -> handler.getCopyState().getShard().shardId().equals(shardId) - && inSyncAllocationIds.contains(handler.getAllocationId()) == false, - "Shard is no longer in-sync with the primary" - ); - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 674c09311c645..bb64d6b0c60b6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -18,16 +18,18 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; -import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -54,48 +56,46 @@ class SegmentReplicationSourceHandler { private final AtomicBoolean isReplicating = new AtomicBoolean(); private final DiscoveryNode targetNode; private final String allocationId; - private final FileChunkWriter writer; /** * Constructor. * - * @param targetNode - {@link DiscoveryNode} target node where files should be sent. + * @param targetNode {@link DiscoveryNode} target node where files should be sent. * @param writer {@link FileChunkWriter} implementation that sends file chunks over the transport layer. - * @param threadPool {@link ThreadPool} Thread pool. - * @param copyState {@link CopyState} CopyState holding segment file metadata. + * @param shard {@link IndexShard} The primary shard local to this node. * @param fileChunkSizeInBytes {@link Integer} * @param maxConcurrentFileChunks {@link Integer} */ SegmentReplicationSourceHandler( DiscoveryNode targetNode, FileChunkWriter writer, - ThreadPool threadPool, - CopyState copyState, + IndexShard shard, String allocationId, int fileChunkSizeInBytes, int maxConcurrentFileChunks - ) { + ) throws IOException { this.targetNode = targetNode; - this.shard = copyState.getShard(); + this.shard = shard; this.logger = Loggers.getLogger( SegmentReplicationSourceHandler.class, - copyState.getShard().shardId(), + shard.shardId(), "sending segments to " + targetNode.getName() ); this.segmentFileTransferHandler = new SegmentFileTransferHandler( - copyState.getShard(), + shard, targetNode, writer, logger, - threadPool, + shard.getThreadPool(), cancellableThreads, fileChunkSizeInBytes, maxConcurrentFileChunks ); this.allocationId = allocationId; - this.copyState = copyState; + this.copyState = new CopyState(shard); this.writer = writer; + resources.add(copyState); } /** @@ -109,6 +109,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene if (request.getFilesToFetch().isEmpty()) { // before completion, alert the primary of the replica's state. shard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint()); + IOUtils.closeWhileHandlingException(copyState); listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); return; } @@ -183,10 +184,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene public void cancel(String reason) { writer.cancel(); cancellableThreads.cancel(reason); - } - - CopyState getCopyState() { - return copyState; + IOUtils.closeWhileHandlingException(copyState); } public boolean isReplicating() { @@ -200,4 +198,16 @@ public DiscoveryNode getTargetNode() { public String getAllocationId() { return allocationId; } + + public ReplicationCheckpoint getCheckpoint() { + return copyState.getCheckpoint(); + } + + public byte[] getInfosBytes() { + return copyState.getInfosBytes(); + } + + public ShardId shardId() { + return shard.shardId(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index a393faabae0ea..ca89741d5bb55 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -29,7 +29,6 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -126,16 +125,17 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan new AtomicLong(0), (throttleTime) -> {} ); - final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); - channel.sendResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + final SegmentReplicationSourceHandler handler = ongoingSegmentReplications.prepareForReplication( + request, + segmentSegmentFileChunkWriter ); + channel.sendResponse(new CheckpointInfoResponse(handler.getCheckpoint(), handler.getInfosBytes())); timer.stop(); logger.trace( new ParameterizedMessage( "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", request.getReplicationId(), - copyState.getCheckpoint(), + handler.getCheckpoint(), request.getTargetNode().getId(), timer.time() ) @@ -217,7 +217,7 @@ protected void doClose() throws IOException { /** * - * Cancels any replications on this node to a replica shard that is about to be closed. + * Before a primary shard is closed, cancel any ongoing replications to release incref'd segments. */ @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 3b7ae2af80ca0..7d3eb9083208b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -13,11 +13,11 @@ import org.apache.lucene.store.ByteBuffersIndexOutput; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; -import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; @@ -28,28 +28,21 @@ * * @opensearch.internal */ -public class CopyState extends AbstractRefCounted { +public class CopyState implements Closeable { private final GatedCloseable segmentInfosRef; - /** ReplicationCheckpoint requested */ - private final ReplicationCheckpoint requestedReplicationCheckpoint; /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; - private final Map metadataMap; private final byte[] infosBytes; private final IndexShard shard; - public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { - super("CopyState-" + shard.shardId()); - this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; + public CopyState(IndexShard shard) throws IOException { this.shard = shard; final Tuple, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard .getLatestSegmentInfosAndCheckpoint(); this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1(); this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); - this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); - ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { @@ -58,21 +51,12 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.infosBytes = buffer.toArrayCopy(); } - @Override - protected void closeInternal() { - try { - segmentInfosRef.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } public Map getMetadataMap() { - return metadataMap; + return replicationCheckpoint.getMetadataMap(); } public byte[] getInfosBytes() { @@ -83,7 +67,12 @@ public IndexShard getShard() { return shard; } - public ReplicationCheckpoint getRequestedReplicationCheckpoint() { - return requestedReplicationCheckpoint; + @Override + public void close() throws IOException { + try { + segmentInfosRef.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index e93d266dcab4c..2311fc582616f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -1019,25 +1019,15 @@ protected void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCo } protected void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { - final CopyState copyState; - try { - copyState = new CopyState( - ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), - primary + try (final CopyState copyState = new CopyState(primary)) { + listener.onResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); throw new UncheckedIOException(e); } - - try { - listener.onResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); - } finally { - copyState.decRef(); - } } protected void startReplicationAndAssertCancellation( diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 44e2653cf01da..eb27850000bdd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -25,7 +25,6 @@ import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.transport.TransportService; import org.junit.Assert; @@ -106,25 +105,21 @@ public void testPrepareAndSendSegments() throws IOException { final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { listener.onResponse(null); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(handler.getCheckpoint().getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(0, replications.size()); } @@ -148,14 +143,11 @@ public void testCancelReplication() throws IOException { // this shouldn't be called in this test. Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); } public void testCancelReplication_AfterSendFilesStarts() throws IOException, InterruptedException { @@ -174,14 +166,13 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int // cancel the replication as soon as the writer starts sending files. replications.cancel(replica.routingEntry().allocationId().getId(), "Test"); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @@ -193,9 +184,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { @Override public void onFailure(Exception e) { assertEquals(CancellableThreads.ExecutionCancelledException.class, e.getClass()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); latch.countDown(); } }); @@ -219,8 +208,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -230,15 +218,11 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { ); replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); replications.cancelReplication(replicaDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(secondReplica); } @@ -280,8 +264,7 @@ public void testShardAlreadyReplicatingToNode() throws IOException { listener.onResponse(null); }; replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); } public void testStartReplicationWithNoFilesToFetch() throws IOException { @@ -296,10 +279,8 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { // mock the FileChunkWriter so we can assert its ever called. final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class); // Prepare for replication step - and ensure copyState is added to cache. - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -314,8 +295,6 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { assertEquals(Collections.emptyList(), getSegmentFilesResponse.files); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); verifyNoInteractions(segmentSegmentFileChunkWriter); } @@ -340,8 +319,7 @@ public void testCancelAllReplicationsForShard() throws IOException { testCheckpoint ); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -351,15 +329,11 @@ public void testCancelAllReplicationsForShard() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.cancel(primary, "Test"); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -372,8 +346,7 @@ public void testCancelForMissingIds() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final String replica_2AllocationId = replica_2.routingEntry().allocationId().getId(); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( @@ -384,23 +357,17 @@ public void testCancelForMissingIds() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); assertTrue(replications.getHandlers().containsKey(replicaAllocationId)); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); replications.clearOutOfSyncIds(primary.shardId(), Set.of(replica_2AllocationId)); - assertEquals(1, copyState.refCount()); assertEquals(1, replications.size()); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.clearOutOfSyncIds(primary.shardId(), Collections.emptySet()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -409,11 +376,8 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - - final SegmentReplicationSourceHandler handler = replications.getHandlers().get(replicaAllocationId); - assertEquals(handler.getCopyState(), copyState); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); + assertEquals(handler, replications.getHandlers().get(replicaAllocationId)); ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( testCheckpoint.getShardId(), @@ -430,11 +394,10 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { secondCheckpoint ); - final CopyState secondCopyState = replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - final SegmentReplicationSourceHandler secondHandler = replications.getHandlers().get(replicaAllocationId); - assertEquals(secondHandler.getCopyState(), secondCopyState); - assertEquals("New copy state is incref'd", 1, secondCopyState.refCount()); - assertEquals("Old copy state is cleaned up", 0, copyState.refCount()); - + final SegmentReplicationSourceHandler secondHandler = replications.prepareForReplication( + secondRequest, + mock(FileChunkWriter.class) + ); + assertEquals(secondHandler, replications.getHandlers().get(replicaAllocationId)); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index d586767290797..901dc28794cfc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -68,18 +68,16 @@ public void testSendFiles() throws IOException { chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); + final List expectedFiles = List.copyOf(handler.getCheckpoint().getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -106,12 +104,10 @@ public void testSendFiles_emptyRequest() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -148,12 +144,11 @@ public void testSendFileFails() throws IOException { ); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 @@ -180,19 +175,18 @@ public void onFailure(Exception e) { assertEquals(e.getClass(), OpenSearchException.class); } }); - copyState.decRef(); + copyState.close(); } public void testReplicationAlreadyRunning() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -217,12 +211,10 @@ public void testCancelReplication() throws IOException, InterruptedException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 85030faeab061..883b415250eb9 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -276,10 +276,7 @@ public void getCheckpointMetadata( ) { try { blockGetCheckpointMetadata.await(); - final CopyState copyState = new CopyState( - ReplicationCheckpoint.empty(primaryShard.shardId(), primaryShard.getLatestReplicationCheckpoint().getCodec()), - primaryShard - ); + final CopyState copyState = new CopyState(primaryShard); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index df180a8ab1007..0b30486038e3a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -18,7 +18,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.Environment; -import org.opensearch.index.codec.CodecService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; @@ -54,13 +53,7 @@ public class CopyStateTests extends IndexShardTestCase { public void testCopyStateCreation() throws IOException { final IndexShard mockIndexShard = createMockIndexShard(); - CopyState copyState = new CopyState( - ReplicationCheckpoint.empty( - mockIndexShard.shardId(), - new CodecService(null, mockIndexShard.indexSettings(), null).codec("default").getName() - ), - mockIndexShard - ); + CopyState copyState = new CopyState(mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero @@ -86,7 +79,9 @@ public static IndexShard createMockIndexShard() throws IOException { mockShard.getOperationPrimaryTerm(), 0L, 0L, - Codec.getDefault().getName() + 0L, + Codec.getDefault().getName(), + SI_SNAPSHOT.asMap() ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 4e30c315967f8..c57d004359b7a 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1612,12 +1612,10 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - try { - final CopyState copyState = new CopyState(primaryShard.getLatestReplicationCheckpoint(), primaryShard); + try (final CopyState copyState = new CopyState(primaryShard)) { listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); - copyState.decRef(); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState");