Skip to content

Commit

Permalink
More robust timeout for repo analysis (elastic#101184)
Browse files Browse the repository at this point in the history
Replaces the transport-level timeout with an overall timeout on the
whole repository analysis task to ensure that all child tasks terminate
promptly.

Relates elastic#66992 Closes elastic#101182
  • Loading branch information
DaveCTurner authored Oct 23, 2023
1 parent 6b87905 commit cfb0780
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 19 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/101184.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 101184
summary: More robust timeout for repo analysis
area: Snapshot/Restore
type: bug
issues:
- 101182
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,5 @@ setup:
- match: { status: 500 }
- match: { error.type: repository_verification_exception }
- match: { error.reason: "/.*test_repo_slow..analysis.failed.*/" }
- match: { error.root_cause.0.type: receive_timeout_transport_exception }
- match: { error.root_cause.0.type: repository_verification_exception }
- match: { error.root_cause.0.reason: "/.*test_repo_slow..analysis.timed.out.after..1s.*/" }
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -347,8 +349,26 @@ public BytesReference onCompareAndExchange(BytesRegister register, BytesReferenc
}
}

private RepositoryAnalyzeAction.Response analyseRepository(RepositoryAnalyzeAction.Request request) {
return client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS);
/**
 * Checks that a repository analysis fails with a timeout when the blob store's compare-and-exchange never reports a witness value.
 * The top-level failure must say "analysis failed" and its cause must be a {@link RepositoryVerificationException} saying
 * "analysis timed out".
 */
public void testTimesOutSpinningRegisterAnalysis() {
    final RepositoryAnalyzeAction.Request analyzeRequest = new RepositoryAnalyzeAction.Request("test-repo");
    // a short, randomly chosen overall timeout of at most one second
    final TimeValue analysisTimeout = TimeValue.timeValueMillis(between(1, 1000));
    analyzeRequest.timeout(analysisTimeout);

    // with this disruption installed the blob store answers every compare-and-exchange with a missing value
    final Disruption noWitnessDisruption = new Disruption() {
        @Override
        public boolean compareAndExchangeReturnsWitness() {
            return false;
        }
    };
    blobStore.setDisruption(noWitnessDisruption);

    final var analysisFailure = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(analyzeRequest));
    assertThat(analysisFailure.getMessage(), containsString("analysis failed"));

    // the timeout itself is reported as the cause of the overall failure
    final var timeoutCause = asInstanceOf(RepositoryVerificationException.class, analysisFailure.getCause());
    assertThat(timeoutCause.getMessage(), containsString("analysis timed out"));
}

/**
 * Executes the given repository analysis request through the test client and waits up to 30 seconds for it to complete. The
 * response is discarded; anything thrown by {@code actionGet} propagates to the caller.
 *
 * @param request the analysis request to execute
 */
private void analyseRepository(RepositoryAnalyzeAction.Request request) {
    final var analysisFuture = client().execute(RepositoryAnalyzeAction.INSTANCE, request);
    analysisFuture.actionGet(30L, TimeUnit.SECONDS);
}

private static void assertPurpose(OperationPurpose purpose) {
Expand Down Expand Up @@ -464,6 +484,10 @@ default boolean createBlobOnAbort() {
return false;
}

/**
 * Controls whether the disrupted blob store performs compare-and-exchange operations on its registers at all.
 *
 * @return {@code true} (the default) to run the compare-and-exchange against the register and report its result; {@code false}
 *         to make the blob store skip the register and respond with a missing value instead
 */
default boolean compareAndExchangeReturnsWitness() {
return true;
}

/**
 * Hook invoked for each compare-and-exchange on a register. The default implementation applies the operation to the register
 * unchanged and returns whatever the register reports.
 *
 * @param register the register being operated on
 * @param expected the value the caller expects the register to hold
 * @param updated  the value to store if the expectation holds
 * @return the result of {@code register.compareAndExchange(expected, updated)}
 */
default BytesReference onCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
    final BytesReference registerResult = register.compareAndExchange(expected, updated);
    return registerResult;
}
Expand Down Expand Up @@ -637,8 +661,12 @@ public void compareAndExchangeRegister(
ActionListener<OptionalBytesReference> listener
) {
assertPurpose(purpose);
final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
listener.onResponse(OptionalBytesReference.of(disruption.onCompareAndExchange(register, expected, updated)));
if (disruption.compareAndExchangeReturnsWitness()) {
final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
listener.onResponse(OptionalBytesReference.of(disruption.onCompareAndExchange(register, expected, updated)));
} else {
listener.onResponse(OptionalBytesReference.MISSING);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.Version;
Expand All @@ -22,6 +23,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -364,6 +366,7 @@ public static class AsyncAction {
private final DiscoveryNodes discoveryNodes;
private final LongSupplier currentTimeMillisSupplier;
private final ActionListener<Response> listener;
private final SubscribableListener<Void> cancellationListener;
private final long timeoutTimeMillis;

// choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction
Expand Down Expand Up @@ -394,15 +397,24 @@ public AsyncAction(
this.discoveryNodes = discoveryNodes;
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.timeoutTimeMillis = currentTimeMillisSupplier.getAsLong() + request.getTimeout().millis();
this.listener = listener;

this.cancellationListener = new SubscribableListener<>();
this.listener = ActionListener.runBefore(listener, () -> cancellationListener.onResponse(null));

responses = new ArrayList<>(request.blobCount);
}

private void fail(Exception e) {
/**
 * Records {@code e} as the failure of this analysis unless a failure has already been recorded. When this call is the one that
 * records the failure it also requests cancellation of the task and its descendants, with the reason "task failed".
 *
 * @param e the failure to record
 * @return {@code true} if {@code e} was recorded as the first failure, {@code false} if a failure was already recorded
 */
private boolean setFirstFailure(Exception e) {
    final boolean isFirstFailure = failure.compareAndSet(null, e);
    if (isFirstFailure == false) {
        // somebody else already recorded a failure, so cancellation has already been requested
        return false;
    }
    transportService.getTaskManager().cancelTaskAndDescendants(task, "task failed", false, ActionListener.noop());
    return true;
}

private void fail(Exception e) {
if (setFirstFailure(e) == false) {
if (innerFailures.tryAcquire()) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof TaskCancelledException || cause instanceof ReceiveTimeoutTransportException) {
Expand All @@ -424,24 +436,34 @@ private boolean isRunning() {
}

if (task.isCancelled()) {
failure.compareAndSet(null, new RepositoryVerificationException(request.repositoryName, "verification cancelled"));
setFirstFailure(new RepositoryVerificationException(request.repositoryName, "verification cancelled"));
// if this CAS failed then we're failing for some other reason, nbd; also if the task is cancelled then its descendants are
// also cancelled, so no further action is needed either way.
return false;
}

if (timeoutTimeMillis < currentTimeMillisSupplier.getAsLong()) {
if (failure.compareAndSet(
null,
new RepositoryVerificationException(request.repositoryName, "analysis timed out after [" + request.getTimeout() + "]")
)) {
transportService.getTaskManager().cancelTaskAndDescendants(task, "timed out", false, ActionListener.noop());
}
// if this CAS failed then we're already failing for some other reason, nbd
return false;
return true;
}

private class CheckForCancelListener implements ActionListener<Void> {
/**
 * Called when the listener this is subscribed to completes successfully. Intentionally does nothing.
 */
@Override
public void onResponse(Void unused) {
// task complete, nothing to do: no timeout needs to be reported
}

return true;
/**
 * Called when the listener this is subscribed to fails, which is asserted to happen only with an
 * {@link ElasticsearchTimeoutException}. If the analysis is still running, records an "analysis timed out" failure as the
 * first failure.
 *
 * @param e the failure; asserted to be an {@link ElasticsearchTimeoutException}
 */
@Override
public void onFailure(Exception e) {
    assert e instanceof ElasticsearchTimeoutException : e;
    if (isRunning() == false) {
        // already failed or cancelled, so there is no timeout to report
        return;
    }
    final String timeoutMessage = "analysis timed out after [" + request.getTimeout() + "]";
    final var timeoutFailure = new RepositoryVerificationException(request.repositoryName, timeoutMessage);
    // if this CAS fails then we're already failing for some other reason, nbd
    setFirstFailure(timeoutFailure);
}
}

public void run() {
Expand All @@ -450,6 +472,9 @@ public void run() {

logger.info("running analysis of repository [{}] using path [{}]", request.getRepositoryName(), blobPath);

cancellationListener.addTimeout(request.getTimeout(), repository.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
cancellationListener.addListener(new CheckForCancelListener());

final Random random = new Random(request.getSeed());
final List<DiscoveryNode> nodes = getSnapshotNodes(discoveryNodes);

Expand Down Expand Up @@ -536,7 +561,7 @@ private void runBlobAnalysis(Releasable ref, final BlobAnalyzeAction.Request req
BlobAnalyzeAction.NAME,
request,
task,
TransportRequestOptions.timeout(TimeValue.timeValueMillis(timeoutTimeMillis - currentTimeMillisSupplier.getAsLong())),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(BlobAnalyzeAction.Response response) {
Expand Down

0 comments on commit cfb0780

Please sign in to comment.