Skip to content

Commit

Permalink
Added BufferedInputStream to allow mark and reset ops during IO errors (
Browse files Browse the repository at this point in the history
#10690)

Signed-off-by: vikasvb90 <[email protected]>
(cherry picked from commit 75bd9f2)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 18, 2023
1 parent 1970085 commit 632ba3e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -55,6 +56,7 @@
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -93,17 +95,21 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));

Check warning on line 98 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L98

Added line #L98 was not covered by tests
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
);
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_STREAM_READER, priorityPoolCount(settings), 10_000, PRIORITY_STREAM_READER)
);
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

Check warning on line 102 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L102

Added line #L102 was not covered by tests

executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
executorBuilders.add(new FixedExecutorBuilder(settings, STREAM_READER, normalPoolCount(settings), 10_000, STREAM_READER));
executorBuilders.add(new ScalingExecutorBuilder(STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

Check warning on line 105 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L105

Added line #L105 was not covered by tests
return executorBuilders;
}

static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);

Check warning on line 110 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L110

Added line #L110 was not covered by tests
}

S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
this.service = Objects.requireNonNull(service, "S3 service must not be null");
this.configPath = configPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -142,7 +144,9 @@ private static void uploadPart(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
AsyncRequestBody.fromInputStream(
inputStreamContainer.getInputStream(),
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
inputStreamContainer.getContentLength(),
streamReadExecutor
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -303,7 +304,9 @@ private void uploadInOneChunk(
() -> s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
AsyncRequestBody.fromInputStream(
inputStreamContainer.getInputStream(),
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
inputStreamContainer.getContentLength(),
streamReadExecutor
)
Expand Down

0 comments on commit 632ba3e

Please sign in to comment.