Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Remote Store] Fix relocation failure due to transport receive timeout #10788

Merged
merged 1 commit into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -509,4 +509,27 @@ public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Excepti
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}

public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException {
internalCluster().startClusterManagerOnlyNode();
String primaryShardNode = internalCluster().startDataOnlyNodes(1).get(0);

createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen(INDEX_NAME);
IndexShard indexShard = getIndexShard(primaryShardNode);
assertFalse(indexShard.isSearchIdleSupported());

String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
ensureGreen(INDEX_NAME);
assertFalse(indexShard.isSearchIdleSupported());

indexShard = getIndexShard(replicaShardNode);
assertFalse(indexShard.isSearchIdleSupported());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.isRemoteStoreEnabled) {
logger.warn("Search idle is not supported for remote backed indices");
}
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4436,7 +4436,6 @@ public final boolean isSearchIdle() {
}

/**
*
* Returns true if this shard supports search idle.
* <p>
* Indices using Segment Replication will ignore search idle unless there are no replicas.
Expand All @@ -4445,6 +4444,11 @@ public final boolean isSearchIdle() {
* a new set of segments.
*/
public final boolean isSearchIdleSupported() {
// If the index is remote store backed, then search idle is not supported. This is to ensure that async refresh
// task continues to upload to remote store periodically.
if (isRemoteTranslogEnabled()) {
return false;
}
return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,10 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
localCheckpointOfLastCommit + 1
).translogFileGeneration;
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
if (translog.sizeInBytesByMinGen(minReferencedTranslogGeneration) < flushThreshold) {
return false;
}
/*
Expand All @@ -453,7 +453,7 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1
).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
return minReferencedTranslogGeneration < translogGenerationOfNewCommit
|| localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}
}

@Override
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minSeqNoToKeep;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2034,4 +2034,8 @@ public static String createEmptyTranslog(
writer.close();
return uuid;
}

public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
transportService,
request.targetNode(),
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime),
shard.isRemoteTranslogEnabled()
);
handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
return Tuple.tuple(handler, recoveryTarget);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
private final RetryableTransportClient retryableTransportClient;
private final RemoteSegmentFileChunkWriter fileChunkWriter;
private final boolean remoteStoreEnabled;

public RemoteRecoveryTargetHandler(
long recoveryId,
ShardId shardId,
TransportService transportService,
DiscoveryNode targetNode,
RecoverySettings recoverySettings,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
boolean remoteStoreEnabled
) {
this.transportService = transportService;
// It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler
Expand Down Expand Up @@ -111,6 +113,7 @@ public RemoteRecoveryTargetHandler(
requestSeqNoGenerator,
onSourceThrottle
);
this.remoteStoreEnabled = remoteStoreEnabled;
}

public DiscoveryNode targetNode() {
Expand All @@ -129,7 +132,13 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
if (remoteStoreEnabled) {
// If remote store is enabled, during the prepare_translog phase, translog is also downloaded on the
// target host along with incremental segments download.
retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
} else {
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,15 @@ public void onReplicationFailure(
}
}

@Override
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
assertFalse(primary.isSearchIdleSupported());
assertTrue(primary.isSearchIdle());
assertTrue(primary.scheduledRefresh());
assertFalse(primary.hasRefreshPending());
}

private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,17 @@ public void testShardIdleWithNoReplicas() throws Exception {
shards.startAll();
final IndexShard primary = shards.getPrimary();
shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
validateShardIdleWithNoReplicas(primary);
}
}

protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
}

/**
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
*/
Expand Down