Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async segment file download support from remote store within OpenSearch core #9710

Merged

Conversation

kotwanikunal
Copy link
Member

Description

  • This PR builds on top of Add async blob read and download support using multiple streams #9592 and utilizes the new APIs within core
  • The APIs will be used within the RemoteSegmentStoreDirectory path to copy over segment files from the remote store.
  • Logically, the caller will not see any difference within the workings of download segments but the files will be parallelized and each file will support multiple streams for download due to the async nature of the APIs.

Related Issues

Resolves #8596

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff
  • Commit changes are listed out in CHANGELOG.md file (See: Changelog)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Copy link
Contributor

github-actions bot commented Sep 1, 2023

Compatibility status:

Checks if related components are compatible with change d31c829

Incompatible components

Incompatible components: [https://github.com/opensearch-project/k-nn.git]

Skipped components

Compatible components

Compatible components: [https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/security.git, https://github.com/opensearch-project/security-analytics.git, https://github.com/opensearch-project/custom-codecs.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git, https://github.com/opensearch-project/neural-search.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/asynchronous-search.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/reporting.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer-rca.git]

@github-actions
Copy link
Contributor

github-actions bot commented Sep 1, 2023

Gradle Check (Jenkins) Run Completed with:

  • RESULT: UNSTABLE ❕
  • TEST FAILURES:
      1 org.opensearch.search.SearchWeightedRoutingIT.testShardRoutingWithNetworkDisruption_FailOpenEnabled
      1 org.opensearch.remotestore.SegmentReplicationUsingRemoteStoreIT.testRestartPrimary

@github-actions
Copy link
Contributor

github-actions bot commented Sep 8, 2023

Compatibility status:

Checks if related components are compatible with change 31ea3ec

Incompatible components

Incompatible components: [https://github.com/opensearch-project/security.git]

Skipped components

Compatible components

Compatible components: [https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/neural-search.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/security-analytics.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git, https://github.com/opensearch-project/k-nn.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/asynchronous-search.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer-rca.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/reporting.git]

@github-actions
Copy link
Contributor

github-actions bot commented Sep 8, 2023

Gradle Check (Jenkins) Run Completed with:

@github-actions
Copy link
Contributor

github-actions bot commented Sep 8, 2023

Gradle Check (Jenkins) Run Completed with:

  • RESULT: UNSTABLE ❕
  • TEST FAILURES:
      1 org.opensearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT.test {yaml=pit/10_basic/Delete all}

@github-actions github-actions bot added distributed framework enhancement Enhancement or improvement to existing feature or request labels Sep 18, 2023
@github-actions
Copy link
Contributor

Gradle Check (Jenkins) Run Completed with:

  • RESULT: UNSTABLE ❕
  • TEST FAILURES:
      1 org.opensearch.search.SearchWeightedRoutingIT.testMultiGetWithNetworkDisruption_FailOpenEnabled

@kotwanikunal kotwanikunal added the v2.11.0 Issues and PRs related to version 2.11.0 label Sep 18, 2023
@andrross
Copy link
Member

I am more concerned about which threadpool does the await.

@Bukhtawar, here is my understanding of how the threading works. IndexShard.copySegmentFiles() is a synchronous API as currently implemented, so it blocks until all files are downloaded. I've attempted to diagram the flow (note some of the functionality is repository-specific, so I'm describing the S3 implementation here):

-> (calling thread) IndexShard.copySegmentFiles()
-> (calling thread) For each segment file:
  -> (calling thread) RemoteSegmentStoreDirectory.copyTo()
  -> (calling thread) AsyncMultiStreamBlobContainer.asyncBlobDownload()
  -> (calling thread) AsyncMultiStreamBlobContainer.readBlobAsync()
  -> (calling thread) S3.GetObjectAttributesRequest (blocks on result)
    -> (s3 async thread) for each part: S3.GetObject (_starts_ streaming the object part)
    -> (s3 async thread) upon completion of the last part, trigger ReadContextListener.onResponse
    -> (GENERIC thread) for each input stream: drain InputStream to file (FilePartWriter.run())
<- (calling thread) blocks until all files are completely downloaded

I'll try to describe this in prose as well: Starting from IndexShard.copySegmentFiles(), for each file the calling thread does an initial blocking call to get the object metadata, but then returns immediately while registering listeners to do the rest of the work. IndexShard.copySegmentFiles() blocks because it is synchronous and expects the files to be downloaded when it returns. As for the multi-threading work, the initial call to get the S3 InputStreams is done by the s3AsyncClient on a repository-specific thread pool. Once those futures are complete (which means that streaming is just beginning for each part), the work to drain the streams into a file is done on the generic thread pool. Meanwhile, the original calling thread is blocking until all files are completely downloaded.

I think the initial call must block unless we do a more major refactoring to make the code flows that call it asynchronous. Let me know if you have any concerns here.

I have a couple comments/concerns here:

  • That blocking call to get object metadata is probably not ideal. Can we register a whenComplete handler on the CompletableFuture so that we don't sequentially block when starting all the individual file downloads?
  • I'm not sure that the handoff to the generic thread pool is the right thing to do. In theory we could chain the logic to drain the streams to a file onto the s3 async thread that did the initial GetObject call and not involve another thread pool at all.

/cc @kotwanikunal @vikasvb90

@gbbafna
Copy link
Collaborator

gbbafna commented Sep 22, 2023

I'm not sure that the handoff to the generic thread pool is the right thing to do. In theory we could chain the logic to drain the streams to a file onto the s3 async thread that did the initial GetObject call and not involve another thread pool at all.

We should create another threadpool with much bigger size purely to do I/O . I think we are already tracking it in #10106 .

For GetObjectAttributesRequest doing a blocking call , I am not too worried as this is just one lightweight remote call and is not doing lot of I/O work .

@kotwanikunal
Copy link
Member Author

kotwanikunal commented Sep 22, 2023

That blocking call to get object metadata is probably not ideal. Can we register a whenComplete handler on the CompletableFuture so that we don't sequentially block when starting all the individual file downloads?

Updated the S3 reference here: #10192

@github-actions
Copy link
Contributor

Gradle Check (Jenkins) Run Completed with:

@andrross andrross merged commit 9e90671 into opensearch-project:main Sep 22, 2023
13 checks passed
@andrross andrross added the backport 2.x Backport to 2.x branch label Sep 22, 2023
opensearch-trigger-bot bot pushed a commit that referenced this pull request Sep 22, 2023
Signed-off-by: Kunal Kotwani <[email protected]>
(cherry picked from commit 9e90671)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
@vikasvb90
Copy link
Contributor

vikasvb90 commented Sep 23, 2023

@andrross Handoff to generic pool is definitely not right and we can also make metadata call non-blocking by chaining it with subsequent downloads. There is a way by which downloads can be made completely async and we wouldn't need any threadpool for multiple parallel downloads but that requires significant changes in stream processing layer (decryption/data integrity).

To add more on this, there are two types of IO happening in downloads - fetch from S3 and disk writes. Fetch from S3 is async even in the current state but we are not able to truly benefit from it because each part download happens within a separate thread. And this is done to parallelize part downloads and because we need some pre-processing work like decryption to be done before committing a buffer to disk.

We can make both disk writes and fetch from S3 async and still be able to do pre-processing work like decryption by implementing AsyncResponseTransfer similar to what FileAsyncResponseTransformer does but this requires decent amount of changes across s3 and decryption layer. What is lacking is a clean communication between these two layers which can be easily addressed in golang via channels.

We can still take up this as a follow up task thoughtand as @gbbafna mentioned, since we will only be doing IO in this new pool, for now we can assign it a large size. This would mean that we would still continue to bear context switch cost and minimal CPU provided stream processing work is further bounded in a small pool. I would still very much like to see downloads happening in async given we are making certain compromises.

sarthakaggarwal97 pushed a commit to sarthakaggarwal97/OpenSearch that referenced this pull request Sep 24, 2023
brusic pushed a commit to brusic/OpenSearch that referenced this pull request Sep 25, 2023
kotwanikunal pushed a commit that referenced this pull request Sep 27, 2023
…0199)

(cherry picked from commit 9e90671)

Signed-off-by: Kunal Kotwani <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
vikasvb90 pushed a commit to vikasvb90/OpenSearch that referenced this pull request Oct 10, 2023
shiv0408 pushed a commit to Gaurav614/OpenSearch that referenced this pull request Apr 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport 2.x Backport to 2.x branch distributed framework enhancement Enhancement or improvement to existing feature or request skip-changelog v2.11.0 Issues and PRs related to version 2.11.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Repository] Support multi-stream downloads within Repository
5 participants