From 64539d8ad7fe299e867ffb41b30513f8cbe50378 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Wed, 18 Oct 2023 23:23:10 +0530 Subject: [PATCH 01/14] fix change log conflicts (#10713) Signed-off-by: bansvaru --- CHANGELOG.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f4332c7847db6..0ad18b94f31b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,11 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Admission control] Add Resource usage collector service and resource usage tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890)) - [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557)) - [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) -<<<<<<< Updated upstream - [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535)) -======= - [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) ->>>>>>> Stashed changes ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 From 267bd5a84dfc62b265c053e5cd3abf31b9e01ff5 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Wed, 18 Oct 2023 23:45:36 +0530 Subject: [PATCH 02/14] Version fixes for Resource Usage Stats and FS Stats (#10712) Signed-off-by: Gaurav Bafna --- .../action/admin/cluster/node/stats/NodeStats.java | 4 ++-- .../src/main/java/org/opensearch/monitor/fs/FsInfo.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 6ce6ca40cbce4..0c8aa027e5f01 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -206,7 +206,7 @@ public NodeStats(StreamInput in) throws IOException { } else { searchPipelineStats = null; } - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { resourceUsageStats = in.readOptionalWriteable(NodesResourceUsageStats::new); } else { resourceUsageStats = null; @@ -462,7 +462,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(searchPipelineStats); } - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(resourceUsageStats); } if (out.getVersion().onOrAfter(Version.V_3_0_0)) { diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 4e2e9f280d765..8446ab0dd6166 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -335,7 +335,7 @@ public DeviceStats(StreamInput in) throws IOException { previousSectorsRead = in.readLong(); currentSectorsWritten = in.readLong(); previousSectorsWritten = in.readLong(); - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { currentReadTime = in.readLong(); previousReadTime = in.readLong(); currentWriteTime = in.readLong(); @@ -369,7 +369,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(previousSectorsRead); out.writeLong(currentSectorsWritten); out.writeLong(previousSectorsWritten); - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeLong(currentReadTime); out.writeLong(previousReadTime); out.writeLong(currentWriteTime); @@ -533,7 +533,7 @@ public IoStats(StreamInput in) throws IOException { this.totalWriteOperations = in.readLong(); this.totalReadKilobytes = in.readLong(); this.totalWriteKilobytes = in.readLong(); - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.totalReadTime = in.readLong(); this.totalWriteTime = in.readLong(); this.totalQueueSize = in.readLong(); @@ -557,7 +557,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalWriteOperations); out.writeLong(totalReadKilobytes); out.writeLong(totalWriteKilobytes); - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeLong(totalReadTime); out.writeLong(totalWriteTime); out.writeLong(totalQueueSize); From 5ec2fe9fb1b215be0da604f4f7e91a75d15f5f87 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Wed, 18 Oct 2023 12:09:01 -0700 Subject: [PATCH 03/14] Add missing entry for 2.11 release notes (#10679) Signed-off-by: Kunal Kotwani --- release-notes/opensearch.release-notes-2.11.0.md | 1 + 1 file changed, 1 insertion(+) diff --git a/release-notes/opensearch.release-notes-2.11.0.md b/release-notes/opensearch.release-notes-2.11.0.md index 7ebf1b433c7c6..040cc053469ed 100644 --- a/release-notes/opensearch.release-notes-2.11.0.md +++ b/release-notes/opensearch.release-notes-2.11.0.md @@ -5,6 +5,7 @@ ### Added - Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386)) - Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) +- Add parallel file download support for remote store based replication ([#8596](https://github.com/opensearch-project/OpenSearch/pull/8596)) - Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694)) - [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666)) - Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131)) From a3c1d505903361a757945400c440aeac9e7ad973 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Wed, 18 Oct 2023 18:53:01 -0700 Subject: [PATCH 04/14] Add @abbashus and @adnapibar back to CODEOWNERS (#10681) --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 4fa118e8486f1..8076adcf00ca9 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @reta @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @gbbafna @setiah @kartg @kotwanikunal @mch2 @nknize @owaiskazi19 @peternied @Rishikesh1159 @ryanbogan @saratvemulapalli @shwetathareja @dreamer-89 @tlfeng @VachaShah @dbwiddis @sachinpkale @sohami @msfroh +* @abbashus @adnapibar @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @dreamer-89 @gbbafna @kartg @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @ryanbogan @sachinpkale @saratvemulapalli @setiah @shwetathareja @sohami @tlfeng @VachaShah From 7936f94ef6692516c58f4c46a8a3747da9cd9f1d Mon Sep 17 00:00:00 2001 From: Vikas Bansal <43470111+vikasvb90@users.noreply.github.com> Date: Thu, 19 Oct 2023 08:40:13 +0530 Subject: [PATCH 05/14] Added close on buffered stream in s3 async upload for additional cleanup (#10710) Signed-off-by: vikasvb90 --- .../s3/async/AsyncPartsHandler.java | 44 +++++++++++------- .../s3/async/AsyncTransferManager.java | 20 +++++--- .../s3/async/AsyncTransferManagerTests.java | 46 ++++++++++++++----- 3 files changed, 75 insertions(+), 35 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index 86bb70e5a40a2..6007d9f9c8a1c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -29,6 +29,7 @@ import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -140,28 +141,39 @@ private static void uploadPart( ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH ? priorityExecutorService : executorService; + // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered + // data can be retried instead of retrying whole file by the application. + InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); CompletableFuture uploadPartResponseFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.uploadPart( uploadPartRequest, - AsyncRequestBody.fromInputStream( - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. - new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)), - inputStreamContainer.getContentLength(), - streamReadExecutor - ) + AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) ) ); - CompletableFuture convertFuture = uploadPartResponseFuture.thenApply( - uploadPartResponse -> convertUploadPartResponse( - completedParts, - inputStreamContainers, - uploadPartResponse, - partNumber, - uploadRequest.doRemoteDataIntegrityCheck() - ) - ); + CompletableFuture convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> { + try { + inputStream.close(); + } catch (IOException ex) { + log.error( + () -> new ParameterizedMessage( + "Failed to close stream while uploading a part of idx {} and file {}.", + uploadPartRequest.partNumber(), + uploadPartRequest.key() + ), + ex + ); + } + }) + .thenApply( + uploadPartResponse -> convertUploadPartResponse( + completedParts, + inputStreamContainers, + uploadPartResponse, + partNumber, + uploadRequest.doRemoteDataIntegrityCheck() + ) + ); futures.add(convertFuture); CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index db04636b89d50..a52745e33073e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -40,6 +40,7 @@ import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Base64; import java.util.List; @@ -310,17 +311,22 @@ private void uploadInOneChunk( ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH ? priorityExecutorService : executorService; + // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered + // data can be retried instead of retrying whole file by the application. + InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); CompletableFuture putObjectFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.putObject( putObjectRequestBuilder.build(), - AsyncRequestBody.fromInputStream( - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. - new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)), - inputStreamContainer.getContentLength(), - streamReadExecutor - ) + AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) ).handle((resp, throwable) -> { + try { + inputStream.close(); + } catch (IOException e) { + log.error( + () -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()), + e + ); + } if (throwable != null) { Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class); if (unwrappedThrowable != null) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 607453986ab16..97a746cdeed93 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -37,9 +37,14 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -71,17 +76,16 @@ public void testOneChunkUpload() { putObjectResponseCompletableFuture ); + AtomicReference streamRef = new AtomicReference<>(); CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing }, false, null), - new StreamContext( - (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), - ByteSizeUnit.MB.toBytes(1), - ByteSizeUnit.MB.toBytes(1), - 1 - ), + new StreamContext((partIdx, partSize, position) -> { + streamRef.set(new ZeroInputStream(partSize)); + return new InputStreamContainer(streamRef.get(), partSize, position); + }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1), new StatsMetricPublisher() ); @@ -92,6 +96,14 @@ public void testOneChunkUpload() { } verify(s3AsyncClient, times(1)).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)); + + boolean closeError = false; + try { + streamRef.get().available(); + } catch (IOException e) { + closeError = e.getMessage().equals("Stream closed"); + } + assertTrue("InputStream was still open after upload", closeError); } public void testOneChunkUploadCorruption() { @@ -162,17 +174,17 @@ public void testMultipartUpload() { abortMultipartUploadResponseCompletableFuture ); + List streams = new ArrayList<>(); CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing }, true, 3376132981L), - new StreamContext( - (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), - ByteSizeUnit.MB.toBytes(1), - ByteSizeUnit.MB.toBytes(1), - 5 - ), + new StreamContext((partIdx, partSize, position) -> { + InputStream stream = new ZeroInputStream(partSize); + streams.add(stream); + return new InputStreamContainer(stream, partSize, position); + }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5), new StatsMetricPublisher() ); @@ -182,6 +194,16 @@ public void testMultipartUpload() { fail("did not expect resultFuture to fail"); } + streams.forEach(stream -> { + boolean closeError = false; + try { + stream.available(); + } catch (IOException e) { + closeError = e.getMessage().equals("Stream closed"); + } + assertTrue("InputStream was still open after upload", closeError); + }); + verify(s3AsyncClient, times(1)).createMultipartUpload(any(CreateMultipartUploadRequest.class)); verify(s3AsyncClient, times(5)).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)); verify(s3AsyncClient, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); From 1d23b88cbf2177861f3a98ab906accf39c32c766 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Thu, 19 Oct 2023 12:28:09 +0530 Subject: [PATCH 06/14] Muting s3 request stats test (#10736) Signed-off-by: Gaurav Bafna --- .../opensearch/repositories/s3/S3BlobStoreRepositoryTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index 1361f3165b653..4df30bfd2169e 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -165,6 +165,7 @@ protected Settings nodeSettings(int nodeOrdinal) { return builder.build(); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10735") @Override public void testRequestStats() throws Exception { final String repository = createRepository(randomName()); From 3899d117722e5517c3c02709cffc981bdf677fd5 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Thu, 19 Oct 2023 17:49:27 +0530 Subject: [PATCH 07/14] Changing version for repo stats blob post backport to 2.x (#10717) Signed-off-by: Gaurav Bafna --- .../opensearch/action/admin/cluster/node/stats/NodeStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 0c8aa027e5f01..e9bfa358103c8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -211,7 +211,7 @@ public NodeStats(StreamInput in) throws IOException { } else { resourceUsageStats = null; } - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { repositoriesStats = in.readOptionalWriteable(RepositoriesStats::new); } else { repositoriesStats = null; @@ -465,7 +465,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(resourceUsageStats); } - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(repositoriesStats); } } From 69f6f4e30909e215b4ca7fd55fd80cf8c4e8d3a4 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 19 Oct 2023 10:36:25 -0400 Subject: [PATCH 08/14] [AUTO] [main] Add bwc version 2.11.1. (#10648) * Add bwc version 2.11.1 Signed-off-by: GitHub * Update Version.java Signed-off-by: Andriy Redko --------- Signed-off-by: GitHub Signed-off-by: Andriy Redko Co-authored-by: opensearch-ci-bot Co-authored-by: Andriy Redko --- .ci/bwcVersions | 1 + libs/core/src/main/java/org/opensearch/Version.java | 1 + 2 files changed, 2 insertions(+) diff --git a/.ci/bwcVersions b/.ci/bwcVersions index cfaadc5ed1e5e..144a8b71fca39 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -25,4 +25,5 @@ BWC_VERSION: - "2.10.0" - "2.10.1" - "2.11.0" + - "2.11.1" - "2.12.0" diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index eef4da719994c..8d9ee73a02c1d 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -96,6 +96,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_10_0 = new Version(2100099, org.apache.lucene.util.Version.LUCENE_9_7_0); public static final Version V_2_10_1 = new Version(2100199, org.apache.lucene.util.Version.LUCENE_9_7_0); public static final Version V_2_11_0 = new Version(2110099, org.apache.lucene.util.Version.LUCENE_9_7_0); + public static final Version V_2_11_1 = new Version(2110199, org.apache.lucene.util.Version.LUCENE_9_7_0); public static final Version V_2_12_0 = new Version(2120099, org.apache.lucene.util.Version.LUCENE_9_8_0); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_8_0); public static final Version CURRENT = V_3_0_0; From da24ca756a3140f062e2c54d8fd0be88dc62e355 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 19 Oct 2023 15:25:46 -0400 Subject: [PATCH 09/14] Performance Improvement for Datetime formats (update version checks to 2.12.0) (#10754) Signed-off-by: Andriy Redko --- .../src/main/java/org/opensearch/search/DocValueFormat.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/DocValueFormat.java b/server/src/main/java/org/opensearch/search/DocValueFormat.java index 412191c57abd8..7be51643eeb7d 100644 --- a/server/src/main/java/org/opensearch/search/DocValueFormat.java +++ b/server/src/main/java/org/opensearch/search/DocValueFormat.java @@ -243,7 +243,7 @@ public DateTime(DateFormatter formatter, ZoneId timeZone, DateFieldMapper.Resolu } public DateTime(StreamInput in) throws IOException { - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.formatter = DateFormatter.forPattern(in.readString(), in.readOptionalString()); } else { this.formatter = DateFormatter.forPattern(in.readString()); @@ -265,12 +265,12 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().before(Version.V_3_0_0) && formatter.equals(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER)) { + if (out.getVersion().before(Version.V_2_12_0) && formatter.equals(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER)) { out.writeString(DateFieldMapper.LEGACY_DEFAULT_DATE_TIME_FORMATTER.pattern()); // required for backwards compatibility } else { out.writeString(formatter.pattern()); } - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalString(formatter.printPattern()); } out.writeString(timeZone.getId()); From e389a09640cf4d687ae5bbe59d36f5e15624e985 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 19 Oct 2023 13:31:01 -0700 Subject: [PATCH 10/14] Fix bug where retries within RemoteStoreRefreshListener cause infos/checkpoint mismatch (#10655) * Fix bug where retries within RemoteStoreRefreshListener cause mismatch between ReplicationCheckpoint and uploaded SegmentInfos. Retries within RemoteStoreRefreshListener run outside of the refresh thread. This means that concurrent refreshes may occur during syncSegments execution updating the on-reader SegmentInfos. A shard's latest ReplicationCheckpoint is computed and set in a refresh listener, but it is not guaranteed the listener has run before the retry fetches the infos or checkpoint independently. This fix ensures the listener recomputes the checkpoint while fetching the SegmentInfos. This change also ensures that we only recompute the checkpoint when necessary because it comes with an IO cost to compute StoreFileMetadata. Signed-off-by: Marc Handalian Update refresh listener to recompute checkpoint from latest infos snapshot. Signed-off-by: Marc Handalian Fix broken test case by comparing segments gen Signed-off-by: Marc Handalian spotless Signed-off-by: Marc Handalian Fix RemoteStoreRefreshListener tests Signed-off-by: Marc Handalian * add extra log Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian --- .../opensearch/index/shard/IndexShard.java | 67 +++++++++++-------- .../shard/RemoteStoreRefreshListener.java | 6 +- .../RemoteStoreRefreshListenerTests.java | 4 +- .../SegmentReplicationIndexShardTests.java | 27 ++++++++ 4 files changed, 70 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9489c7d7fc1dd..5ebfd3863a6cf 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1608,8 +1608,11 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } /** - * Compute and return the latest ReplicationCheckpoint for a particular shard. - * @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices + * return the most recently computed ReplicationCheckpoint for a particular shard. + * The checkpoint is updated inside a refresh listener and may lag behind the SegmentInfos on the reader. + * To guarantee the checkpoint is upto date with the latest on-reader infos, use `getLatestSegmentInfosAndCheckpoint` instead. + * + * @return {@link ReplicationCheckpoint} - The most recently computed ReplicationCheckpoint. */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { return replicationTracker.getLatestReplicationCheckpoint(); @@ -1628,34 +1631,12 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { public Tuple, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() { assert indexSettings.isSegRepEnabled(); - Tuple, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>( - new GatedCloseable<>(null, () -> {}), - getLatestReplicationCheckpoint() - ); - - if (getEngineOrNull() == null) { - return nullSegmentInfosEmptyCheckpoint; - } // do not close the snapshot - caller will close it. GatedCloseable snapshot = null; try { snapshot = getSegmentInfosSnapshot(); - if (snapshot.get() != null) { - SegmentInfos segmentInfos = snapshot.get(); - final Map metadataMap = store.getSegmentMetadataMap(segmentInfos); - return new Tuple<>( - snapshot, - new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - segmentInfos.getVersion(), - metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(), - getEngine().config().getCodec().getName(), - metadataMap - ) - ); - } + final SegmentInfos segmentInfos = snapshot.get(); + return new Tuple<>(snapshot, computeReplicationCheckpoint(segmentInfos)); } catch (IOException | AlreadyClosedException e) { logger.error("Error Fetching SegmentInfos and latest checkpoint", e); if (snapshot != null) { @@ -1666,7 +1647,39 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } } } - return nullSegmentInfosEmptyCheckpoint; + return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint()); + } + + /** + * Compute the latest {@link ReplicationCheckpoint} from a SegmentInfos. + * This function fetches a metadata snapshot from the store that comes with an IO cost. + * We will reuse the existing stored checkpoint if it is at the same SI version. + * + * @param segmentInfos {@link SegmentInfos} infos to use to compute. + * @return {@link ReplicationCheckpoint} Checkpoint computed from the infos. + * @throws IOException When there is an error computing segment metadata from the store. + */ + ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) throws IOException { + if (segmentInfos == null) { + return ReplicationCheckpoint.empty(shardId); + } + final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint(); + if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion() + && latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) { + return latestReplicationCheckpoint; + } + final Map metadataMap = store.getSegmentMetadataMap(segmentInfos); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + segmentInfos.getVersion(), + metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(), + getEngine().config().getCodec().getName(), + metadataMap + ); + logger.trace("Recomputed ReplicationCheckpoint for shard {}", checkpoint); + return checkpoint; } /** diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 698e61f6f7a09..c650edc31da8d 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -181,7 +181,6 @@ private boolean syncSegments() { // in the remote store. return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine); } - ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); beforeSegmentsSync(); long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); @@ -199,10 +198,7 @@ private boolean syncSegments() { try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); - assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: " - + segmentInfos.getGeneration() - + " does not match metadata generation: " - + checkpoint.getSegmentsGen(); + final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos); // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 5a13f57db2c87..51814283c5eb3 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -520,8 +520,8 @@ private Tuple mockIn if (counter.incrementAndGet() <= succeedOnAttempt) { throw new RuntimeException("Inducing failure in upload"); } - return indexShard.getLatestSegmentInfosAndCheckpoint(); - })).when(shard).getLatestSegmentInfosAndCheckpoint(); + return indexShard.getLatestReplicationCheckpoint(); + })).when(shard).computeReplicationCheckpoint(any()); doAnswer(invocation -> { if (Objects.nonNull(successLatch)) { diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 52f28aead533d..eab38bfe5c64d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -925,6 +925,33 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { } } + public void testReuseReplicationCheckpointWhenLatestInfosIsUnChanged() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { + final IndexShard primaryShard = shards.getPrimary(); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(10); + final ReplicationCheckpoint latestReplicationCheckpoint = primaryShard.getLatestReplicationCheckpoint(); + try (GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(segmentInfosSnapshot.get())); + } + final Tuple, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = primaryShard + .getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable closeable = latestSegmentInfosAndCheckpoint.v1()) { + assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(closeable.get())); + } + } + } + + public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { + final IndexShard primaryShard = shards.getPrimary(); + assertEquals(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard.computeReplicationCheckpoint(null)); + } + } + private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) { final TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(threadPool); From 781968b2e53f3214b73d4d8e7c1baa572b334f27 Mon Sep 17 00:00:00 2001 From: Siddhant Deshmukh Date: Thu, 19 Oct 2023 14:39:29 -0700 Subject: [PATCH 11/14] Categorize search queries by type and log query shape (#10724) * Search Query Categorizor initial skeleton using QueryBuilderVisitor Signed-off-by: Siddhant Deshmukh * Integrate metrics framework, add counters and log query shape Signed-off-by: Siddhant Deshmukh * Update changelog Signed-off-by: Siddhant Deshmukh * Add level attribute to QueryBuilderVisitor and as a tag in Counters Signed-off-by: Siddhant Deshmukh * Log query shape as debug log Signed-off-by: Siddhant Deshmukh * Integrate metrics framework, refactor code and update tests Signed-off-by: Siddhant Deshmukh * Fix build Signed-off-by: Siddhant Deshmukh * Add javadocs Signed-off-by: Siddhant Deshmukh * Minor fix Signed-off-by: Siddhant Deshmukh * Spotless check changes Signed-off-by: Siddhant Deshmukh * Address comments, add agg and sort counters, add feature flag, refactoring Signed-off-by: Siddhant Deshmukh * Build fix Signed-off-by: Siddhant Deshmukh * spotless check Signed-off-by: Siddhant Deshmukh * Fix tests Signed-off-by: Siddhant Deshmukh * Dynamic feature flag with callback Signed-off-by: Siddhant Deshmukh * Minor fix Signed-off-by: Siddhant Deshmukh * Add initialization in callback Signed-off-by: Siddhant Deshmukh * Address comments Signed-off-by: Siddhant Deshmukh * Add exception handling Signed-off-by: Siddhant Deshmukh * Refactoring and renaming Signed-off-by: Siddhant Deshmukh * Minor fix Signed-off-by: Siddhant Deshmukh * Fix changelog and minor refactoring Signed-off-by: Siddhant Deshmukh * Address review comments Signed-off-by: Siddhant Deshmukh * Add unit tests Signed-off-by: Siddhant Deshmukh * Address review comments and add complex query unit test Signed-off-by: Siddhant Deshmukh * Add sort order as a tag to sort counter Signed-off-by: Siddhant Deshmukh * Address review comments Signed-off-by: Siddhant Deshmukh * Address final comments Signed-off-by: Siddhant Deshmukh * Build fix Signed-off-by: Siddhant Deshmukh * Fix build tests failure Signed-off-by: Siddhant Deshmukh * Minor fix Signed-off-by: Siddhant Deshmukh * Minor fix Signed-off-by: Siddhant Deshmukh * Empty commit Signed-off-by: Siddhant Deshmukh * Remove extra newline Signed-off-by: Michael Froh * Empty commit Signed-off-by: Siddhant Deshmukh --------- Signed-off-by: Siddhant Deshmukh Signed-off-by: Michael Froh Co-authored-by: Michael Froh --- CHANGELOG.md | 1 + .../action/search/SearchQueryCategorizer.java | 81 +++++++ .../SearchQueryCategorizingVisitor.java | 73 ++++++ .../action/search/SearchQueryCounters.java | 117 +++++++++ .../action/search/TransportSearchAction.java | 36 ++- .../common/settings/ClusterSettings.java | 1 + .../index/query/QueryShapeVisitor.java | 86 +++++++ .../search/SearchQueryCategorizerTests.java | 228 ++++++++++++++++++ .../index/query/QueryShapeVisitorTests.java | 31 +++ .../snapshots/SnapshotResiliencyTests.java | 4 +- 10 files changed, 656 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/search/SearchQueryCategorizer.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchQueryCategorizingVisitor.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchQueryCounters.java create mode 100644 server/src/main/java/org/opensearch/index/query/QueryShapeVisitor.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchQueryCategorizerTests.java create mode 100644 server/src/test/java/org/opensearch/index/query/QueryShapeVisitorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ad18b94f31b7..552c277789dd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) +- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryCategorizer.java b/server/src/main/java/org/opensearch/action/search/SearchQueryCategorizer.java new file mode 100644 index 0000000000000..9cbe2d2ffcb7d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryCategorizer.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilderVisitor; +import org.opensearch.index.query.QueryShapeVisitor; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.SortBuilder; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.List; +import java.util.ListIterator; + +/** + * Class to categorize the search queries based on the type and increment the relevant counters. + * Class also logs the query shape. + */ +final class SearchQueryCategorizer { + + private static final Logger log = LogManager.getLogger(SearchQueryCategorizer.class); + + final SearchQueryCounters searchQueryCounters; + + public SearchQueryCategorizer(MetricsRegistry metricsRegistry) { + searchQueryCounters = new SearchQueryCounters(metricsRegistry); + } + + public void categorize(SearchSourceBuilder source) { + QueryBuilder topLevelQueryBuilder = source.query(); + + logQueryShape(topLevelQueryBuilder); + incrementQueryTypeCounters(topLevelQueryBuilder); + incrementQueryAggregationCounters(source.aggregations()); + incrementQuerySortCounters(source.sorts()); + } + + private void incrementQuerySortCounters(List> sorts) { + if (sorts != null && sorts.size() > 0) { + for (ListIterator> it = sorts.listIterator(); it.hasNext();) { + SortBuilder sortBuilder = it.next(); + String sortOrder = sortBuilder.order().toString(); + searchQueryCounters.sortCounter.add(1, Tags.create().addTag("sort_order", sortOrder)); + } + } + } + + private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations) { + if (aggregations != null) { + searchQueryCounters.aggCounter.add(1); + } + } + + private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder) { + if (topLevelQueryBuilder == null) { + return; + } + QueryBuilderVisitor searchQueryVisitor = new SearchQueryCategorizingVisitor(searchQueryCounters); + topLevelQueryBuilder.visit(searchQueryVisitor); + } + + private void logQueryShape(QueryBuilder topLevelQueryBuilder) { + if (topLevelQueryBuilder == null) { + return; + } + QueryShapeVisitor shapeVisitor = new QueryShapeVisitor(); + topLevelQueryBuilder.visit(shapeVisitor); + log.debug("Query shape : {}", shapeVisitor.prettyPrintTree(" ")); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryCategorizingVisitor.java b/server/src/main/java/org/opensearch/action/search/SearchQueryCategorizingVisitor.java new file mode 100644 index 0000000000000..98f0169e69a5c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryCategorizingVisitor.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.lucene.search.BooleanClause; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchPhraseQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.MultiMatchQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilderVisitor; +import org.opensearch.index.query.QueryStringQueryBuilder; +import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.index.query.RegexpQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.query.WildcardQueryBuilder; +import org.opensearch.index.query.functionscore.FunctionScoreQueryBuilder; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * Class to visit the querybuilder tree and also track the level information. + * Increments the counters related to Search Query type. + */ +final class SearchQueryCategorizingVisitor implements QueryBuilderVisitor { + private static final String LEVEL_TAG = "level"; + private final int level; + private final SearchQueryCounters searchQueryCounters; + + public SearchQueryCategorizingVisitor(SearchQueryCounters searchQueryCounters) { + this(searchQueryCounters, 0); + } + + private SearchQueryCategorizingVisitor(SearchQueryCounters counters, int level) { + this.searchQueryCounters = counters; + this.level = level; + } + + public void accept(QueryBuilder qb) { + if (qb instanceof BoolQueryBuilder) { + searchQueryCounters.boolCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof FunctionScoreQueryBuilder) { + searchQueryCounters.functionScoreCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof MatchQueryBuilder) { + searchQueryCounters.matchCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof MatchPhraseQueryBuilder) { + searchQueryCounters.matchPhrasePrefixCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof MultiMatchQueryBuilder) { + searchQueryCounters.multiMatchCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof QueryStringQueryBuilder) { + searchQueryCounters.queryStringQueryCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof RangeQueryBuilder) { + searchQueryCounters.rangeCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof RegexpQueryBuilder) { + searchQueryCounters.regexCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof TermQueryBuilder) { + searchQueryCounters.termCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else if (qb instanceof WildcardQueryBuilder) { + searchQueryCounters.wildcardCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } else { + searchQueryCounters.otherQueryCounter.add(1, Tags.create().addTag(LEVEL_TAG, level)); + } + } + + public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) { + return new SearchQueryCategorizingVisitor(searchQueryCounters, level + 1); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryCounters.java b/server/src/main/java/org/opensearch/action/search/SearchQueryCounters.java new file mode 100644 index 0000000000000..7e0259af07701 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryCounters.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; + +/** + * Class contains all the Counters related to search query types. + */ +final class SearchQueryCounters { + private static final String UNIT = "1"; + private final MetricsRegistry metricsRegistry; + + // Counters related to Query types + public final Counter aggCounter; + public final Counter boolCounter; + public final Counter functionScoreCounter; + public final Counter matchCounter; + public final Counter matchPhrasePrefixCounter; + public final Counter multiMatchCounter; + public final Counter otherQueryCounter; + public final Counter queryStringQueryCounter; + public final Counter rangeCounter; + public final Counter regexCounter; + + public final Counter sortCounter; + public final Counter skippedCounter; + public final Counter termCounter; + public final Counter totalCounter; + public final Counter wildcardCounter; + + public SearchQueryCounters(MetricsRegistry metricsRegistry) { + this.metricsRegistry = metricsRegistry; + this.aggCounter = metricsRegistry.createCounter( + "search.query.type.agg.count", + "Counter for the number of top level agg search queries", + UNIT + ); + this.boolCounter = metricsRegistry.createCounter( + "search.query.type.bool.count", + "Counter for the number of top level and nested bool search queries", + UNIT + ); + this.functionScoreCounter = metricsRegistry.createCounter( + "search.query.type.functionscore.count", + "Counter for the number of top level and nested function score search queries", + UNIT + ); + this.matchCounter = metricsRegistry.createCounter( + "search.query.type.match.count", + "Counter for the number of top level and nested match search queries", + UNIT + ); + this.matchPhrasePrefixCounter = metricsRegistry.createCounter( + "search.query.type.matchphrase.count", + "Counter for the number of top level and nested match phrase prefix search queries", + UNIT + ); + this.multiMatchCounter = metricsRegistry.createCounter( + "search.query.type.multimatch.count", + "Counter for the number of top level and nested multi match search queries", + UNIT + ); + this.otherQueryCounter = metricsRegistry.createCounter( + "search.query.type.other.count", + "Counter for the number of top level and nested search queries that do not match any other categories", + UNIT + ); + this.queryStringQueryCounter = metricsRegistry.createCounter( + "search.query.type.querystringquery.count", + "Counter for the number of top level and nested queryStringQuery search queries", + UNIT + ); + this.rangeCounter = metricsRegistry.createCounter( + "search.query.type.range.count", + "Counter for the number of top level and nested range search queries", + UNIT + ); + this.regexCounter = metricsRegistry.createCounter( + "search.query.type.regex.count", + "Counter for the number of top level and nested regex search queries", + UNIT + ); + this.skippedCounter = metricsRegistry.createCounter( + "search.query.type.skipped.count", + "Counter for the number queries skipped due to error", + UNIT + ); + this.sortCounter = metricsRegistry.createCounter( + "search.query.type.sort.count", + "Counter for the number of top level sort search queries", + UNIT + ); + this.termCounter = metricsRegistry.createCounter( + "search.query.type.term.count", + "Counter for the number of top level and nested term search queries", + UNIT + ); + this.totalCounter = metricsRegistry.createCounter( + "search.query.type.total.count", + "Counter for the number of top level and nested search queries", + UNIT + ); + this.wildcardCounter = metricsRegistry.createCounter( + "search.query.type.wildcard.count", + "Counter for the number of top level and nested wildcard search queries", + UNIT + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 284f71bd9da62..a6fb8453af4ff 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -88,6 +88,7 @@ import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterAware; import org.opensearch.transport.RemoteClusterService; @@ -137,6 +138,13 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_QUERY_METRICS_ENABLED_SETTING = Setting.boolSetting( + "search.query.metrics.enabled", + false, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + // cluster level setting for timeout based search cancellation. If search request level parameter is present then that will take // precedence over the cluster setting value public static final String SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY = "search.cancel_after_time_interval"; @@ -177,8 +185,14 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -211,6 +226,17 @@ public TransportSearchAction( this.isRequestStatsEnabled = clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED); clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setIsRequestStatsEnabled); this.searchRequestStats = searchRequestStats; + this.metricsRegistry = metricsRegistry; + this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); + } + + private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) { + this.searchQueryMetricsEnabled = searchQueryMetricsEnabled; + if ((this.searchQueryMetricsEnabled == true) && this.searchQueryCategorizer == null) { + this.searchQueryCategorizer = new SearchQueryCategorizer(metricsRegistry); + } } private void setIsRequestStatsEnabled(boolean isRequestStatsEnabled) { @@ -489,6 +515,14 @@ private void executeRequest( return; } + if (searchQueryMetricsEnabled) { + try { + searchQueryCategorizer.categorize(searchRequest.source()); + } catch (Exception e) { + logger.error("Error while trying to categorize the query.", e); + } + } + ActionListener rewriteListener = ActionListener.wrap(source -> { if (source != searchRequest.source()) { // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 90f91dcb7c553..76883c200542e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -377,6 +377,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED, TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED, + TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/index/query/QueryShapeVisitor.java b/server/src/main/java/org/opensearch/index/query/QueryShapeVisitor.java new file mode 100644 index 0000000000000..3ba13bc7a2da4 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/query/QueryShapeVisitor.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.query; + +import org.apache.lucene.search.BooleanClause; +import org.opensearch.common.SetOnce; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * Class to traverse the QueryBuilder tree and capture the query shape + */ +public final class QueryShapeVisitor implements QueryBuilderVisitor { + private final SetOnce queryType = new SetOnce<>(); + private final Map> childVisitors = new EnumMap<>(BooleanClause.Occur.class); + + @Override + public void accept(QueryBuilder qb) { + queryType.set(qb.getName()); + } + + @Override + public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) { + // Should get called once per Occur value + if (childVisitors.containsKey(occur)) { + throw new IllegalStateException("child visitor already called for " + occur); + } + final List childVisitorList = new ArrayList<>(); + QueryBuilderVisitor childVisitorWrapper = new QueryBuilderVisitor() { + QueryShapeVisitor currentChild; + + @Override + public void accept(QueryBuilder qb) { + currentChild = new QueryShapeVisitor(); + childVisitorList.add(currentChild); + currentChild.accept(qb); + } + + @Override + public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) { + return currentChild.getChildVisitor(occur); + } + }; + childVisitors.put(occur, childVisitorList); + return childVisitorWrapper; + } + + String toJson() { + StringBuilder outputBuilder = new StringBuilder("{\"type\":\"").append(queryType.get()).append("\""); + for (Map.Entry> entry : childVisitors.entrySet()) { + outputBuilder.append(",\"").append(entry.getKey().name().toLowerCase(Locale.ROOT)).append("\"["); + boolean first = true; + for (QueryShapeVisitor child : entry.getValue()) { + if (!first) { + outputBuilder.append(","); + } + outputBuilder.append(child.toJson()); + first = false; + } + outputBuilder.append("]"); + } + outputBuilder.append("}"); + return outputBuilder.toString(); + } + + public String prettyPrintTree(String indent) { + StringBuilder outputBuilder = new StringBuilder(indent).append(queryType.get()).append("\n"); + for (Map.Entry> entry : childVisitors.entrySet()) { + outputBuilder.append(indent).append(" ").append(entry.getKey().name().toLowerCase(Locale.ROOT)).append(":\n"); + for (QueryShapeVisitor child : entry.getValue()) { + outputBuilder.append(child.prettyPrintTree(indent + " ")); + } + } + return outputBuilder.toString(); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryCategorizerTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryCategorizerTests.java new file mode 100644 index 0000000000000..a2e301143d694 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryCategorizerTests.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.BoostingQueryBuilder; +import org.opensearch.index.query.MatchNoneQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.MultiMatchQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.QueryStringQueryBuilder; +import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.index.query.RegexpQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.query.WildcardQueryBuilder; +import org.opensearch.index.query.functionscore.FunctionScoreQueryBuilder; +import org.opensearch.search.aggregations.bucket.range.RangeAggregationBuilder; +import org.opensearch.search.aggregations.bucket.terms.MultiTermsAggregationBuilder; +import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.ScoreSortBuilder; +import org.opensearch.search.sort.SortOrder; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.Arrays; + +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +public final class SearchQueryCategorizerTests extends OpenSearchTestCase { + + private MetricsRegistry metricsRegistry; + + private SearchQueryCategorizer searchQueryCategorizer; + + @Before + public void setup() { + metricsRegistry = mock(MetricsRegistry.class); + when(metricsRegistry.createCounter(any(String.class), any(String.class), any(String.class))).thenAnswer( + invocation -> mock(Counter.class) + ); + searchQueryCategorizer = new SearchQueryCategorizer(metricsRegistry); + } + + public void testAggregationsQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.aggregation( + new MultiTermsAggregationBuilder("agg1").terms( + Arrays.asList( + new MultiTermsValuesSourceConfig.Builder().setFieldName("username").build(), + new MultiTermsValuesSourceConfig.Builder().setFieldName("rating").build() + ) + ) + ); + sourceBuilder.size(0); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.aggCounter).add(eq(1.0d)); + } + + public void testBoolQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + sourceBuilder.query(new BoolQueryBuilder().must(new MatchQueryBuilder("searchText", "fox"))); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.boolCounter).add(eq(1.0d), any(Tags.class)); + Mockito.verify(searchQueryCategorizer.searchQueryCounters.matchCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testFunctionScoreQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + sourceBuilder.query(new FunctionScoreQueryBuilder(QueryBuilders.prefixQuery("text", "bro"))); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.functionScoreCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testMatchQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + sourceBuilder.query(QueryBuilders.matchQuery("tags", "php")); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.matchCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testMatchPhraseQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + sourceBuilder.query(QueryBuilders.matchPhraseQuery("tags", "php")); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.matchPhrasePrefixCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testMultiMatchQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + sourceBuilder.query(new MultiMatchQueryBuilder("foo bar", "myField")); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.multiMatchCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testOtherQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + BoostingQueryBuilder queryBuilder = new BoostingQueryBuilder( + new TermQueryBuilder("unmapped_field", "foo"), + new MatchNoneQueryBuilder() + ); + sourceBuilder.query(queryBuilder); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.otherQueryCounter, times(2)).add(eq(1.0d), any(Tags.class)); + Mockito.verify(searchQueryCategorizer.searchQueryCounters.termCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testQueryStringQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + QueryStringQueryBuilder queryBuilder = new QueryStringQueryBuilder("foo:*"); + sourceBuilder.query(queryBuilder); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.queryStringQueryCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testRangeQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date"); + rangeQuery.gte("1970-01-01"); + rangeQuery.lt("1982-01-01"); + sourceBuilder.query(rangeQuery); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.rangeCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testRegexQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(new RegexpQueryBuilder("field", "text")); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.regexCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testSortQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(QueryBuilders.matchQuery("tags", "ruby")); + sourceBuilder.sort("creationDate", SortOrder.DESC); + sourceBuilder.sort(new ScoreSortBuilder()); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.matchCounter).add(eq(1.0d), any(Tags.class)); + Mockito.verify(searchQueryCategorizer.searchQueryCounters.sortCounter, times(2)).add(eq(1.0d), any(Tags.class)); + } + + public void testTermQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + sourceBuilder.query(QueryBuilders.termQuery("field", "value2")); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.termCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testWildcardQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + sourceBuilder.query(new WildcardQueryBuilder("field", "text")); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.wildcardCounter).add(eq(1.0d), any(Tags.class)); + } + + public void testComplexQuery() { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(50); + + TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("field", "value2"); + MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("tags", "php"); + RegexpQueryBuilder regexpQueryBuilder = new RegexpQueryBuilder("field", "text"); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder().must(termQueryBuilder) + .filter(matchQueryBuilder) + .should(regexpQueryBuilder); + sourceBuilder.query(boolQueryBuilder); + sourceBuilder.aggregation(new RangeAggregationBuilder("agg1").field("num")); + + searchQueryCategorizer.categorize(sourceBuilder); + + Mockito.verify(searchQueryCategorizer.searchQueryCounters.termCounter).add(eq(1.0d), any(Tags.class)); + Mockito.verify(searchQueryCategorizer.searchQueryCounters.matchCounter).add(eq(1.0d), any(Tags.class)); + Mockito.verify(searchQueryCategorizer.searchQueryCounters.regexCounter).add(eq(1.0d), any(Tags.class)); + Mockito.verify(searchQueryCategorizer.searchQueryCounters.boolCounter).add(eq(1.0d), any(Tags.class)); + Mockito.verify(searchQueryCategorizer.searchQueryCounters.aggCounter).add(eq(1.0d)); + } +} diff --git a/server/src/test/java/org/opensearch/index/query/QueryShapeVisitorTests.java b/server/src/test/java/org/opensearch/index/query/QueryShapeVisitorTests.java new file mode 100644 index 0000000000000..18b814aec61c2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/query/QueryShapeVisitorTests.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.query; + +import org.opensearch.test.OpenSearchTestCase; + +import static org.junit.Assert.assertEquals; + +public final class QueryShapeVisitorTests extends OpenSearchTestCase { + public void testQueryShapeVisitor() { + QueryBuilder builder = new BoolQueryBuilder().must(new TermQueryBuilder("foo", "bar")) + .filter(new ConstantScoreQueryBuilder(new RangeQueryBuilder("timestamp").from("12345677").to("2345678"))) + .should( + new BoolQueryBuilder().must(new MatchQueryBuilder("text", "this is some text")) + .mustNot(new RegexpQueryBuilder("color", "red.*")) + ) + .must(new TermsQueryBuilder("genre", "action", "drama", "romance")); + QueryShapeVisitor shapeVisitor = new QueryShapeVisitor(); + builder.visit(shapeVisitor); + assertEquals( + "{\"type\":\"bool\",\"must\"[{\"type\":\"term\"},{\"type\":\"terms\"}],\"filter\"[{\"type\":\"constant_score\",\"filter\"[{\"type\":\"range\"}]}],\"should\"[{\"type\":\"bool\",\"must\"[{\"type\":\"match\"}],\"must_not\"[{\"type\":\"regexp\"}]}]}", + shapeVisitor.toJson() + ); + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 97c5d23831965..2f9f38d18a064 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -222,6 +222,7 @@ import org.opensearch.search.query.QueryPhase; import org.opensearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.disruption.DisruptableMockTransport; @@ -2302,7 +2303,8 @@ public void onFailure(final Exception e) { List.of(), client ), - null + null, + NoopMetricsRegistry.INSTANCE ) ); actions.put( From 200ad5d28a577877be530ecab507601898025c5c Mon Sep 17 00:00:00 2001 From: Ticheng Lin <51488860+ticheng-aws@users.noreply.github.com> Date: Thu, 19 Oct 2023 19:58:13 -0700 Subject: [PATCH 12/14] Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight (#10352) * Fix timer race condition in profile rewrite and create weight for concurrent segment search (#10352) Signed-off-by: Ticheng Lin * Refactor and work on the PR comments (#10352) Signed-off-by: Ticheng Lin --------- Signed-off-by: Ticheng Lin --- CHANGELOG.md | 1 + .../search/profile/query/QueryProfilerIT.java | 157 +++++++++++++++++- .../opensearch/search/profile/Profilers.java | 7 +- .../org/opensearch/search/profile/Timer.java | 12 ++ .../query/AbstractQueryProfileTree.java | 5 +- .../ConcurrentQueryProfileBreakdown.java | 27 ++- .../query/ConcurrentQueryProfiler.java | 134 +++++++++++++++ .../search/profile/query/QueryProfiler.java | 14 +- .../ConcurrentQueryProfileBreakdownTests.java | 52 ++++++ .../query/ConcurrentQueryProfilerTests.java | 36 ++++ .../profile/query/QueryProfilerTests.java | 16 +- 11 files changed, 438 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java create mode 100644 server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 552c277789dd7..5c52c43a35b8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) - Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) +- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java index 5f794d2abf878..ef73438114079 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java @@ -32,6 +32,8 @@ package org.opensearch.search.profile.query; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.tests.util.English; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; @@ -40,20 +42,23 @@ import org.opensearch.action.search.SearchType; import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import org.opensearch.search.profile.ProfileResult; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.sort.SortOrder; -import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder; import static org.hamcrest.Matchers.emptyOrNullString; import static org.hamcrest.Matchers.equalTo; @@ -61,8 +66,32 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase { + private final boolean concurrentSearchEnabled; + private static final String MAX_PREFIX = "max_"; + private static final String MIN_PREFIX = "min_"; + private static final String AVG_PREFIX = "avg_"; + private static final String TIMING_TYPE_COUNT_SUFFIX = "_count"; + + public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) { + super(settings); + this.concurrentSearchEnabled = concurrentSearchEnabled; + } -public class QueryProfilerIT extends OpenSearchIntegTestCase { + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } /** * This test simply checks to make sure nothing crashes. Test indexes 100-150 documents, @@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception { assertEquals(result.getLuceneDescription(), "field1:one"); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -271,6 +301,7 @@ public void testBool() throws Exception { assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); assertEquals(result.getProfiledChildren().size(), 2); + assertQueryProfileResult(result); // Check the children List children = result.getProfiledChildren(); @@ -282,12 +313,14 @@ public void testBool() throws Exception { assertThat(childProfile.getTime(), greaterThan(0L)); assertNotNull(childProfile.getTimeBreakdown()); assertEquals(childProfile.getProfiledChildren().size(), 0); + assertQueryProfileResult(childProfile); childProfile = children.get(1); assertEquals(childProfile.getQueryName(), "TermQuery"); assertEquals(childProfile.getLuceneDescription(), "field1:two"); assertThat(childProfile.getTime(), greaterThan(0L)); assertNotNull(childProfile.getTimeBreakdown()); + assertQueryProfileResult(childProfile); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -415,6 +450,90 @@ public void testBoosting() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); + } + + CollectorResult result = searchProfiles.getCollectorResult(); + assertThat(result.getName(), is(not(emptyOrNullString()))); + assertThat(result.getTime(), greaterThan(0L)); + } + } + } + + public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception { + createIndex("test"); + ensureGreen(); + + int numDocs = 122; + IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; i++) { + docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i); + } + + List terms = Arrays.asList("zero", "zero", "one"); + + indexRandom(true, docs); + + refresh(); + + QueryBuilder q = QueryBuilders.boostingQuery( + QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())), + QueryBuilders.termsQuery("field1", terms) + ).boost(randomFloat()).negativeBoost(randomFloat()); + logger.info("Query: {}", q); + + SearchResponse resp = client().prepareSearch() + .setQuery(q) + .setTrackTotalHits(true) + .setProfile(true) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .get(); + + assertNotNull("Profile response element should not be null", resp.getProfileResults()); + assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0)); + + for (Map.Entry shardResult : resp.getProfileResults().entrySet()) { + assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L)); + assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L)); + for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) { + List results = searchProfiles.getQueryResults(); + for (ProfileResult result : results) { + assertNotNull(result.getQueryName()); + assertNotNull(result.getLuceneDescription()); + assertThat(result.getTime(), greaterThan(0L)); + Map breakdown = result.getTimeBreakdown(); + Long maxSliceTime = result.getMaxSliceTime(); + Long minSliceTime = result.getMinSliceTime(); + Long avgSliceTime = result.getAvgSliceTime(); + if (concurrentSearchEnabled && results.get(0).equals(result)) { + assertNotNull(maxSliceTime); + assertNotNull(minSliceTime); + assertNotNull(avgSliceTime); + assertThat(breakdown.size(), equalTo(66)); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + if (queryTimingType != QueryTimingType.CREATE_WEIGHT) { + String maxTimingType = MAX_PREFIX + queryTimingType; + String minTimingType = MIN_PREFIX + queryTimingType; + String avgTimingType = AVG_PREFIX + queryTimingType; + assertNotNull(breakdown.get(maxTimingType)); + assertNotNull(breakdown.get(minTimingType)); + assertNotNull(breakdown.get(avgTimingType)); + assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX)); + } + } + } else if (concurrentSearchEnabled) { + assertThat(maxSliceTime, equalTo(0L)); + assertThat(minSliceTime, equalTo(0L)); + assertThat(avgSliceTime, equalTo(0L)); + assertThat(breakdown.size(), equalTo(27)); + } else { + assertThat(maxSliceTime, is(nullValue())); + assertThat(minSliceTime, is(nullValue())); + assertThat(avgSliceTime, is(nullValue())); + assertThat(breakdown.size(), equalTo(27)); + } } CollectorResult result = searchProfiles.getCollectorResult(); @@ -455,6 +574,7 @@ public void testDisMaxRange() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -494,6 +614,7 @@ public void testRange() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -547,6 +668,7 @@ public void testPhrase() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -579,4 +701,35 @@ public void testNoProfile() throws Exception { assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0)); } + private void assertQueryProfileResult(ProfileResult result) { + Map breakdown = result.getTimeBreakdown(); + Long maxSliceTime = result.getMaxSliceTime(); + Long minSliceTime = result.getMinSliceTime(); + Long avgSliceTime = result.getAvgSliceTime(); + if (concurrentSearchEnabled) { + assertNotNull(maxSliceTime); + assertNotNull(minSliceTime); + assertNotNull(avgSliceTime); + assertThat(breakdown.size(), equalTo(66)); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + if (queryTimingType != QueryTimingType.CREATE_WEIGHT) { + String maxTimingType = MAX_PREFIX + queryTimingType; + String minTimingType = MIN_PREFIX + queryTimingType; + String avgTimingType = AVG_PREFIX + queryTimingType; + assertNotNull(breakdown.get(maxTimingType)); + assertNotNull(breakdown.get(minTimingType)); + assertNotNull(breakdown.get(avgTimingType)); + assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX)); + } + } + } else { + assertThat(maxSliceTime, is(nullValue())); + assertThat(minSliceTime, is(nullValue())); + assertThat(avgSliceTime, is(nullValue())); + assertThat(breakdown.size(), equalTo(27)); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 8e87c7ff4acd4..68cf05c988b5b 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -35,6 +35,9 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.profile.aggregation.AggregationProfiler; import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler; +import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; +import org.opensearch.search.profile.query.ConcurrentQueryProfiler; +import org.opensearch.search.profile.query.InternalQueryProfileTree; import org.opensearch.search.profile.query.QueryProfiler; import java.util.ArrayList; @@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc /** Switch to a new profile. */ public QueryProfiler addQueryProfiler() { - QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled); + QueryProfiler profiler = isConcurrentSegmentSearchEnabled + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); queryProfilers.add(profiler); return profiler; diff --git a/server/src/main/java/org/opensearch/search/profile/Timer.java b/server/src/main/java/org/opensearch/search/profile/Timer.java index 172762cabeb6a..864c689cf7fa0 100644 --- a/server/src/main/java/org/opensearch/search/profile/Timer.java +++ b/server/src/main/java/org/opensearch/search/profile/Timer.java @@ -53,6 +53,18 @@ public class Timer { private boolean doTiming; private long timing, count, lastCount, start, earliestTimerStartTime; + public Timer() { + this(0, 0, 0, 0, 0); + } + + public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) { + this.timing = timing; + this.count = count; + this.lastCount = lastCount; + this.start = start; + this.earliestTimerStartTime = earliestTimerStartTime; + } + /** pkg-private for testing */ long nanoTime() { return System.nanoTime(); diff --git a/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java b/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java index 8e825def13f5d..2f5d632ee2d87 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java @@ -54,14 +54,11 @@ public void startRewriteTime() { * startRewriteTime() must be called for a particular context prior to calling * stopAndAddRewriteTime(), otherwise the elapsed time will be negative and * nonsensical - * - * @return The elapsed time */ - public long stopAndAddRewriteTime() { + public void stopAndAddRewriteTime() { long time = Math.max(1, System.nanoTime() - rewriteScratch); rewriteTime += time; rewriteScratch = 0; - return time; } public long getRewriteTime() { diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java index e567fdd2d436c..59ef01f9f947a 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java @@ -70,7 +70,7 @@ public Map toBreakdownMap() { ); final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString()); - if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) { + if (contexts.isEmpty()) { // If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the // create_weight time/count queryNodeTime = createWeightTime; @@ -78,6 +78,21 @@ public Map toBreakdownMap() { minSliceNodeTime = 0L; avgSliceNodeTime = 0L; return buildDefaultQueryBreakdownMap(createWeightTime); + } else if (sliceCollectorsToLeaves.isEmpty()) { + // This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It + // creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for + // the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later + // in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no + // concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination. + AbstractProfileBreakdown breakdown = contexts.values().iterator().next(); + queryNodeTime = breakdown.toNodeTime() + createWeightTime; + maxSliceNodeTime = 0L; + minSliceNodeTime = 0L; + avgSliceNodeTime = 0L; + Map queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap()); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L); + return queryBreakdownMap; } // first create the slice level breakdowns @@ -191,10 +206,12 @@ Map> buildSliceLevelBreakdown() { } // compute sliceMaxEndTime as max of sliceEndTime across all timing types sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE)); - sliceMinStartTime = Math.min( - sliceMinStartTime, - currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE) - ); + long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE); + if (currentSliceStartTime == 0L) { + // The timer for the current timing type never starts, so we continue here + continue; + } + sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime); // compute total time for each timing type at slice level using sliceEndTime and sliceStartTime currentSliceBreakdown.put( timingType.toString(), diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java new file mode 100644 index 0000000000000..42bf23bb13fbe --- /dev/null +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.apache.lucene.search.Query; +import org.opensearch.search.profile.ContextualProfileBreakdown; +import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.Timer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class acts as a thread-local storage for profiling a query with concurrent execution + * + * @opensearch.internal + */ +public final class ConcurrentQueryProfiler extends QueryProfiler { + + private final Map threadToProfileTree; + // The LinkedList does not need to be thread safe, as the map associates thread IDs with LinkedList, and only + // one thread will access the LinkedList at a time. + private final Map> threadToRewriteTimers; + + public ConcurrentQueryProfiler(AbstractQueryProfileTree profileTree) { + super(profileTree); + long threadId = getCurrentThreadId(); + // We utilize LinkedHashMap to preserve the insertion order of the profiled queries + threadToProfileTree = Collections.synchronizedMap(new LinkedHashMap<>()); + threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); + threadToRewriteTimers = new ConcurrentHashMap<>(); + threadToRewriteTimers.put(threadId, new LinkedList<>()); + } + + @Override + public ContextualProfileBreakdown getQueryBreakdown(Query query) { + ConcurrentQueryProfileTree profileTree = threadToProfileTree.computeIfAbsent( + getCurrentThreadId(), + k -> new ConcurrentQueryProfileTree() + ); + return profileTree.getProfileBreakdown(query); + } + + /** + * Removes the last (e.g. most recent) element on ConcurrentQueryProfileTree stack. + */ + @Override + public void pollLastElement() { + ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(getCurrentThreadId()); + if (concurrentProfileTree != null) { + concurrentProfileTree.pollLast(); + } + } + + /** + * @return a hierarchical representation of the profiled tree + */ + @Override + public List getTree() { + List profileResults = new ArrayList<>(); + for (Map.Entry profile : threadToProfileTree.entrySet()) { + profileResults.addAll(profile.getValue().getTree()); + } + return profileResults; + } + + /** + * Begin timing the rewrite phase of a request + */ + @Override + public void startRewriteTime() { + Timer rewriteTimer = new Timer(); + threadToRewriteTimers.computeIfAbsent(getCurrentThreadId(), k -> new LinkedList<>()).add(rewriteTimer); + rewriteTimer.start(); + } + + /** + * Stop recording the current rewrite timer + */ + public void stopAndAddRewriteTime() { + Timer rewriteTimer = threadToRewriteTimers.get(getCurrentThreadId()).getLast(); + rewriteTimer.stop(); + } + + /** + * @return total time taken to rewrite all queries in this concurrent query profiler + */ + @Override + public long getRewriteTime() { + long totalRewriteTime = 0L; + List rewriteTimers = new LinkedList<>(); + threadToRewriteTimers.values().forEach(rewriteTimers::addAll); + LinkedList mergedIntervals = mergeRewriteTimeIntervals(rewriteTimers); + for (long[] interval : mergedIntervals) { + totalRewriteTime += interval[1] - interval[0]; + } + return totalRewriteTime; + } + + // package private for unit testing + LinkedList mergeRewriteTimeIntervals(List timers) { + LinkedList mergedIntervals = new LinkedList<>(); + timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); + for (Timer timer : timers) { + long startTime = timer.getEarliestTimerStartTime(); + long endTime = startTime + timer.getApproximateTiming(); + if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < startTime) { + long[] interval = new long[2]; + interval[0] = startTime; + interval[1] = endTime; + mergedIntervals.add(interval); + } else { + mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); + } + } + return mergedIntervals; + } + + private long getCurrentThreadId() { + return Thread.currentThread().getId(); + } +} diff --git a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java index a80ce1c658081..332c4b3551450 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java @@ -51,15 +51,15 @@ * * @opensearch.internal */ -public final class QueryProfiler extends AbstractProfiler, Query> { +public class QueryProfiler extends AbstractProfiler, Query> { /** * The root Collector used in the search */ private InternalProfileComponent collector; - public QueryProfiler(boolean concurrent) { - super(concurrent ? new ConcurrentQueryProfileTree() : new InternalQueryProfileTree()); + public QueryProfiler(AbstractQueryProfileTree profileTree) { + super(profileTree); } /** Set the collector that is associated with this profiler. */ @@ -81,14 +81,14 @@ public void startRewriteTime() { /** * Stop recording the current rewrite and add it's time to the total tally, returning the * cumulative time so far. - * - * @return cumulative rewrite time */ - public long stopAndAddRewriteTime() { - return ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); + public void stopAndAddRewriteTime() { + ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); } /** + * The rewriting process is complex and hard to display because queries can undergo significant changes. + * Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case. * @return total time taken to rewrite all queries in this profile */ public long getRewriteTime() { diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java index f29ba3b0cea07..db14eb90ef839 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java @@ -333,6 +333,58 @@ public void testBreakDownMapWithMultipleSlicesAndOneSliceWithNoLeafContext() thr directory.close(); } + public void testOneLeafContextWithEmptySliceCollectorsToLeaves() throws Exception { + final DirectoryReader directoryReader = getDirectoryReader(1); + final Directory directory = directoryReader.directory(); + final long createWeightEarliestStartTime = createWeightTimer.getEarliestTimerStartTime(); + final long createWeightEndTime = createWeightEarliestStartTime + createWeightTimer.getApproximateTiming(); + final Map leafProfileBreakdownMap_1 = getLeafBreakdownMap(createWeightEndTime + 10, 10, 1); + final AbstractProfileBreakdown leafProfileBreakdown_1 = new TestQueryProfileBreakdown( + QueryTimingType.class, + leafProfileBreakdownMap_1 + ); + testQueryProfileBreakdown.getContexts().put(directoryReader.leaves().get(0), leafProfileBreakdown_1); + final Map queryBreakDownMap = testQueryProfileBreakdown.toBreakdownMap(); + assertFalse(queryBreakDownMap == null || queryBreakDownMap.isEmpty()); + assertEquals(26, queryBreakDownMap.size()); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + String timingTypeKey = queryTimingType.toString(); + String timingTypeCountKey = queryTimingType + TIMING_TYPE_COUNT_SUFFIX; + + if (queryTimingType.equals(QueryTimingType.CREATE_WEIGHT)) { + final long createWeightTime = queryBreakDownMap.get(timingTypeKey); + assertEquals(createWeightTimer.getApproximateTiming(), createWeightTime); + assertEquals(1, (long) queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for weight type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + continue; + } + assertNotNull(queryBreakDownMap.get(timingTypeKey)); + assertNotNull(queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for current breakdown type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + } + assertEquals(0, testQueryProfileBreakdown.getMaxSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getMinSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getAvgSliceNodeTime()); + directoryReader.close(); + directory.close(); + } + private Map getLeafBreakdownMap(long startTime, long timeTaken, long count) { Map leafBreakDownMap = new HashMap<>(); for (QueryTimingType timingType : QueryTimingType.values()) { diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java new file mode 100644 index 0000000000000..736bbcdd9e8dd --- /dev/null +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.opensearch.search.profile.Timer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class ConcurrentQueryProfilerTests extends OpenSearchTestCase { + + public void testMergeRewriteTimeIntervals() { + ConcurrentQueryProfiler profiler = new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()); + List timers = new LinkedList<>(); + timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); + LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); + assertThat(mergedIntervals.size(), equalTo(2)); + long[] interval = mergedIntervals.get(0); + assertThat(interval[0], equalTo(553074509287335L)); + assertThat(interval[1], equalTo(553074509516290L)); + interval = mergedIntervals.get(1); + assertThat(interval[0], equalTo(553074511206907L)); + assertThat(interval[1], equalTo(553074511424041L)); + } +} diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index 64a440b85eb10..481a224f2ff0e 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -161,7 +161,9 @@ public void tearDown() throws Exception { } public void testBasic() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1); @@ -228,7 +230,9 @@ public void testBasic() throws IOException { } public void testNoScoring() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1, Sort.INDEXORDER); // scores are not needed @@ -295,7 +299,9 @@ public void testNoScoring() throws IOException { } public void testUseIndexStats() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.count(query); // will use index stats @@ -309,7 +315,9 @@ public void testUseIndexStats() throws IOException { } public void testApproximations() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new RandomApproximationQuery(new TermQuery(new Term("foo", "bar")), random()); searcher.count(query); From 41a12e28fff58c7262bfeb725c8a31e5ffa266f3 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 19 Oct 2023 20:33:32 -0700 Subject: [PATCH 13/14] Make RemoteStoreReplicationSource#getSegmentFiles asynchronous (#10725) * Make RemoteStoreReplicationSource#getSegmentFiles asynchronous Also make the remote store download process cancellable in case the replication event is canceled. Signed-off-by: Andrew Ross * Add ITs ensuring segRep targets are cleaned up on cancellation during metadata and segment fetch steps. Signed-off-by: Marc Handalian * Wrap metadata fetch in cancellableThreads.executeIO Signed-off-by: Marc Handalian * self review Signed-off-by: Marc Handalian * spotless Signed-off-by: Marc Handalian * Add missing node settings when bootstrapping nodes in tests. Signed-off-by: Marc Handalian --------- Signed-off-by: Andrew Ross Signed-off-by: Marc Handalian Co-authored-by: Andrew Ross --- ...emoteStoreMockRepositoryIntegTestCase.java | 5 + ...plicationUsingRemoteStoreDisruptionIT.java | 133 ++++++++++++++++++ .../store/RemoteStoreFileDownloader.java | 72 ++++++---- .../RemoteStoreReplicationSource.java | 68 +++++---- .../replication/common/ReplicationTarget.java | 11 +- .../store/RemoteStoreFileDownloaderTests.java | 125 ++++++++++++++-- .../AbstractSnapshotIntegTestCase.java | 6 + .../snapshots/mockstore/MockRepository.java | 15 ++ 8 files changed, 373 insertions(+), 62 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java index 2053800504c89..8166c0008ed83 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -114,6 +114,10 @@ protected void cleanupRepo() { } protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) { + return setup(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure, 0); + } + + protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) { // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the // repository creation can happen without failure. @@ -128,6 +132,7 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep internalCluster().startClusterManagerOnlyNode(settings.build()); String dataNodeName = internalCluster().startDataOnlyNode(settings.build()); + internalCluster().startDataOnlyNodes(replicaCount, settings.build()); createIndex(INDEX_NAME); logger.info("--> Created index={}", INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java new file mode 100644 index 0000000000000..b7b3f1d14f422 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Optional; +import java.util.Set; + +/** + * This class runs tests with remote store + segRep while blocking file downloads + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationUsingRemoteStoreDisruptionIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(1); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void testCancelReplicationWhileSyncingSegments() throws Exception { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, 0d, "metadata", Long.MAX_VALUE, 1); + + final Set dataNodeNames = internalCluster().getDataNodeNames(); + final String replicaNode = getNode(dataNodeNames, false); + final String primaryNode = getNode(dataNodeNames, true); + + SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode); + ensureGreen(INDEX_NAME); + blockNodeOnAnySegmentFile(REPOSITORY_NAME, replicaNode); + final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); + indexSingleDoc(); + refresh(INDEX_NAME); + waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); + final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); + assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage()); + ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( + state.getReplicationId() + ); + final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); + // close the target ref here otherwise it will hold a refcount + segmentReplicationTargetReplicationRef.close(); + assertNotNull(segmentReplicationTarget); + assertTrue(segmentReplicationTarget.refCount() > 0); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + assertBusy(() -> { + assertTrue(indexShard.routingEntry().primary()); + assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); + }); + unblockNode(REPOSITORY_NAME, replicaNode); + cleanupRepo(); + } + + public void testCancelReplicationWhileFetchingMetadata() throws Exception { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, 0d, "metadata", Long.MAX_VALUE, 1); + + final Set dataNodeNames = internalCluster().getDataNodeNames(); + final String replicaNode = getNode(dataNodeNames, false); + final String primaryNode = getNode(dataNodeNames, true); + + SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode); + ensureGreen(INDEX_NAME); + blockNodeOnAnyFiles(REPOSITORY_NAME, replicaNode); + final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); + indexSingleDoc(); + refresh(INDEX_NAME); + waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); + final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); + assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); + ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( + state.getReplicationId() + ); + final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); + // close the target ref here otherwise it will hold a refcount + segmentReplicationTargetReplicationRef.close(); + assertNotNull(segmentReplicationTarget); + assertTrue(segmentReplicationTarget.refCount() > 0); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + assertBusy(() -> { + assertTrue(indexShard.routingEntry().primary()); + assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); + }); + unblockNode(REPOSITORY_NAME, replicaNode); + cleanupRepo(); + } + + private String getNode(Set dataNodeNames, boolean primary) { + assertEquals(2, dataNodeNames.size()); + for (String name : dataNodeNames) { + final IndexShard indexShard = getIndexShard(name, INDEX_NAME); + if (indexShard.routingEntry().primary() == primary) { + return name; + } + } + return null; + } + + private IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexService(index); + assertNotNull(indexService); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return shardId.map(indexService::getShard).orElse(null); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java index 4fc721f2b96b5..727c57afd289b 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java @@ -16,7 +16,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.logging.Loggers; -import org.opensearch.common.util.concurrent.UncategorizedExecutionException; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; @@ -51,9 +51,16 @@ public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, Recover * @param source The remote directory to copy segment files from * @param destination The local directory to copy segment files to * @param toDownloadSegments The list of segment files to download + * @param listener Callback listener to be notified upon completion */ - public void download(Directory source, Directory destination, Collection toDownloadSegments) throws IOException { - downloadInternal(source, destination, null, toDownloadSegments, () -> {}); + public void downloadAsync( + CancellableThreads cancellableThreads, + Directory source, + Directory destination, + Collection toDownloadSegments, + ActionListener listener + ) { + downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener); } /** @@ -74,17 +81,37 @@ public void download( Directory secondDestination, Collection toDownloadSegments, Runnable onFileCompletion - ) throws IOException { - downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion); + ) throws InterruptedException, IOException { + final CancellableThreads cancellableThreads = new CancellableThreads(); + final PlainActionFuture listener = PlainActionFuture.newFuture(); + downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener); + try { + listener.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new RuntimeException(e); + } catch (InterruptedException e) { + // If the blocking call on the PlainActionFuture itself is interrupted, then we must + // cancel the asynchronous work we were waiting on + cancellableThreads.cancel(e.getMessage()); + Thread.currentThread().interrupt(); + throw e; + } } private void downloadInternal( + CancellableThreads cancellableThreads, Directory source, Directory destination, @Nullable Directory secondDestination, Collection toDownloadSegments, - Runnable onFileCompletion - ) throws IOException { + Runnable onFileCompletion, + ActionListener listener + ) { final Queue queue = new ConcurrentLinkedQueue<>(toDownloadSegments); // Choose the minimum of: // - number of files to download @@ -95,25 +122,14 @@ private void downloadInternal( Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams()) ); logger.trace("Starting download of {} files with {} threads", queue.size(), threads); - final PlainActionFuture> listener = PlainActionFuture.newFuture(); - final ActionListener allFilesListener = new GroupedActionListener<>(listener, threads); + final ActionListener allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads); for (int i = 0; i < threads; i++) { - copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener); - } - try { - listener.actionGet(); - } catch (UncategorizedExecutionException e) { - // Any IOException will be double-wrapped so dig it out and throw it - if (e.getCause() instanceof ExecutionException) { - if (e.getCause().getCause() instanceof IOException) { - throw (IOException) e.getCause().getCause(); - } - } - throw e; + copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener); } } private void copyOneFile( + CancellableThreads cancellableThreads, Directory source, Directory destination, @Nullable Directory secondDestination, @@ -129,18 +145,20 @@ private void copyOneFile( threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY).submit(() -> { logger.trace("Downloading file {}", file); try { - destination.copyFrom(source, file, file, IOContext.DEFAULT); - onFileCompletion.run(); - if (secondDestination != null) { - secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT); - } + cancellableThreads.executeIO(() -> { + destination.copyFrom(source, file, file, IOContext.DEFAULT); + onFileCompletion.run(); + if (secondDestination != null) { + secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT); + } + }); } catch (Exception e) { // Clear the queue to stop any future processing, report the failure, then return queue.clear(); listener.onFailure(e); return; } - copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener); + copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener); }); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 12eabf1e6554f..b06b3e0497cf7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -15,6 +15,7 @@ import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.Version; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -24,11 +25,14 @@ import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -43,6 +47,7 @@ public class RemoteStoreReplicationSource implements SegmentReplicationSource { private final IndexShard indexShard; private final RemoteSegmentStoreDirectory remoteDirectory; + private final CancellableThreads cancellableThreads = new CancellableThreads(); public RemoteStoreReplicationSource(IndexShard indexShard) { this.indexShard = indexShard; @@ -61,7 +66,7 @@ public void getCheckpointMetadata( // TODO: Need to figure out a way to pass this information for segment metadata via remote store. try (final GatedCloseable segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) { final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion(); - RemoteSegmentMetadata mdFile = remoteDirectory.init(); + final RemoteSegmentMetadata mdFile = getRemoteSegmentMetadata(); // During initial recovery flow, the remote store might not // have metadata as primary hasn't uploaded anything yet. if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { @@ -106,39 +111,50 @@ public void getSegmentFiles( } logger.debug("Downloading segment files from remote store {}", filesToFetch); - RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); - Collection directoryFiles = List.of(indexShard.store().directory().listAll()); - if (remoteSegmentMetadata != null) { - try { - indexShard.store().incRef(); - indexShard.remoteStore().incRef(); - final Directory storeDirectory = indexShard.store().directory(); - final List toDownloadSegmentNames = new ArrayList<>(); - for (StoreFileMetadata fileMetadata : filesToFetch) { - String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; - toDownloadSegmentNames.add(file); - } - indexShard.getFileDownloader() - .download( - remoteDirectory, - new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker), - toDownloadSegmentNames - ); - logger.debug("Downloaded segment files from remote store {}", filesToFetch); - } finally { - indexShard.store().decRef(); - indexShard.remoteStore().decRef(); + if (remoteMetadataExists()) { + final Directory storeDirectory = indexShard.store().directory(); + final Collection directoryFiles = List.of(storeDirectory.listAll()); + final List toDownloadSegmentNames = new ArrayList<>(); + for (StoreFileMetadata fileMetadata : filesToFetch) { + String file = fileMetadata.name(); + assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; + toDownloadSegmentNames.add(file); } + indexShard.getFileDownloader() + .downloadAsync( + cancellableThreads, + remoteDirectory, + new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker), + toDownloadSegmentNames, + ActionListener.map(listener, r -> new GetSegmentFilesResponse(filesToFetch)) + ); + } else { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } - listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); - } catch (Exception e) { + } catch (IOException | RuntimeException e) { listener.onFailure(e); } } + @Override + public void cancel() { + this.cancellableThreads.cancel("Canceled by target"); + } + @Override public String getDescription() { return "RemoteStoreReplicationSource"; } + + private boolean remoteMetadataExists() throws IOException { + final AtomicBoolean metadataExists = new AtomicBoolean(false); + cancellableThreads.executeIO(() -> metadataExists.set(remoteDirectory.readLatestMetadataFile() != null)); + return metadataExists.get(); + } + + private RemoteSegmentMetadata getRemoteSegmentMetadata() throws IOException { + AtomicReference mdFile = new AtomicReference<>(); + cancellableThreads.executeIO(() -> mdFile.set(remoteDirectory.init())); + return mdFile.get(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index ec6b4d06b32c3..aac59df4f6573 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -91,6 +91,9 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); store.incRef(); + if (indexShard.indexSettings().isRemoteStoreEnabled()) { + indexShard.remoteStore().incRef(); + } } public long getId() { @@ -278,6 +281,12 @@ public abstract void writeFileChunk( ); protected void closeInternal() { - store.decRef(); + try { + store.decRef(); + } finally { + if (indexShard.indexSettings().isRemoteStoreEnabled()) { + indexShard.remoteStore().decRef(); + } + } } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java index 588d9e8bb13a2..6d8b3fe4d69fb 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java @@ -9,12 +9,18 @@ package org.opensearch.index.store; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NIOFSDirectory; +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.test.OpenSearchTestCase; @@ -31,8 +37,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; public class RemoteStoreFileDownloaderTests extends OpenSearchTestCase { @@ -76,31 +84,132 @@ public void stopThreadPool() throws Exception { } public void testDownload() throws IOException { - fileDownloader.download(source, destination, files.keySet()); + final PlainActionFuture l = new PlainActionFuture<>(); + fileDownloader.downloadAsync(new CancellableThreads(), source, destination, files.keySet(), l); + l.actionGet(); assertContent(files, destination); } - public void testDownloadWithSecondDestination() throws IOException { + public void testDownloadWithSecondDestination() throws IOException, InterruptedException { fileDownloader.download(source, destination, secondDestination, files.keySet(), () -> {}); assertContent(files, destination); assertContent(files, secondDestination); } - public void testDownloadWithFileCompletionHandler() throws IOException { + public void testDownloadWithFileCompletionHandler() throws IOException, InterruptedException { final AtomicInteger counter = new AtomicInteger(0); fileDownloader.download(source, destination, null, files.keySet(), counter::incrementAndGet); assertContent(files, destination); assertEquals(files.size(), counter.get()); } - public void testDownloadNonExistentFile() { - assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, Set.of("not real"))); + public void testDownloadNonExistentFile() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + fileDownloader.downloadAsync(new CancellableThreads(), source, destination, Set.of("not real"), new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + assertEquals(NoSuchFileException.class, e.getClass()); + latch.countDown(); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); } - public void testDownloadExtraNonExistentFile() { - List filesWithExtra = new ArrayList<>(files.keySet()); + public void testDownloadExtraNonExistentFile() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final List filesWithExtra = new ArrayList<>(files.keySet()); filesWithExtra.add("not real"); - assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, filesWithExtra)); + fileDownloader.downloadAsync(new CancellableThreads(), source, destination, filesWithExtra, new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + assertEquals(NoSuchFileException.class, e.getClass()); + latch.countDown(); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + public void testCancellable() { + final CancellableThreads cancellableThreads = new CancellableThreads(); + final PlainActionFuture blockingListener = new PlainActionFuture<>(); + final Directory blockingDestination = new FilterDirectory(destination) { + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) { + try { + Thread.sleep(60_000); // Will be interrupted + fail("Expected to be interrupted"); + } catch (InterruptedException e) { + throw new RuntimeException("Failed due to interrupt", e); + } + } + }; + fileDownloader.downloadAsync(cancellableThreads, source, blockingDestination, files.keySet(), blockingListener); + assertThrows( + "Expected to timeout due to blocking directory", + OpenSearchTimeoutException.class, + () -> blockingListener.actionGet(TimeValue.timeValueMillis(500)) + ); + cancellableThreads.cancel("test"); + assertThrows( + "Expected to complete with cancellation failure", + CancellableThreads.ExecutionCancelledException.class, + blockingListener::actionGet + ); + } + + public void testBlockingCallCanBeInterrupted() throws Exception { + final Directory blockingDestination = new FilterDirectory(destination) { + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) { + try { + Thread.sleep(60_000); // Will be interrupted + fail("Expected to be interrupted"); + } catch (InterruptedException e) { + throw new RuntimeException("Failed due to interrupt", e); + } + } + }; + final AtomicReference capturedException = new AtomicReference<>(); + final Thread thread = new Thread(() -> { + try { + fileDownloader.download(source, blockingDestination, null, files.keySet(), () -> {}); + } catch (Exception e) { + capturedException.set(e); + } + }); + thread.start(); + thread.interrupt(); + thread.join(); + assertEquals(InterruptedException.class, capturedException.get().getClass()); + } + + public void testIOException() throws IOException, InterruptedException { + final Directory failureDirectory = new FilterDirectory(destination) { + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { + throw new IOException("test"); + } + }; + assertThrows(IOException.class, () -> fileDownloader.download(source, failureDirectory, null, files.keySet(), () -> {})); + + final CountDownLatch latch = new CountDownLatch(1); + fileDownloader.downloadAsync(new CancellableThreads(), source, failureDirectory, files.keySet(), new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + assertEquals(IOException.class, e.getClass()); + latch.countDown(); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); } private static void assertContent(Map expected, Directory destination) throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 1bb1e44a8a600..0ee889af5ce1a 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -331,6 +331,12 @@ public static void blockNodeOnAnyFiles(String repository, String nodeName) { ); } + public static void blockNodeOnAnySegmentFile(String repository, String nodeName) { + ((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName).repository(repository)).blockOnSegmentFiles( + true + ); + } + public static void blockDataNode(String repository, String nodeName) { ((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName).repository(repository)).blockOnDataFiles(true); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index 7db71c4be0968..72c4ba44d0a31 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -139,6 +139,8 @@ public long getFailureCount() { private volatile boolean blockOnDataFiles; + private volatile boolean blockOnSegmentFiles; + private volatile boolean blockOnDeleteIndexN; /** @@ -190,6 +192,7 @@ public MockRepository( maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false); blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); + blockOnSegmentFiles = metadata.settings().getAsBoolean("block_on_segment", false); blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false); randomPrefix = metadata.settings().get("random", "default"); waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L); @@ -237,6 +240,7 @@ public synchronized void unblock() { blocked = false; // Clean blocking flags, so we wouldn't try to block again blockOnDataFiles = false; + blockOnSegmentFiles = false; blockOnAnyFiles = false; blockAndFailOnWriteIndexFile = false; blockOnWriteIndexFile = false; @@ -259,6 +263,14 @@ public void setBlockOnAnyFiles(boolean blocked) { blockOnAnyFiles = blocked; } + public void blockOnSegmentFiles(boolean blocked) { + blockOnSegmentFiles = blocked; + } + + public void setBlockOnSegmentFiles(boolean blocked) { + blockOnSegmentFiles = blocked; + } + public void setBlockAndFailOnWriteSnapFiles(boolean blocked) { blockAndFailOnWriteSnapFile = blocked; } @@ -306,6 +318,7 @@ private synchronized boolean blockExecution() { boolean wasBlocked = false; try { while (blockOnDataFiles + || blockOnSegmentFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile @@ -407,6 +420,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException { blockExecutionAndMaybeWait(blobName); } else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) { blockExecutionAndFail(blobName); + } else if (blockOnSegmentFiles && blobName.contains(".si__")) { + blockExecutionAndMaybeWait(blobName); } } } From c400d84f0e884217454ddfcc1503d02e0b280fa9 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Fri, 20 Oct 2023 17:39:58 +0530 Subject: [PATCH 14/14] [Remote State] fix lock release before deletion is completed (#10611) * fix lock release before deletion is completed Signed-off-by: bansvaru --- .../remote/RemoteClusterStateService.java | 8 +++-- .../RemoteClusterStateServiceTests.java | 34 +++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index b9d06c8fbb1c1..96ce2fc779ea0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1072,7 +1072,8 @@ public void onFailure(Exception e) { * @param clusterUUID uuid of cluster state to refer to in remote * @param manifestsToRetain no of latest manifest files to keep in remote */ - private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + // package private for testing + void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { logger.info("Delete stale cluster metadata task is already in progress."); return; @@ -1109,8 +1110,9 @@ public void onFailure(Exception e) { } } ); - } finally { + } catch (Exception e) { deleteStaleMetadataRunning.set(false); + throw e; } } @@ -1190,7 +1192,7 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List { String clusterName = clusterState.getClusterName().value(); - logger.info("Deleting stale cluster UUIDs data from remote [{}]", clusterName); + logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); Set allClustersUUIDsInRemote; try { allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value())); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 49b7f0ff8d1a9..433eac63e9580 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -62,6 +62,9 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.mockito.ArgumentCaptor; @@ -73,6 +76,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -1004,6 +1008,36 @@ public void testFileNames() { assertThat(splittedName[3], is("P")); } + public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { + BlobContainer blobContainer = mock(BlobContainer.class); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> { + callCount.incrementAndGet(); + if (latch.await(5000, TimeUnit.SECONDS) == false) { + throw new Exception("Timed out waiting for delete task queuing to complete"); + } + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + any(int.class), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); + + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + + latch.countDown(); + assertBusy(() -> assertEquals(1, callCount.get())); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { final BlobPath blobPath = mock(BlobPath.class); when((blobStoreRepository.basePath())).thenReturn(blobPath);