Merge branch 'main' into remote-metadata-equals
Signed-off-by: bansvaru <[email protected]>
linuxpi committed Oct 20, 2023
2 parents 4cb564e + c400d84 commit 4df543c
Showing 45 changed files with 1,667 additions and 174 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
@@ -25,4 +25,5 @@ BWC_VERSION:
- "2.10.0"
- "2.10.1"
- "2.11.0"
- "2.11.1"
- "2.12.0"
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -1 +1 @@
* @reta @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @gbbafna @setiah @kartg @kotwanikunal @mch2 @nknize @owaiskazi19 @peternied @Rishikesh1159 @ryanbogan @saratvemulapalli @shwetathareja @dreamer-89 @tlfeng @VachaShah @dbwiddis @sachinpkale @sohami @msfroh
* @abbashus @adnapibar @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @dreamer-89 @gbbafna @kartg @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @ryanbogan @sachinpkale @saratvemulapalli @setiah @shwetathareja @sohami @tlfeng @VachaShah
5 changes: 2 additions & 3 deletions CHANGELOG.md
@@ -16,11 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission control] Add Resource usage collector service and resource usage tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
<<<<<<< Updated upstream
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
=======
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
>>>>>>> Stashed changes

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
@@ -94,6 +91,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
1 change: 1 addition & 0 deletions libs/core/src/main/java/org/opensearch/Version.java
@@ -96,6 +96,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_10_0 = new Version(2100099, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_10_1 = new Version(2100199, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_11_0 = new Version(2110099, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_11_1 = new Version(2110199, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_12_0 = new Version(2120099, org.apache.lucene.util.Version.LUCENE_9_8_0);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_8_0);
public static final Version CURRENT = V_3_0_0;
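For readers unfamiliar with the integer ids in this file: the constants above suggest that each release is packed into a single id as major, minor, revision, and a two-digit build, with 99 used for GA builds, so 2.11.1 becomes 2110199. A minimal sketch of that arithmetic, assuming this convention holds (`VersionIdSketch` and `toId` are illustrative names, not part of OpenSearch):

```java
// Sketch only: reproduces the apparent id convention behind V_2_11_1 = 2110199.
public final class VersionIdSketch {

    // major * 1_000_000 + minor * 10_000 + revision * 100 + build (99 = GA)
    static int toId(int major, int minor, int revision, int build) {
        return major * 1_000_000 + minor * 10_000 + revision * 100 + build;
    }

    public static void main(String[] args) {
        System.out.println(toId(2, 11, 1, 99));  // 2110199, matching V_2_11_1
        System.out.println(toId(2, 12, 0, 99));  // 2120099, matching V_2_12_0
    }
}
```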
Original file line number Diff line number Diff line change
@@ -165,6 +165,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10735")
@Override
public void testRequestStats() throws Exception {
final String repository = createRepository(randomName());
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -140,28 +141,39 @@ private static void uploadPart(
ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
AsyncRequestBody.fromInputStream(
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
inputStreamContainer.getContentLength(),
streamReadExecutor
)
AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
)
);

CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.thenApply(
uploadPartResponse -> convertUploadPartResponse(
completedParts,
inputStreamContainers,
uploadPartResponse,
partNumber,
uploadRequest.doRemoteDataIntegrityCheck()
)
);
CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> {
try {
inputStream.close();
} catch (IOException ex) {
log.error(
() -> new ParameterizedMessage(
"Failed to close stream while uploading a part of idx {} and file {}.",
uploadPartRequest.partNumber(),
uploadPartRequest.key()
),
ex
);
}
})
.thenApply(
uploadPartResponse -> convertUploadPartResponse(
completedParts,
inputStreamContainers,
uploadPartResponse,
partNumber,
uploadRequest.doRemoteDataIntegrityCheck()
)
);
futures.add(convertFuture);

CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture);
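The hunk above hoists the `BufferedInputStream` into a local variable so the stream can be closed as soon as the part upload future completes, successfully or not, before the success path converts the response. A self-contained sketch of that whenComplete-then-thenApply pattern, with `partUpload` standing in for the `s3AsyncClient.uploadPart` call (the names here are illustrative, not the plugin's API):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;

public final class CloseOnCompleteSketch {
    public static void main(String[] args) {
        InputStream inputStream = new ByteArrayInputStream(new byte[] { 1, 2, 3 });

        // Stand-in for the async SDK call that consumes the stream.
        CompletableFuture<String> partUpload = CompletableFuture.supplyAsync(() -> "etag-1");

        // whenComplete runs on success and failure alike, so the stream is always released;
        // thenApply then continues the normal success path (here, building a "completed part").
        CompletableFuture<String> completedPart = partUpload.whenComplete((resp, throwable) -> {
            try {
                inputStream.close();
            } catch (IOException e) {
                // The plugin logs this failure; a sketch can simply report it.
                System.err.println("Failed to close part stream: " + e.getMessage());
            }
        }).thenApply(etag -> "completed:" + etag);

        System.out.println(completedPart.join()); // completed:etag-1
    }
}
```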
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
@@ -310,17 +311,22 @@ private void uploadInOneChunk(
ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
AsyncRequestBody.fromInputStream(
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
inputStreamContainer.getContentLength(),
streamReadExecutor
)
AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
).handle((resp, throwable) -> {
try {
inputStream.close();
} catch (IOException e) {
log.error(
() -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()),
e
);
}
if (throwable != null) {
Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class);
if (unwrappedThrowable != null) {
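Both upload paths keep the comment about wrapping the source in a `BufferedInputStream` sized at 1 MB + 1: mark and reset let a retry re-read only the buffered window after an IO error instead of re-sending the whole file. A rough, SDK-independent sketch of that mark/reset behavior, assuming the same buffer size as above (the class name and byte counts below are illustrative):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class MarkResetRetrySketch {
    public static void main(String[] args) throws IOException {
        byte[] part = new byte[2048];
        int bufferSize = 1024 * 1024 + 1; // mirrors ByteSizeUnit.MB.toBytes(1) + 1 in the diff

        InputStream in = new BufferedInputStream(new ByteArrayInputStream(part), bufferSize);

        in.mark(bufferSize);               // remember the start of the buffered window
        byte[] firstAttempt = in.readNBytes(512);

        // Suppose the upload of these bytes failed with a transient IO error:
        // reset rewinds to the mark, so only the buffered data is re-read on retry.
        in.reset();
        byte[] retried = in.readNBytes(512);

        System.out.println(firstAttempt.length == retried.length); // true
    }
}
```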
Original file line number Diff line number Diff line change
@@ -37,9 +37,14 @@
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -71,17 +76,16 @@ public void testOneChunkUpload() {
putObjectResponseCompletableFuture
);

AtomicReference<InputStream> streamRef = new AtomicReference<>();
CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, false, null),
new StreamContext(
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
ByteSizeUnit.MB.toBytes(1),
ByteSizeUnit.MB.toBytes(1),
1
),
new StreamContext((partIdx, partSize, position) -> {
streamRef.set(new ZeroInputStream(partSize));
return new InputStreamContainer(streamRef.get(), partSize, position);
}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1),
new StatsMetricPublisher()
);

@@ -92,6 +96,14 @@
}

verify(s3AsyncClient, times(1)).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));

boolean closeError = false;
try {
streamRef.get().available();
} catch (IOException e) {
closeError = e.getMessage().equals("Stream closed");
}
assertTrue("InputStream was still open after upload", closeError);
}

public void testOneChunkUploadCorruption() {
@@ -162,17 +174,17 @@ public void testMultipartUpload() {
abortMultipartUploadResponseCompletableFuture
);

List<InputStream> streams = new ArrayList<>();
CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, true, 3376132981L),
new StreamContext(
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
ByteSizeUnit.MB.toBytes(1),
ByteSizeUnit.MB.toBytes(1),
5
),
new StreamContext((partIdx, partSize, position) -> {
InputStream stream = new ZeroInputStream(partSize);
streams.add(stream);
return new InputStreamContainer(stream, partSize, position);
}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5),
new StatsMetricPublisher()
);

@@ -182,6 +194,16 @@
fail("did not expect resultFuture to fail");
}

streams.forEach(stream -> {
boolean closeError = false;
try {
stream.available();
} catch (IOException e) {
closeError = e.getMessage().equals("Stream closed");
}
assertTrue("InputStream was still open after upload", closeError);
});

verify(s3AsyncClient, times(1)).createMultipartUpload(any(CreateMultipartUploadRequest.class));
verify(s3AsyncClient, times(5)).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class));
verify(s3AsyncClient, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
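The new test assertions capture each `ZeroInputStream` and then call `available()` on it, treating an `IOException` with the message "Stream closed" as proof that the upload path closed the stream. A small standalone sketch of that check against a plain `BufferedInputStream`, which throws the same message on recent JDKs (`ZeroInputStream` itself is a repository test helper, assumed here to behave similarly):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class StreamClosedCheckSketch {
    public static void main(String[] args) throws IOException {
        InputStream stream = new BufferedInputStream(new ByteArrayInputStream(new byte[16]));
        stream.close();

        boolean closeError = false;
        try {
            stream.available(); // a closed BufferedInputStream throws here
        } catch (IOException e) {
            closeError = "Stream closed".equals(e.getMessage());
        }
        System.out.println("closed detected: " + closeError); // closed detected: true
    }
}
```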
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-2.11.0.md
@@ -5,6 +5,7 @@
### Added
- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Add parallel file download support for remote store based replication ([#8596](https://github.com/opensearch-project/OpenSearch/pull/8596))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
Original file line number Diff line number Diff line change
@@ -114,6 +114,10 @@ protected void cleanupRepo() {
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
return setup(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure, 0);
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) {
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
@@ -128,6 +132,7 @@

internalCluster().startClusterManagerOnlyNode(settings.build());
String dataNodeName = internalCluster().startDataOnlyNode(settings.build());
internalCluster().startDataOnlyNodes(replicaCount, settings.build());
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
(Diffs for the remaining changed files are not shown here.)
