diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index 9fd3b7f3afb80..24b744bebc53d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -40,6 +40,12 @@ public CheckpointInfoResponse( this.infosBytes = infosBytes; } + public CheckpointInfoResponse(final ReplicationCheckpoint checkpoint, final byte[] infosBytes) { + this.checkpoint = checkpoint; + this.infosBytes = infosBytes; + this.metadataMap = checkpoint.getMetadataMap(); + } + public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new); diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 6bdec4034f610..b9caf8591f432 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -21,12 +21,10 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,7 +34,6 @@ /** * Manages references to ongoing segrep events on a node. * Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication. - * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments. * * @opensearch.internal */ @@ -45,7 +42,6 @@ class OngoingSegmentReplications { private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class); private final RecoverySettings recoverySettings; private final IndicesService indicesService; - private final Map copyStateMap; private final Map allocationIdToHandlers; /** @@ -57,46 +53,9 @@ class OngoingSegmentReplications { OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { this.indicesService = indicesService; this.recoverySettings = recoverySettings; - this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap(); } - /* - Operations on the {@link #copyStateMap} member. - */ - - /** - * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key - * and returns the cached value if one is present. If the key is not present, a {@link CopyState} - * object is constructed and stored in the map before being returned. - */ - synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { - if (isInCopyStateMap(checkpoint)) { - final CopyState copyState = fetchFromCopyStateMap(checkpoint); - // we incref the copyState for every replica that is using this checkpoint. - // decref will happen when copy completes. - copyState.incRef(); - return copyState; - } else { - // From the checkpoint's shard ID, fetch the IndexShard - ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.id()); - // build the CopyState object and cache it before returning - final CopyState copyState = new CopyState(checkpoint, indexShard); - - /* - Use the checkpoint from the request as the key in the map, rather than - the checkpoint from the created CopyState. This maximizes cache hits - if replication targets make a request with an older checkpoint. - Replication targets are expected to fetch the checkpoint in the response - CopyState to bring themselves up to date. - */ - addToCopyStateMap(checkpoint, copyState); - return copyState; - } - } - /** * Start sending files to the replica. * @@ -114,12 +73,10 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId()); - if (sourceHandler != null) { - removeCopyState(sourceHandler.getCopyState()); - } - }); + final ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> allocationIdToHandlers.remove(request.getTargetAllocationId()) + ); handler.sendFiles(request, wrappedListener); } else { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -127,38 +84,25 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry"); - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()) == false; - allocationIdToHandlers.put(request.getTargetAllocationId(), newHandler); - } - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()); - return copyState; + SegmentReplicationSourceHandler prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) { + // From the checkpoint's shard ID, fetch the IndexShard + final ShardId shardId = request.getCheckpoint().getShardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + return allocationIdToHandlers.computeIfAbsent(request.getTargetAllocationId(), aId -> { + try { + return createTargetHandler(request.getTargetNode(), indexShard, request.getTargetAllocationId(), fileChunkWriter); + } catch (IOException e) { + throw new UncheckedIOException("Error creating replication handler", e); + } + }); } /** @@ -167,8 +111,8 @@ synchronized CopyState prepareForReplication(CheckpointInfoRequest request, File * @param shard {@link IndexShard} * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(IndexShard shard, String reason) { - cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + void cancel(IndexShard shard, String reason) { + cancelHandlers(handler -> handler.shardId().equals(shard.shardId()), reason); } /** @@ -177,11 +121,10 @@ synchronized void cancel(IndexShard shard, String reason) { * @param allocationId {@link String} - Allocation ID. * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(String allocationId, String reason) { + void cancel(String allocationId, String reason) { final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId); if (handler != null) { handler.cancel(reason); - removeCopyState(handler.getCopyState()); } } @@ -194,14 +137,6 @@ void cancelReplication(DiscoveryNode node) { cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left"); } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. - */ - boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); - } - int size() { return allocationIdToHandlers.size(); } @@ -211,21 +146,16 @@ Map getHandlers() { return allocationIdToHandlers; } - int cachedCopyStateSize() { - return copyStateMap.size(); - } - private SegmentReplicationSourceHandler createTargetHandler( DiscoveryNode node, - CopyState copyState, + IndexShard shard, String allocationId, FileChunkWriter fileChunkWriter - ) { + ) throws IOException { return new SegmentReplicationSourceHandler( node, fileChunkWriter, - copyState.getShard().getThreadPool(), - copyState, + shard, allocationId, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks() @@ -233,36 +163,19 @@ private SegmentReplicationSourceHandler createTargetHandler( } /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); - } - - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); - } - - /** - * Remove a CopyState. Intended to be called after a replication event completes. - * This method will remove a copyState from the copyStateMap only if its refCount hits 0. - * - * @param copyState {@link CopyState} + * Clear handlers for any allocationIds not in sync. + * @param shardId {@link ShardId} + * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. */ - private synchronized void removeCopyState(CopyState copyState) { - if (copyState.decRef() == true) { - copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); - } + void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { + cancelHandlers( + (handler) -> handler.shardId().equals(shardId) && inSyncAllocationIds.contains(handler.getAllocationId()) == false, + "Shard is no longer in-sync with the primary" + ); } /** * Remove handlers from allocationIdToHandlers map based on a filter predicate. - * This will also decref the handler's CopyState reference. */ private void cancelHandlers(Predicate predicate, String reason) { final List allocationIds = allocationIdToHandlers.values() @@ -278,17 +191,4 @@ private void cancelHandlers(Predicate p cancel(allocationId, reason); } } - - /** - * Clear copystate and target handlers for any non insync allocationIds. - * @param shardId {@link ShardId} - * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. - */ - public void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { - cancelHandlers( - (handler) -> handler.getCopyState().getShard().shardId().equals(shardId) - && inSyncAllocationIds.contains(handler.getAllocationId()) == false, - "Shard is no longer in-sync with the primary" - ); - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 674c09311c645..bb64d6b0c60b6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -18,16 +18,18 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; -import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -54,48 +56,46 @@ class SegmentReplicationSourceHandler { private final AtomicBoolean isReplicating = new AtomicBoolean(); private final DiscoveryNode targetNode; private final String allocationId; - private final FileChunkWriter writer; /** * Constructor. * - * @param targetNode - {@link DiscoveryNode} target node where files should be sent. + * @param targetNode {@link DiscoveryNode} target node where files should be sent. * @param writer {@link FileChunkWriter} implementation that sends file chunks over the transport layer. - * @param threadPool {@link ThreadPool} Thread pool. - * @param copyState {@link CopyState} CopyState holding segment file metadata. + * @param shard {@link IndexShard} The primary shard local to this node. * @param fileChunkSizeInBytes {@link Integer} * @param maxConcurrentFileChunks {@link Integer} */ SegmentReplicationSourceHandler( DiscoveryNode targetNode, FileChunkWriter writer, - ThreadPool threadPool, - CopyState copyState, + IndexShard shard, String allocationId, int fileChunkSizeInBytes, int maxConcurrentFileChunks - ) { + ) throws IOException { this.targetNode = targetNode; - this.shard = copyState.getShard(); + this.shard = shard; this.logger = Loggers.getLogger( SegmentReplicationSourceHandler.class, - copyState.getShard().shardId(), + shard.shardId(), "sending segments to " + targetNode.getName() ); this.segmentFileTransferHandler = new SegmentFileTransferHandler( - copyState.getShard(), + shard, targetNode, writer, logger, - threadPool, + shard.getThreadPool(), cancellableThreads, fileChunkSizeInBytes, maxConcurrentFileChunks ); this.allocationId = allocationId; - this.copyState = copyState; + this.copyState = new CopyState(shard); this.writer = writer; + resources.add(copyState); } /** @@ -109,6 +109,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene if (request.getFilesToFetch().isEmpty()) { // before completion, alert the primary of the replica's state. shard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint()); + IOUtils.closeWhileHandlingException(copyState); listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); return; } @@ -183,10 +184,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene public void cancel(String reason) { writer.cancel(); cancellableThreads.cancel(reason); - } - - CopyState getCopyState() { - return copyState; + IOUtils.closeWhileHandlingException(copyState); } public boolean isReplicating() { @@ -200,4 +198,16 @@ public DiscoveryNode getTargetNode() { public String getAllocationId() { return allocationId; } + + public ReplicationCheckpoint getCheckpoint() { + return copyState.getCheckpoint(); + } + + public byte[] getInfosBytes() { + return copyState.getInfosBytes(); + } + + public ShardId shardId() { + return shard.shardId(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index a33fabb0517d0..c013b1aa87f75 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -29,7 +29,6 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -126,16 +125,17 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan new AtomicLong(0), (throttleTime) -> {} ); - final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); - channel.sendResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + final SegmentReplicationSourceHandler handler = ongoingSegmentReplications.prepareForReplication( + request, + segmentSegmentFileChunkWriter ); + channel.sendResponse(new CheckpointInfoResponse(handler.getCheckpoint(), handler.getInfosBytes())); timer.stop(); logger.trace( new ParameterizedMessage( "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", request.getReplicationId(), - copyState.getCheckpoint(), + handler.getCheckpoint(), request.getTargetNode().getId(), timer.time() ) diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 3b7ae2af80ca0..7d3eb9083208b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -13,11 +13,11 @@ import org.apache.lucene.store.ByteBuffersIndexOutput; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; -import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; @@ -28,28 +28,21 @@ * * @opensearch.internal */ -public class CopyState extends AbstractRefCounted { +public class CopyState implements Closeable { private final GatedCloseable segmentInfosRef; - /** ReplicationCheckpoint requested */ - private final ReplicationCheckpoint requestedReplicationCheckpoint; /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; - private final Map metadataMap; private final byte[] infosBytes; private final IndexShard shard; - public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { - super("CopyState-" + shard.shardId()); - this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; + public CopyState(IndexShard shard) throws IOException { this.shard = shard; final Tuple, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard .getLatestSegmentInfosAndCheckpoint(); this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1(); this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); - this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); - ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { @@ -58,21 +51,12 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.infosBytes = buffer.toArrayCopy(); } - @Override - protected void closeInternal() { - try { - segmentInfosRef.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } public Map getMetadataMap() { - return metadataMap; + return replicationCheckpoint.getMetadataMap(); } public byte[] getInfosBytes() { @@ -83,7 +67,12 @@ public IndexShard getShard() { return shard; } - public ReplicationCheckpoint getRequestedReplicationCheckpoint() { - return requestedReplicationCheckpoint; + @Override + public void close() throws IOException { + try { + segmentInfosRef.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 7caff3e5f5479..e4b5dc8d96d06 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -1019,25 +1019,15 @@ protected void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCo } protected void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { - final CopyState copyState; - try { - copyState = new CopyState( - ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), - primary + try (final CopyState copyState = new CopyState(primary)) { + listener.onResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); throw new UncheckedIOException(e); } - - try { - listener.onResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); - } finally { - copyState.decRef(); - } } protected void startReplicationAndAssertCancellation( diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 44e2653cf01da..eb27850000bdd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -25,7 +25,6 @@ import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.transport.TransportService; import org.junit.Assert; @@ -106,25 +105,21 @@ public void testPrepareAndSendSegments() throws IOException { final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { listener.onResponse(null); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(handler.getCheckpoint().getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(0, replications.size()); } @@ -148,14 +143,11 @@ public void testCancelReplication() throws IOException { // this shouldn't be called in this test. Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); } public void testCancelReplication_AfterSendFilesStarts() throws IOException, InterruptedException { @@ -174,14 +166,13 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int // cancel the replication as soon as the writer starts sending files. replications.cancel(replica.routingEntry().allocationId().getId(), "Test"); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @@ -193,9 +184,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { @Override public void onFailure(Exception e) { assertEquals(CancellableThreads.ExecutionCancelledException.class, e.getClass()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); latch.countDown(); } }); @@ -219,8 +208,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -230,15 +218,11 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { ); replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); replications.cancelReplication(replicaDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(secondReplica); } @@ -280,8 +264,7 @@ public void testShardAlreadyReplicatingToNode() throws IOException { listener.onResponse(null); }; replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); } public void testStartReplicationWithNoFilesToFetch() throws IOException { @@ -296,10 +279,8 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { // mock the FileChunkWriter so we can assert its ever called. final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class); // Prepare for replication step - and ensure copyState is added to cache. - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -314,8 +295,6 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { assertEquals(Collections.emptyList(), getSegmentFilesResponse.files); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); verifyNoInteractions(segmentSegmentFileChunkWriter); } @@ -340,8 +319,7 @@ public void testCancelAllReplicationsForShard() throws IOException { testCheckpoint ); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -351,15 +329,11 @@ public void testCancelAllReplicationsForShard() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.cancel(primary, "Test"); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -372,8 +346,7 @@ public void testCancelForMissingIds() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final String replica_2AllocationId = replica_2.routingEntry().allocationId().getId(); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( @@ -384,23 +357,17 @@ public void testCancelForMissingIds() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); assertTrue(replications.getHandlers().containsKey(replicaAllocationId)); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); replications.clearOutOfSyncIds(primary.shardId(), Set.of(replica_2AllocationId)); - assertEquals(1, copyState.refCount()); assertEquals(1, replications.size()); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.clearOutOfSyncIds(primary.shardId(), Collections.emptySet()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -409,11 +376,8 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - - final SegmentReplicationSourceHandler handler = replications.getHandlers().get(replicaAllocationId); - assertEquals(handler.getCopyState(), copyState); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); + assertEquals(handler, replications.getHandlers().get(replicaAllocationId)); ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( testCheckpoint.getShardId(), @@ -430,11 +394,10 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { secondCheckpoint ); - final CopyState secondCopyState = replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - final SegmentReplicationSourceHandler secondHandler = replications.getHandlers().get(replicaAllocationId); - assertEquals(secondHandler.getCopyState(), secondCopyState); - assertEquals("New copy state is incref'd", 1, secondCopyState.refCount()); - assertEquals("Old copy state is cleaned up", 0, copyState.refCount()); - + final SegmentReplicationSourceHandler secondHandler = replications.prepareForReplication( + secondRequest, + mock(FileChunkWriter.class) + ); + assertEquals(secondHandler, replications.getHandlers().get(replicaAllocationId)); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index d586767290797..901dc28794cfc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -68,18 +68,16 @@ public void testSendFiles() throws IOException { chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); + final List expectedFiles = List.copyOf(handler.getCheckpoint().getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -106,12 +104,10 @@ public void testSendFiles_emptyRequest() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -148,12 +144,11 @@ public void testSendFileFails() throws IOException { ); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 @@ -180,19 +175,18 @@ public void onFailure(Exception e) { assertEquals(e.getClass(), OpenSearchException.class); } }); - copyState.decRef(); + copyState.close(); } public void testReplicationAlreadyRunning() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -217,12 +211,10 @@ public void testCancelReplication() throws IOException, InterruptedException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 3c72dda2d8b5d..f06d5595afcd5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -273,10 +273,7 @@ public void getCheckpointMetadata( ) { try { blockGetCheckpointMetadata.await(); - final CopyState copyState = new CopyState( - ReplicationCheckpoint.empty(primaryShard.shardId(), primaryShard.getLatestReplicationCheckpoint().getCodec()), - primaryShard - ); + final CopyState copyState = new CopyState(primaryShard); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index df180a8ab1007..0b30486038e3a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -18,7 +18,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.Environment; -import org.opensearch.index.codec.CodecService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; @@ -54,13 +53,7 @@ public class CopyStateTests extends IndexShardTestCase { public void testCopyStateCreation() throws IOException { final IndexShard mockIndexShard = createMockIndexShard(); - CopyState copyState = new CopyState( - ReplicationCheckpoint.empty( - mockIndexShard.shardId(), - new CodecService(null, mockIndexShard.indexSettings(), null).codec("default").getName() - ), - mockIndexShard - ); + CopyState copyState = new CopyState(mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero @@ -86,7 +79,9 @@ public static IndexShard createMockIndexShard() throws IOException { mockShard.getOperationPrimaryTerm(), 0L, 0L, - Codec.getDefault().getName() + 0L, + Codec.getDefault().getName(), + SI_SNAPSHOT.asMap() ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index bf1c4d4c94e04..51190982f0568 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1603,12 +1603,10 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - try { - final CopyState copyState = new CopyState(primaryShard.getLatestReplicationCheckpoint(), primaryShard); + try (final CopyState copyState = new CopyState(primaryShard)) { listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); - copyState.decRef(); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState");