
Allow AsyncTasks to indicate if they should not be scheduled during a shutdown #10860

Closed · wants to merge 11 commits
CHANGELOG.md (1 addition, 0 deletions)

@@ -61,6 +61,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))
- Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/))
- Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558))
- Silently ignore rejections when threadpools are terminating ([#10860](https://github.com/opensearch-project/OpenSearch/pull/10860))


### Deprecated
org/opensearch/common/util/concurrent/OpenSearchAbortPolicy.java

@@ -32,6 +32,8 @@

package org.opensearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;

@@ -44,6 +46,8 @@
* @opensearch.internal
*/
public class OpenSearchAbortPolicy implements XRejectedExecutionHandler {
private static final Logger LOG = LogManager.getLogger(OpenSearchAbortPolicy.class);

private final CounterMetric rejected = new CounterMetric();

@Override
@@ -64,6 +68,10 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
rejected.inc();
if (executor.isTerminating() || executor.isTerminated() || executor.isShutdown()) {
@reta (Collaborator) commented on the line above (Oct 23, 2023):
@peternied we should not do that: the listeners are called on rejection to notify the caller that the submission failed. With this change that won't happen anymore, leaving some callbacks in a dangling state.
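
A minimal, JDK-only sketch of the contract being described here (a stand-in, not OpenSearch's actual AbstractRunnable plumbing): the exception thrown by the rejection handler is the only signal that lets the submitter run its failure callback.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionCallbackSketch {
    // Stand-in for OpenSearch's AbstractRunnable, which carries an
    // onRejection(Exception) callback for cleanup on failed submission.
    interface RejectionAware extends Runnable {
        void onRejection(Exception e);
    }

    static void submit(ThreadPoolExecutor executor, RejectionAware task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            // Reached only because the rejection handler threw. If the handler
            // swallows the rejection instead, this cleanup never runs.
            task.onRejection(e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        executor.shutdown(); // simulate a node going down
        submit(executor, new RejectionAware() {
            @Override
            public void run() { /* never runs; the pool is shut down */ }

            @Override
            public void onRejection(Exception e) {
                // e.g. flush/close a trace span, complete a listener
                System.out.println("cleanup callback ran: " + e.getMessage());
            }
        });
    }
}
```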

@peternied (Member, Author) replied:
@reta Thanks for the feedback. I picked an ugly issue to attempt to root-cause and fix in the flaky-test space, so I might be making assumptions that are not well founded.

I understand that from the standpoint of programming purity this is against the conventions of executors; however, aren't the only cases where we shut down executors the ones where we are shutting down nodes? Is there a better way to determine that the cluster is going down, so these sources of failures can be ignored?

My goal with this change is to stabilize the test infrastructure. Assuming this change passes CI, don't we have sufficient coverage to accept it?

@reta (Collaborator) replied on Oct 23, 2023:
> I understand that from the standpoint of programming purity this is against the conventions of executors; however, aren't the only cases where we shut down executors the ones where we are shutting down nodes?

It may be the case, but even then we need a clean shutdown. Tracing comes to mind immediately as an example: shutting down the node should gracefully flush the data to the collector, but with this change the trace spans won't be flushed, since they will be left in a dangling state.

> My goal with this change is to stabilize the test infrastructure. Assuming this change passes CI, don't we have sufficient coverage to accept it?

We should have coverage, but we may well end up in a flaky state. This is not really about coverage, though - we break the contract by not calling the callbacks when we should.

@peternied (Member, Author) replied:
To revisit:

> Is there a better way to determine that the cluster is going down, so these sources of failures can be ignored?

I’ve got two ways forward that I’ll pursue:

  1. The stale state detection task in seg-rep is not useful while a node is shutting down; maybe I can dequeue or swallow the scheduling from that system.

  2. Failing that, I’m going to see if there is a way to keep honoring the contract when there are callbacks, but not in other scenarios (a sketch of this idea follows below).
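
A hypothetical sketch of that second path, applied to the OpenSearchAbortPolicy change above (this is not the PR's code; it assumes the callback-carrying tasks implement OpenSearch's AbstractRunnable):

```java
// Hypothetical sketch of path 2, not this PR's diff; it slots into the
// OpenSearchAbortPolicy class shown above. Swallow shutdown-time rejections
// only for tasks with no rejection callback to honor.
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    rejected.inc();
    final boolean poolShuttingDown = executor.isTerminating() || executor.isTerminated() || executor.isShutdown();
    if (poolShuttingDown && !(r instanceof AbstractRunnable)) {
        // No onRejection() listener is left dangling, so dropping is safer here.
        LOG.warn("Silently rejecting " + r + " due to shutdown on " + executor);
        return;
    }
    // Live pool, or a task with callbacks: throw so onRejection() fires upstream.
    throw new OpenSearchRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}
```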

@reta (Collaborator) replied on Oct 23, 2023:
> The stale state detection task in seg-rep is not useful while a node is shutting down; maybe I can dequeue or swallow the scheduling from that system.

In some places there are lifecycle checks; when a node goes down, the lifecycle check may not even proceed with the tasks. (I would strongly advocate not altering the pool behaviour - it is a low-level tool.)
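
The lifecycle-check pattern described here might look roughly like the following sketch; Lifecycle#stoppedOrClosed() is a real OpenSearch check, but the field wiring, the sweep task, and the 30-second interval are illustrative assumptions:

```java
// Assumed fields on the owning component (illustrative):
// private final Lifecycle lifecycle;
// private final ThreadPool threadPool;

void scheduleNextSweep(TimeValue interval) {
    if (lifecycle.stoppedOrClosed()) {
        return; // node is stopping or closed; stop generating work for the pool
    }
    threadPool.schedule(this::sweep, interval, ThreadPool.Names.GENERIC);
}

private void sweep() {
    // ... detect stale segment-replication state ...
    scheduleNextSweep(TimeValue.timeValueSeconds(30));
}
```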

@peternied (Member, Author) replied:
I believe I found a way forward using the first path, which is much cleaner in my mind; I'm not 100% sure how I can validate it other than getting a number of test runs in. Let me know what you think of this. (A sketch of what this could look like follows the diff below.)

LOG.warn("Silently rejecting " + r + " due to shutdown on " + executor);
return;
}
throw new OpenSearchRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}
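
The first path - letting the periodic task itself decline to reschedule once shutdown starts - could be sketched with plain JDK types like this (hedged: the PR's actual AsyncTask classes and fields may differ):

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hedged sketch of path 1, not this PR's diff: the task stops rescheduling
// itself during shutdown, so the abort policy never sees it at that point.
class StaleStateTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final long intervalMillis;

    StaleStateTask(ScheduledExecutorService scheduler, long intervalMillis) {
        this.scheduler = scheduler;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        try {
            // ... stale state detection work ...
        } finally {
            reschedule();
        }
    }

    void reschedule() {
        if (scheduler.isShutdown()) {
            return; // node is going down; don't queue another round
        }
        try {
            scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // Shutdown raced with the isShutdown() check; nothing to clean up.
        }
    }
}
```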

org/opensearch/common/util/concurrent/OpenSearchExecutorsTests.java

@@ -32,11 +32,10 @@

package org.opensearch.common.util.concurrent;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.test.OpenSearchTestCase;
import org.hamcrest.Matcher;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
@@ -45,10 +44,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import org.hamcrest.Matcher;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.test.OpenSearchTestCase;

/**
* Tests for OpenSearchExecutors and its components like OpenSearchAbortPolicy.
@@ -344,30 +344,14 @@ public String toString() {
latch.countDown();
terminate(executor);
}
try {
executor.execute(new Runnable() {
@Override
public void run() {
// Doesn't matter is going to be rejected
}

@Override
public String toString() {
return "dummy runnable";
}
});
fail("Didn't get a rejection when we expected one.");
} catch (OpenSearchRejectedExecutionException e) {
assertTrue("Thread pool not registering as terminated when it is", e.isExecutorShutdown());
String message = e.getMessage();
assertThat(message, containsString("of dummy runnable"));
assertThat(message, containsString("on OpenSearchThreadPoolExecutor[name = " + getName()));
assertThat(message, containsString("queue capacity = " + queue));
assertThat(message, containsString("[Terminated"));
assertThat(message, containsString("active threads = 0"));
assertThat(message, containsString("queued tasks = 0"));
assertThat(message, containsString("completed tasks = " + actions));
}
final AtomicBoolean taskCompleted = new AtomicBoolean();
final long beforeCompletedTaskCount = executor.getCompletedTaskCount();
executor.execute(() -> taskCompleted.set(true));
executor.execute(() -> taskCompleted.set(true));

assertThat("All tasks were rejected, so the task completed flag was never set", taskCompleted.get(), equalTo(false));
assertThat("No additional completions recorded for rejected tasks", executor.getCompletedTaskCount(), equalTo(beforeCompletedTaskCount));
}

public void testInheritContext() throws InterruptedException {