Skip to content

Commit

Permalink
Initial commit for POC
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Aug 4, 2023
1 parent 57d5e90 commit de65295
Show file tree
Hide file tree
Showing 16 changed files with 646 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -210,6 +211,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

/**
 * Multi-stream blob download entry point; not implemented for this container yet.
 *
 * The listener is always notified with a failure so that a caller waiting on the
 * download outcome does not hang on a silent no-op.
 *
 * @param readContext        describes the blob to download (unused until the download is implemented)
 * @param completionListener notified with an {@link UnsupportedOperationException}
 * @throws IOException declared by the overridden method; never thrown by this stub
 */
@Override
public void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException {
    completionListener.onFailure(
        new UnsupportedOperationException("asyncBlobDownload is not yet implemented for this blob container")
    );
}

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.exception.CorruptFileException;
Expand All @@ -21,7 +25,10 @@
import org.opensearch.common.util.ByteUtils;
import org.opensearch.repositories.s3.io.CheckedContainer;
import org.opensearch.repositories.s3.SocketAccess;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand All @@ -33,17 +40,22 @@
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CompletableFutureUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Supplier;
Expand Down Expand Up @@ -102,6 +114,37 @@ public CompletableFuture<Void> uploadObject(S3AsyncClient s3AsyncClient, UploadR
return returnFuture;
}

/**
 * Downloads part 1 of an object and exposes it as a blocking stream.
 *
 * NOTE(review): this overload still targets an empty bucket and key, exactly as the
 * original placeholder did; real callers should use
 * {@link #downloadObject(S3AsyncClient, String, String)}.
 *
 * @param s3AsyncClient client used to issue the GetObject request
 * @return a stream over the response body; the caller is responsible for closing it
 */
public InputStream downloadObject(S3AsyncClient s3AsyncClient) {
    return downloadObject(s3AsyncClient, "", "");
}

/**
 * Downloads part 1 of the given object and exposes it as a blocking stream.
 *
 * The call blocks the current thread until the response is available
 * ({@link CompletableFuture#join()}), so a failed request surfaces as a
 * {@link CompletionException}.
 *
 * @param s3AsyncClient client used to issue the GetObject request
 * @param bucket        name of the bucket holding the object
 * @param key           key of the object to download
 * @return a stream over the response body; the caller is responsible for closing it
 */
public InputStream downloadObject(S3AsyncClient s3AsyncClient, String bucket, String key) {
    GetObjectRequest getObjectRequest = GetObjectRequest.builder()
        .bucket(bucket)
        .key(key)
        // NOTE(review): only the first part is requested; multi-part downloads are not handled here yet.
        .partNumber(1)
        .build();

    CompletableFuture<ResponseInputStream<GetObjectResponse>> responseFuture = s3AsyncClient.getObject(
        getObjectRequest,
        AsyncResponseTransformer.toBlockingInputStream()
    );

    return responseFuture.join();
}

private void uploadInParts(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush)
}
maxSeqNoRefreshedOrFlushed = maxSeqNo;
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
int numberOfOperations = randomIntBetween(1, 5);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc();
maxSeqNo = response.getSeqNo();
Expand All @@ -112,7 +112,7 @@ private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
}

private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
protected void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
Expand All @@ -129,12 +129,10 @@ private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boo

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);
// assertEquals(indexStats.get(TOTAL_OPERATIONS).longValue(),
// client().prepareSearch(INDEX_NAME).setSize(0).get().getInternalResponse().hits().getTotalHits().value);
verifyRestoredData(indexStats, remoteTranslog);

if (remoteTranslog) {
verifyRestoredData(indexStats, true);
} else {
verifyRestoredData(indexStats, false);
}
}

public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
Expand All @@ -29,10 +30,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

/**
 * Registers the repository {@code REPOSITORY_NAME} using the mock FS repository type,
 * located at the given path, and asserts that the request is acknowledged.
 *
 * @param path location of the repository
 */
@Override
protected void putRepository(Path path) {
    // Routine diagnostic, not a failure: keep it out of the ERROR level.
    logger.info("Repo Path: {}", path);
    assertAcked(
        clusterAdmin().preparePutRepository(REPOSITORY_NAME)
            .setType(MockFsRepositoryPlugin.TYPE)
            .setSettings(Settings.builder().put("location", path))
    );
}

/**
 * Runs the shared restore flow with remote translog enabled, two indexing
 * iterations, and a flush invoked.
 */
@Override
public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
    final boolean remoteTranslog = true;
    final int numberOfIterations = 2;
    final boolean invokeFlush = true;
    testRestoreFlow(remoteTranslog, numberOfIterations, invokeFlush);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@

package org.opensearch.remotestore.multipart.mocks;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -34,14 +40,15 @@ public class MockFsVerifyingBlobContainer extends FsBlobContainer implements Ver

// When true, asyncBlobDownload reports a CorruptIndexException to the listener for
// segment files (anything that is not a .tlog or .ckp file) instead of completing.
private final boolean triggerDataIntegrityFailure;

// Class-level logger for this mock container.
private static final Logger logger = LogManager.getLogger(MockFsVerifyingBlobContainer.class);

/**
 * Creates the mock container.
 *
 * @param blobStore                   forwarded unchanged to the {@code FsBlobContainer} constructor
 * @param blobPath                    forwarded unchanged to the {@code FsBlobContainer} constructor
 * @param path                        forwarded unchanged to the {@code FsBlobContainer} constructor
 * @param triggerDataIntegrityFailure stored on the instance; consulted when a transfer of a segment file finishes
 */
public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) {
super(blobStore, blobPath, path);
this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
}

@Override
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {

int nParts = 10;
long partSize = writeContext.getFileSize() / nParts;
StreamContext streamContext = writeContext.getStreamProvider(partSize);
Expand Down Expand Up @@ -114,6 +121,71 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

/**
 * Mock multi-part download: reads every part supplied by the read context's stream
 * provider on its own thread into one in-memory buffer, then writes the buffer to the
 * local directory under {@code readContext.getFileName()}.
 *
 * The listener is notified exactly once: with the first part failure, with a
 * simulated corruption error (segment files, when {@code triggerDataIntegrityFailure}
 * is set), or with success after the download finalizer has run.
 *
 * @param readContext        describes the file to download and where to write it
 * @param completionListener receives the single outcome of the download
 * @throws IOException if waiting for the parts times out or is interrupted, or if writing
 *                     the local file fails
 */
@Override
public void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException {
    int nParts = 10;
    long partSize = readContext.getFileSize() / nParts;
    StreamContext streamContext = readContext.getStreamProvider(partSize);
    Directory directory = readContext.getLocalDirectory();

    // The mock keeps the whole file in memory, so the size must fit in an int.
    byte[] buffer = new byte[(int) readContext.getFileSize()];
    AtomicLong totalContentRead = new AtomicLong();
    // Remembers the first failure from any part so the listener is notified once, after all parts finish.
    java.util.concurrent.atomic.AtomicReference<Exception> firstFailure = new java.util.concurrent.atomic.AtomicReference<>();
    CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
    for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
        int finalPartIdx = partIdx;
        Thread thread = new Thread(() -> {
            try {
                InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
                long remainingContentLength = inputStreamContainer.getContentLength();
                long offset = partSize * finalPartIdx;
                try (InputStream inputStream = inputStreamContainer.getInputStream()) {
                    while (remainingContentLength > 0) {
                        int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
                        if (readContentLength < 0) {
                            // Without this check a -1 return would grow the remaining length and loop forever.
                            throw new java.io.EOFException(
                                "Unexpected end of stream for part "
                                    + finalPartIdx
                                    + " of "
                                    + readContext.getFileName()
                                    + ", "
                                    + remainingContentLength
                                    + " bytes still expected"
                            );
                        }
                        totalContentRead.addAndGet(readContentLength);
                        remainingContentLength -= readContentLength;
                        offset += readContentLength;
                    }
                }
            } catch (Exception e) {
                // Thread boundary: also capture runtime errors (e.g. bad offsets) instead of losing them.
                firstFailure.compareAndSet(null, e);
            } finally {
                latch.countDown();
            }
        });
        thread.start();
    }
    try {
        if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            throw new IOException("Timed out waiting for file transfer to complete for " + readContext.getFileName());
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag and keep the cause for diagnosis.
        Thread.currentThread().interrupt();
        throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + readContext.getFileName(), e);
    }

    Exception partFailure = firstFailure.get();
    if (partFailure != null) {
        // Do not write a partially filled buffer, and do not report success afterwards.
        completionListener.onFailure(partFailure);
        return;
    }

    logger.debug("Read {} bytes for {}", totalContentRead.get(), readContext.getFileName());
    try (IndexOutput output = directory.createOutput(readContext.getFileName(), IOContext.DEFAULT)) {
        output.writeBytes(buffer, buffer.length);
    }

    try {
        // bulks need to succeed for segment files to be generated
        if (isSegmentFile(readContext.getFileName()) && triggerDataIntegrityFailure) {
            completionListener.onFailure(
                new RuntimeException(
                    new CorruptIndexException(
                        "Data integrity check failure for file: " + readContext.getFileName(),
                        readContext.getFileName()
                    )
                )
            );
        } else {
            readContext.getDownloadFinalizer().accept(true);
            completionListener.onResponse(null);
        }
    } catch (Exception e) {
        completionListener.onFailure(e);
    }
}

/**
 * Tells whether the file name denotes a segment file, i.e. anything that does not
 * carry the {@code .tlog} or {@code .ckp} suffix.
 *
 * @param filename name of the file to classify
 * @return {@code false} for {@code .tlog} and {@code .ckp} files, {@code true} otherwise
 */
private boolean isSegmentFile(String filename) {
    boolean hasTranslogSuffix = filename.endsWith(".tlog") || filename.endsWith(".ckp");
    return !hasTranslogSuffix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.common.blobstore;

import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;

import java.io.IOException;
Expand All @@ -31,4 +32,15 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
 * Downloads the blob described by the given {@link ReadContext}. The outcome of the
 * download is published on the supplied listener.
 *
 * @param readContext A ReadContext object encapsulating all information needed to perform the download
 * @param completionListener Listener on which download events should be published.
 * @throws IOException if the remote blob could not be read, or the downloaded content could not be written locally
 */
void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read;

import org.apache.lucene.store.Directory;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;

import java.io.IOException;

/**
 * ReadContext bundles the data a blob container needs to download one file:
 * the file names involved, a finalizer callback, the integrity-check flag and,
 * once set, the local directory associated with the download.
 *
 * @opensearch.internal
 */
public class ReadContext {

    private final String remoteFileName;
    private final String fileName;
    private final boolean doRemoteDataIntegrityCheck;
    private final CheckedConsumer<Boolean, IOException> downloadFinalizer;
    // Not part of construction; supplied later through setLocalDirectory and null until then.
    private Directory localDirectory;

    /**
     * Construct a new ReadContext object
     *
     * @param remoteFileName The remote file name (presumably the name of the blob in the remote store; confirm with callers)
     * @param fileName The name of the file being downloaded
     * @param downloadFinalizer Callback stored for the download; it accepts a Boolean and may throw an IOException
     * @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
     */
    public ReadContext(
        String remoteFileName,
        String fileName,
        CheckedConsumer<Boolean, IOException> downloadFinalizer,
        boolean doRemoteDataIntegrityCheck
    ) {
        this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
        this.downloadFinalizer = downloadFinalizer;
        this.fileName = fileName;
        this.remoteFileName = remoteFileName;
    }

    /**
     * @return The file name
     */
    public String getFileName() {
        return fileName;
    }

    /**
     * @return The remote file name
     */
    public String getRemoteFileName() {
        return remoteFileName;
    }

    /**
     * @return The local directory set through {@link #setLocalDirectory(Directory)}, or null if none was set
     */
    public Directory getLocalDirectory() {
        return localDirectory;
    }

    /**
     * @param directory The local directory to associate with this download
     */
    public void setLocalDirectory(Directory directory) {
        localDirectory = directory;
    }

    /**
     * @return The download finalizer for this download
     */
    public CheckedConsumer<Boolean, IOException> getDownloadFinalizer() {
        return downloadFinalizer;
    }

    /**
     * @return A boolean for whether remote data integrity check has to be done for this download or not
     */
    public boolean doRemoteDataIntegrityCheck() {
        return doRemoteDataIntegrityCheck;
    }

}
Loading

0 comments on commit de65295

Please sign in to comment.