Skip to content

Commit

Permalink
[Remote Store] Fix relocation failure due to transport receive timeout (
Browse files Browse the repository at this point in the history
#10761)

* [Remote Store] Fix relocation failure due to transport receive timeout

Signed-off-by: Ashish Singh <[email protected]>

* Fix existing extended shardIdle for remote backed shards

Signed-off-by: Ashish Singh <[email protected]>

* Incorporate PR review comments

Signed-off-by: Ashish Singh <[email protected]>

---------

Signed-off-by: Ashish Singh <[email protected]>
(cherry picked from commit a1fde65)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 20, 2023
1 parent eed9d99 commit a75ec2e
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,4 +509,27 @@ public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Excepti
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}

public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException {
internalCluster().startClusterManagerOnlyNode();
String primaryShardNode = internalCluster().startDataOnlyNodes(1).get(0);

createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen(INDEX_NAME);
IndexShard indexShard = getIndexShard(primaryShardNode);
assertFalse(indexShard.isSearchIdleSupported());

String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
ensureGreen(INDEX_NAME);
assertFalse(indexShard.isSearchIdleSupported());

indexShard = getIndexShard(replicaShardNode);
assertFalse(indexShard.isSearchIdleSupported());
}
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.isRemoteStoreEnabled) {
logger.warn("Search idle is not supported for remote backed indices");
}
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4436,7 +4436,6 @@ public final boolean isSearchIdle() {
}

/**
*
* Returns true if this shard supports search idle.
* <p>
* Indices using Segment Replication will ignore search idle unless there are no replicas.
Expand All @@ -4445,6 +4444,11 @@ public final boolean isSearchIdle() {
* a new set of segments.
*/
public final boolean isSearchIdleSupported() {
// If the index is remote store backed, then search idle is not supported. This is to ensure that async refresh
// task continues to upload to remote store periodically.
if (isRemoteTranslogEnabled()) {
return false;
}
return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,10 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
localCheckpointOfLastCommit + 1
).translogFileGeneration;
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
if (translog.sizeInBytesByMinGen(minReferencedTranslogGeneration) < flushThreshold) {
return false;
}
/*
Expand All @@ -453,7 +453,7 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1
).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
return minReferencedTranslogGeneration < translogGenerationOfNewCommit
|| localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}
}

@Override
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minSeqNoToKeep;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2034,4 +2034,8 @@ public static String createEmptyTranslog(
writer.close();
return uuid;
}

public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
transportService,
request.targetNode(),
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime),
shard.isRemoteTranslogEnabled()
);
handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
return Tuple.tuple(handler, recoveryTarget);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
private final RetryableTransportClient retryableTransportClient;
private final RemoteSegmentFileChunkWriter fileChunkWriter;
private final boolean remoteStoreEnabled;

public RemoteRecoveryTargetHandler(
long recoveryId,
ShardId shardId,
TransportService transportService,
DiscoveryNode targetNode,
RecoverySettings recoverySettings,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
boolean remoteStoreEnabled
) {
this.transportService = transportService;
// It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler
Expand Down Expand Up @@ -111,6 +113,7 @@ public RemoteRecoveryTargetHandler(
requestSeqNoGenerator,
onSourceThrottle
);
this.remoteStoreEnabled = remoteStoreEnabled;
}

public DiscoveryNode targetNode() {
Expand All @@ -129,7 +132,13 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
if (remoteStoreEnabled) {
// If remote store is enabled, during the prepare_translog phase, translog is also downloaded on the
// target host along with incremental segments download.
retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
} else {
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,15 @@ public void onReplicationFailure(
}
}

@Override
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
assertFalse(primary.isSearchIdleSupported());
assertTrue(primary.isSearchIdle());
assertTrue(primary.scheduledRefresh());
assertFalse(primary.hasRefreshPending());
}

private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,17 @@ public void testShardIdleWithNoReplicas() throws Exception {
shards.startAll();
final IndexShard primary = shards.getPrimary();
shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
validateShardIdleWithNoReplicas(primary);
}
}

protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
}

/**
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
*/
Expand Down

0 comments on commit a75ec2e

Please sign in to comment.