Skip to content

Commit

Permalink
Remote reindex: Add support for configurable retry mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala committed Mar 8, 2024
1 parent 11836d0 commit 990457c
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public List<Setting<?>> getSettings() {
final List<Setting<?>> settings = new ArrayList<>();
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT);
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -85,6 +87,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.synchronizedList;
import static java.util.Objects.requireNonNull;
import static org.opensearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.opensearch.index.VersionType.INTERNAL;

public class Reindexer {
Expand Down Expand Up @@ -141,7 +144,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
task,
logger,
// Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging.
Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())),
assigningClient,
threadPool,
scriptService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.ScriptService;
Expand All @@ -51,6 +52,7 @@

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand All @@ -71,11 +73,29 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
Function.identity(),
Property.NodeScope
);

/**
 * Initial backoff delay used when retrying requests made by a reindex-from-remote operation.
 * <p>
 * Registered under the key {@code reindex.remote.retry.initial_backoff} with a default of 500ms
 * and an accepted range of 50ms to 5000ms. The setting is declared dynamic and node-scoped.
 * The value is copied onto the request in {@code doExecute} only when the request carries
 * remote info.
 */
public static final Setting<TimeValue> REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting(
"reindex.remote.retry.initial_backoff",
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(50),
TimeValue.timeValueMillis(5000),
Property.Dynamic,
Property.NodeScope
);

/**
 * Maximum number of retries allowed for requests made by a reindex-from-remote operation.
 * <p>
 * Registered under the key {@code reindex.remote.retry.max_count} with a default of 15 and an
 * accepted range of 1 to 100. The setting is declared dynamic and node-scoped. The value is
 * copied onto the request in {@code doExecute} only when the request carries remote info.
 */
public static final Setting<Integer> REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting(
    "reindex.remote.retry.max_count",
    15,
    1,
    100,
    Property.Dynamic,
    Property.NodeScope
);

public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();

private final ReindexValidator reindexValidator;
private final Reindexer reindexer;

private final ClusterService clusterService;

@Inject
public TransportReindexAction(
Settings settings,
Expand All @@ -92,10 +112,16 @@ public TransportReindexAction(
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
if (request.getRemoteInfo() != null) {
request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT));
request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF));
}

reindexValidator.initialValidation(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.reindex.RejectAwareActionListener;
import org.opensearch.index.reindex.RetryListener;
import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -99,21 +102,29 @@ public RemoteScrollableHitSource(

@Override
protected void doStart(RejectAwareActionListener<Response> searchListener) {
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
logger.debug("Starting remote reindex for {}", Arrays.toString(searchRequest.indices()));
lookupRemoteVersion(RejectAwareActionListener.wrap(version -> {
remoteVersion = version;
execute(
logger.debug("Starting initial search");
executeWithRetries(
RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
RESPONSE_PARSER,
RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r))
);
}));
// Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll) level.
}, searchListener::onFailure, searchListener::onFailure));
}

/**
 * Looks up the version of the remote cluster by sending a {@code GET} request to the empty
 * (root) path and parsing the reply with {@code MAIN_ACTION_PARSER}.
 * <p>
 * The result is stored by the caller as {@code remoteVersion} and passed to
 * {@code RemoteRequestBuilders} for every later request, so it has to be the version the remote
 * actually reports rather than an assumed one.
 *
 * @param listener receives the parsed remote version, or the failure/rejection of the request
 */
void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
    logger.debug("Checking version for remote domain");
    // This is the first call to the remote cluster. It goes through the plain execute() path
    // (no retries) so that an unreachable or misconfigured remote fails fast and the caller gets
    // an immediate response instead of waiting through the whole backoff schedule.
    execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener);
}

private void onStartResponse(RejectAwareActionListener<Response> searchListener, Response response) {
logger.debug("On initial search response");
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener);
Expand All @@ -124,12 +135,14 @@ private void onStartResponse(RejectAwareActionListener<Response> searchListener,

/**
 * Requests the next batch of hits for an open scroll on the remote cluster.
 *
 * @param scrollId       id of the scroll to continue
 * @param extraKeepAlive additional time added to the search request's configured scroll keep-alive
 * @param searchListener listener notified with the scroll response or the failure
 */
@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
    logger.debug("Starting next scroll call");
    TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
    // Send the scroll request exactly once, through the retrying path. Calling the plain execute()
    // as well would send the request twice and notify the listener twice.
    executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
logger.debug("Clearing the scrollID {}", scrollId);
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
Expand Down Expand Up @@ -180,17 +193,31 @@ protected void cleanup(Runnable onCompletion) {
});
}

/**
 * Executes {@code request} against the remote cluster with a {@link RetryListener} in front of
 * the caller's listener.
 * <p>
 * The retry listener is built from this source's {@code logger}, {@code threadPool} and
 * {@code backoffPolicy}. Its retry handler logs the attempt and re-issues the same request with
 * the same parser.
 *
 * @param request       the HTTP request to send
 * @param parser        converts the response body into a {@code Response}
 * @param childListener delegate listener handed to the {@link RetryListener}
 */
private void executeWithRetries(
    Request request,
    BiFunction<XContentParser, MediaType, Response> parser,
    RejectAwareActionListener<Response> childListener
) {
    RetryListener retryingListener = new RetryListener(logger, threadPool, backoffPolicy, nextAttemptListener -> {
        logger.info("Retrying execute request");
        execute(request, parser, nextAttemptListener);
    }, childListener);
    execute(request, parser, retryingListener);
}

private <T> void execute(
Request request,
BiFunction<XContentParser, MediaType, T> parser,
RejectAwareActionListener<? super T> listener
) {
logger.debug("Executing http request to remote cluster");
// Preserve the thread context so headers survive after the call
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
try {
client.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
logger.debug("Successfully got response from the remote");
// Restore the thread context to get the precious headers
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
Expand All @@ -205,7 +232,7 @@ public void onSuccess(org.opensearch.client.Response response) {
}
if (mediaType == null) {
try {
logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
throw new OpenSearchException(
"Response didn't include supported Content-Type, remote is likely not an OpenSearch instance"
);
Expand Down Expand Up @@ -237,22 +264,27 @@ public void onSuccess(org.opensearch.client.Response response) {
public void onFailure(Exception e) {
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
logger.debug("Received response failure {}", e.getMessage());
if (e instanceof ResponseException) {
ResponseException re = (ResponseException) e;
int statusCode = re.getResponse().getStatusLine().getStatusCode();
e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re);
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) {
// retry all 5xx & 429s.
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode || statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
listener.onRejection(e);
return;
}
} else if (e instanceof ConnectException) {
listener.onRejection(e);
return;
} else if (e instanceof ContentTooLongException) {
e = new IllegalArgumentException(
"Remote responded with a chunk that was too large. Use a smaller batch size.",
e
);
}
listener.onFailure(e);
}
listener.onFailure(e);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
*
* @opensearch.internal
*/
class RetryListener implements RejectAwareActionListener<ScrollableHitSource.Response> {
public class RetryListener implements RejectAwareActionListener<ScrollableHitSource.Response> {
private final Logger logger;
private final Iterator<TimeValue> retries;
private final ThreadPool threadPool;
private final Consumer<RejectAwareActionListener<ScrollableHitSource.Response>> retryScrollHandler;
private final ActionListener<ScrollableHitSource.Response> delegate;
private int retryCount = 0;

RetryListener(
public RetryListener(
Logger logger,
ThreadPool threadPool,
BackoffPolicy backoffPolicy,
Expand Down Expand Up @@ -84,7 +84,7 @@ public void onRejection(Exception e) {
if (retries.hasNext()) {
retryCount += 1;
TimeValue delay = retries.next();
logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
logger.info(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay, e.getMessage()));
schedule(() -> retryScrollHandler.accept(this), delay);
} else {
logger.warn(() -> new ParameterizedMessage("giving up on search because we retried [{}] times without success", retryCount), e);
Expand Down

0 comments on commit 990457c

Please sign in to comment.