diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
index 81a902a6992d8..cdd702371e296 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -46,6 +46,7 @@
 import org.opensearch.common.blobstore.BlobStoreException;
 import org.opensearch.common.blobstore.DeleteResult;
 import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
+import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.blobstore.support.AbstractBlobContainer;
@@ -210,6 +211,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener)
         }
     }
 
+    @Override
+    public void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException {
+
+    }
+
     // package private for testing
     long getLargeBlobThresholdInBytes() {
         return blobStore.bufferSizeInBytes();
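Note: the asyncBlobDownload override above is still an empty stub. One possible shape for it, sketched under the assumption that AsyncTransferManager eventually exposes a future-returning download entry point; the transferManager field and downloadObject(client, readContext) signature below are illustrative, not existing plugin API:

    @Override
    public void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException {
        S3AsyncClient client = /* obtain the async client from blobStore, as asyncBlobUpload does */ null;
        // Bridge the SDK's CompletableFuture into the ActionListener contract.
        CompletableFuture<Void> downloadFuture = transferManager.downloadObject(client, readContext); // assumed signature
        downloadFuture.whenComplete((unused, throwable) -> {
            if (throwable != null) {
                completionListener.onFailure(new IOException("Blob download failed", throwable));
            } else {
                completionListener.onResponse(null);
            }
        });
    }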
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
index 5b43ae84c51dc..3b84aef98636e 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
@@ -12,6 +12,10 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.blobstore.exception.CorruptFileException;
@@ -21,7 +25,10 @@
 import org.opensearch.common.util.ByteUtils;
 import org.opensearch.repositories.s3.io.CheckedContainer;
 import org.opensearch.repositories.s3.SocketAccess;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.http.HttpStatusCode;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -33,17 +40,22 @@
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.utils.CompletableFutureUtils;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -102,6 +114,37 @@ public CompletableFuture<Void> uploadObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest)
         return returnFuture;
     }
 
+    public InputStream downloadObject(S3AsyncClient s3AsyncClient) {
+        GetObjectRequest getObjectRequest = GetObjectRequest.builder()
+            .bucket("")
+            .key("")
+            .partNumber(1)
+            .build();
+
+        CompletableFuture<ResponseInputStream<GetObjectResponse>> responseFuture = s3AsyncClient.getObject(
+            getObjectRequest,
+            AsyncResponseTransformer.toBlockingInputStream()
+        );
+
+        return responseFuture.join();
+
+//        Directory directory = null;
+//        try {
+//            IndexOutput indexOutput = directory.createOutput("", IOContext.DEFAULT);
+//            CompletableFuture<ResponseInputStream<GetObjectResponse>> responseFuture =
+//                s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toBlockingInputStream());
+//            ResponseInputStream<GetObjectResponse> responseInputStream = responseFuture.join();
+//            indexOutput.writeByte(responseInputStream.readAllBytes()[responseInputStream.available()]);
+//        } catch (IOException e) {
+//            throw new RuntimeException(e);
+//        }
+
+//        CompletableFuture<ResponseBytes<GetObjectResponse>> responseFuture =
+//            s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toBytes());
+//        responseFuture.join().asInputStream()
+
+//        Future<GetObjectResponse> getObjectResponseFuture =
+//            s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toFile(Path.of("")));
+    }
+
     private void uploadInParts(
         S3AsyncClient s3AsyncClient,
         UploadRequest uploadRequest,
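As written, downloadObject blocks on join() and hardcodes the bucket, key, and part number. A non-blocking variant of the same AWS SDK v2 call, sketched here with placeholder bucket/key/range parameters, would return the future and fetch an explicit byte range so callers can fan out one request per part:

    // Sketch: fully async ranged GET; bucket, key, start and end are supplied by the caller.
    public CompletableFuture<InputStream> downloadObjectAsync(S3AsyncClient s3AsyncClient, String bucket, String key, long start, long end) {
        GetObjectRequest getObjectRequest = GetObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            .range("bytes=" + start + "-" + end) // HTTP Range header form
            .build();
        return s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
            .thenApply(ResponseBytes::asInputStream); // buffers the part, then exposes it as a stream
    }

A caller can then compose the per-part futures with CompletableFuture.allOf instead of joining on each one.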
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java
index f01e4969b1fe7..e2e5a010bda99 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java
@@ -85,7 +85,7 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush)
         }
         maxSeqNoRefreshedOrFlushed = maxSeqNo;
         refreshedOrFlushedOperations = totalOperations;
-        int numberOfOperations = randomIntBetween(20, 50);
+        int numberOfOperations = randomIntBetween(1, 5);
         for (int j = 0; j < numberOfOperations; j++) {
             IndexResponse response = indexSingleDoc();
             maxSeqNo = response.getSeqNo();
@@ -112,7 +112,7 @@ private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal
         assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
     }
 
-    private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
+    protected void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
         internalCluster().startDataOnlyNodes(3);
         if (remoteTranslog) {
             createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
@@ -129,12 +129,10 @@ private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush)
         client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
         ensureGreen(INDEX_NAME);
 
-        if (remoteTranslog) {
-            verifyRestoredData(indexStats, true);
-        } else {
-            verifyRestoredData(indexStats, false);
-        }
+        // assertEquals(indexStats.get(TOTAL_OPERATIONS).longValue(),
+        //     client().prepareSearch(INDEX_NAME).setSize(0).get().getInternalResponse().hits().getTotalHits().value);
+        verifyRestoredData(indexStats, remoteTranslog);
     }
 
     public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java
index a523d5c0f5470..b80938462365d 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java
@@ -13,6 +13,7 @@
 import org.opensearch.remotestore.RemoteStoreIT;
 import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.stream.Collectors;
@@ -29,10 +30,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins()
 
     @Override
     protected void putRepository(Path path) {
+        logger.error("Repo Path: {}", path);
         assertAcked(
             clusterAdmin().preparePutRepository(REPOSITORY_NAME)
                 .setType(MockFsRepositoryPlugin.TYPE)
                 .setSettings(Settings.builder().put("location", path))
         );
     }
+
+    @Override
+    public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
+        testRestoreFlow(true, 2, true);
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java
index 8f2814eb7c4c4..7802dbe7b0c2d 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java
@@ -8,9 +8,15 @@
 
 package org.opensearch.remotestore.multipart.mocks;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
 import org.opensearch.action.ActionListener;
 import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
+import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.blobstore.BlobPath;
@@ -34,6 +40,8 @@ public class MockFsVerifyingBlobContainer extends FsBlobContainer implements VerifyingMultiStreamBlobContainer
 
     private final boolean triggerDataIntegrityFailure;
 
+    private static final Logger logger = LogManager.getLogger(MockFsVerifyingBlobContainer.class);
+
     public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) {
         super(blobStore, blobPath, path);
         this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
@@ -41,7 +49,6 @@ public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path,
     @Override
     public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
-
         int nParts = 10;
         long partSize = writeContext.getFileSize() / nParts;
         StreamContext streamContext = writeContext.getStreamProvider(partSize);
@@ -114,6 +121,71 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener)
 
     }
 
+    @Override
+    public void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException {
+        int nParts = 10;
+        long partSize = readContext.getFileSize() / nParts;
+        StreamContext streamContext = readContext.getStreamProvider(partSize);
+        Directory directory = readContext.getLocalDirectory();
+
+        byte[] buffer = new byte[(int) readContext.getFileSize()];
+        AtomicLong totalContentRead = new AtomicLong();
+        CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
+        for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
+            int finalPartIdx = partIdx;
+            Thread thread = new Thread(() -> {
+                try {
+                    InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
+                    InputStream inputStream = inputStreamContainer.getInputStream();
+                    long remainingContentLength = inputStreamContainer.getContentLength();
+                    long offset = partSize * finalPartIdx;
+                    while (remainingContentLength > 0) {
+                        int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
+                        if (readContentLength < 0) {
+                            // guard against an unexpected EOF, which would otherwise spin this loop forever
+                            throw new IOException("Unexpected end of stream while reading part " + finalPartIdx);
+                        }
+                        totalContentRead.addAndGet(readContentLength);
+                        remainingContentLength -= readContentLength;
+                        offset += readContentLength;
+                    }
+                    inputStream.close();
+                } catch (IOException e) {
+                    completionListener.onFailure(e);
+                } finally {
+                    latch.countDown();
+                }
+            });
+            thread.start();
+        }
+        try {
+            if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                throw new IOException("Timed out waiting for file transfer to complete for " + readContext.getFileName());
+            }
+        } catch (InterruptedException e) {
+            throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + readContext.getFileName());
+        }
+        logger.debug("Read {} bytes into the download buffer", totalContentRead.get());
+        try (IndexOutput output = directory.createOutput(readContext.getFileName(), IOContext.DEFAULT)) {
+            output.writeBytes(buffer, buffer.length);
+        }
+
+        try {
+            // downloads need to succeed for segment files to be generated
+            if (isSegmentFile(readContext.getFileName()) && triggerDataIntegrityFailure) {
+                completionListener.onFailure(
+                    new RuntimeException(
+                        new CorruptIndexException(
+                            "Data integrity check failure for file: " + readContext.getFileName(),
+                            readContext.getFileName()
+                        )
+                    )
+                );
+            } else {
+                readContext.getDownloadFinalizer().accept(true);
+                completionListener.onResponse(null);
+            }
+        } catch (Exception e) {
+            completionListener.onFailure(e);
+        }
+    }
+
     private boolean isSegmentFile(String filename) {
         return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
     }
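The mock above fans out one raw Thread per part and rejoins with a CountDownLatch; a per-stream failure also calls onFailure while the method still proceeds to write the buffer. A sketch of the same fan-out using an ExecutorService plus CompletableFuture.allOf, which propagates the first part failure instead (the method name and its placement in the mock are assumed):

    private void downloadAllParts(StreamContext streamContext, byte[] buffer, long partSize) {
        ExecutorService executor = Executors.newFixedThreadPool(streamContext.getNumberOfParts());
        try {
            CompletableFuture<?>[] partFutures = new CompletableFuture<?>[streamContext.getNumberOfParts()];
            for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
                final int finalPartIdx = partIdx;
                partFutures[partIdx] = CompletableFuture.runAsync(() -> {
                    try {
                        InputStreamContainer container = streamContext.provideStream(finalPartIdx);
                        try (InputStream in = container.getInputStream()) {
                            // copy this part into its slice of the shared buffer
                            int offset = (int) (partSize * finalPartIdx);
                            int remaining = (int) container.getContentLength();
                            while (remaining > 0) {
                                int read = in.read(buffer, offset, remaining);
                                if (read < 0) throw new EOFException("Unexpected end of part " + finalPartIdx);
                                offset += read;
                                remaining -= read;
                            }
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }, executor);
            }
            CompletableFuture.allOf(partFutures).join(); // surfaces the first part failure, if any
        } finally {
            executor.shutdown();
        }
    }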
diff --git a/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java
index 0dfcc5c50e4b1..ca33260653f93 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java
@@ -9,6 +9,7 @@
 package org.opensearch.common.blobstore;
 
 import org.opensearch.action.ActionListener;
+import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 
 import java.io.IOException;
@@ -31,4 +32,15 @@
      * @throws IOException if any of the input streams could not be read, or the target blob could not be written to
      */
    void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;
+
+    /**
+     * Reads blob content from multiple streams, each corresponding to a specific part of the file, as provided by
+     * the ReadContext passed to this method. An {@link IOException} is thrown if reading any of the remote streams
+     * fails, or if writing to the local store fails.
+     *
+     * @param readContext A ReadContext object encapsulating all information needed to perform the download
+     * @param completionListener Listener on which download events should be published.
+     * @throws IOException if any of the remote streams could not be read, or the local store could not be written to
+     */
+    void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException;
 }
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java
new file mode 100644
index 0000000000000..3c3231219fb24
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java
@@ -0,0 +1,83 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.stream.read;
+
+import org.apache.lucene.store.Directory;
+import org.opensearch.common.CheckedConsumer;
+import org.opensearch.common.Nullable;
+import org.opensearch.common.StreamContext;
+import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;
+
+import java.io.IOException;
+
+/**
+ * ReadContext is used to encapsulate all data needed by VerifyingMultiStreamBlobContainer#asyncBlobDownload
+ *
+ * @opensearch.internal
+ */
+public class ReadContext {
+
+    private final String fileName;
+    private final String remoteFileName;
+    private final CheckedConsumer<Boolean, IOException> downloadFinalizer;
+    private final boolean doRemoteDataIntegrityCheck;
+    private Directory localDirectory;
+
+    /**
+     * Construct a new ReadContext object
+     *
+     * @param remoteFileName The name of the remote file being read
+     * @param fileName The name of the file being downloaded
+     * @param downloadFinalizer A consumer invoked once the download completes
+     * @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
+     */
+    public ReadContext(
+        String remoteFileName,
+        String fileName,
+        CheckedConsumer<Boolean, IOException> downloadFinalizer,
+        boolean doRemoteDataIntegrityCheck
+    ) {
+        this.remoteFileName = remoteFileName;
+        this.fileName = fileName;
+        this.downloadFinalizer = downloadFinalizer;
+        this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
+    }
+
+    public void setLocalDirectory(Directory directory) {
+        this.localDirectory = directory;
+    }
+
+    public Directory getLocalDirectory() {
+        return this.localDirectory;
+    }
+
+    /**
+     * @return The file name
+     */
+    public String getFileName() {
+        return fileName;
+    }
+
+    public String getRemoteFileName() {
+        return remoteFileName;
+    }
+
+    /**
+     * @return The downloadFinalizer for this download
+     */
+    public CheckedConsumer<Boolean, IOException> getDownloadFinalizer() {
+        return downloadFinalizer;
+    }
+
+    /**
+     * @return A boolean for whether remote data integrity check has to be done for this download or not
+     */
+    public boolean doRemoteDataIntegrityCheck() {
+        return doRemoteDataIntegrityCheck;
+    }
+
+}
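One gap worth flagging: ReadContext imports StreamContext and StreamContextSupplier but never uses them, while MockFsVerifyingBlobContainer.asyncBlobDownload calls readContext.getFileSize() and readContext.getStreamProvider(partSize), neither of which is defined above. A rough sketch of the missing members, assuming they mirror their WriteContext counterparts (field names and the supplyStreamContext call are assumptions, not part of this patch):

    // Assumed additions to ReadContext, mirroring WriteContext:
    private long fileSize;                                // total size of the file being downloaded
    private StreamContextSupplier streamContextSupplier;  // supplies the per-part stream layout

    public long getFileSize() {
        return fileSize;
    }

    public StreamContext getStreamProvider(long partSize) {
        // delegate part sizing to the supplier, as WriteContext does
        return streamContextSupplier.supplyStreamContext(partSize);
    }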
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java
index ca744efae902d..ef158f4785d57 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java
@@ -15,6 +15,7 @@
 import org.opensearch.common.CheckedTriFunction;
 import org.opensearch.common.SetOnce;
 import org.opensearch.common.StreamContext;
+import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
@@ -24,6 +25,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Path;
 import java.util.Objects;
 
 /**
@@ -36,7 +38,6 @@ public class RemoteTransferContainer implements Closeable {
     private int numberOfParts;
     private long partSize;
     private long lastPartSize;
-    private final long contentLength;
 
     private final SetOnce<InputStream[]> inputStreams = new SetOnce<>();
     private final String fileName;
@@ -46,6 +47,7 @@ public class RemoteTransferContainer implements Closeable {
     private final long expectedChecksum;
     private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
     private final boolean isRemoteDataIntegritySupported;
+    private final Path localStorePath;
 
     private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);
 
@@ -79,6 +81,15 @@ public RemoteTransferContainer(
         this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier;
         this.expectedChecksum = expectedChecksum;
         this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported;
+        this.localStorePath = null;
+    }
+
+    public RemoteTransferContainer(String fileName, String remoteFileName, boolean isRemoteDataIntegritySupported) {
+        this(fileName, remoteFileName, -1L, false, null, null, -1L, isRemoteDataIntegritySupported);
     }
 
     /**
@@ -97,6 +108,18 @@ public WriteContext createWriteContext() {
         );
     }
 
+    /**
+     * @return The {@link ReadContext} for the current download
+     */
+    public ReadContext createReadContext() {
+        return new ReadContext(remoteFileName, fileName, this::finalizeUpload, isRemoteDataIntegrityCheckPossible());
+    }
+
     // package-private for testing
 
     /**
@@ -143,6 +166,10 @@ public interface OffsetRangeInputStreamSupplier {
         OffsetRangeInputStream get(long size, long position) throws IOException;
     }
 
+    public interface InputStreamSupplier {
+        InputStream get(long size, long position) throws IOException;
+    }
+
     interface LocalStreamSupplier<Stream> {
         Stream get() throws IOException;
     }
@@ -154,10 +181,12 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
     ) {
         return () -> {
             try {
+                InputStream inputStream;
                 OffsetRangeInputStream offsetRangeInputStream = offsetRangeInputStreamSupplier.get(size, position);
-                InputStream inputStream = !isRemoteDataIntegrityCheckPossible()
+                inputStream = !isRemoteDataIntegrityCheckPossible()
                     ? new ResettableCheckedInputStream(offsetRangeInputStream, fileName)
                     : offsetRangeInputStream;
+
                 Objects.requireNonNull(inputStreams.get())[streamIdx] = inputStream;
 
                 return new InputStreamContainer(inputStream, size, position);
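How the download path is expected to drive the new three-argument constructor, based on RemoteSegmentStoreDirectory#downloadBlob further down in this patch (the file names and localDirectory variable are illustrative). Note that createReadContext currently reuses finalizeUpload as the download finalizer; a dedicated finalizeDownload may be warranted later:

    // Build a container for a download-only transfer: no content length,
    // checksum, or stream supplier is needed on this path.
    RemoteTransferContainer container = new RemoteTransferContainer(
        "local_segment_file",   // fileName
        "remote_segment_file",  // remoteFileName
        true                    // isRemoteDataIntegritySupported
    );
    ReadContext readContext = container.createReadContext();
    readContext.setLocalDirectory(localDirectory); // Lucene Directory the blob is written into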
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index fbb2df7f9640a..bc4578a48fd06 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -57,6 +57,8 @@
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.ThreadInterruptedException;
+import org.opensearch.action.LatchedActionListener;
+import org.opensearch.action.support.PlainActionFuture;
 import org.opensearch.common.lucene.store.ByteArrayIndexInput;
 import org.opensearch.cluster.metadata.DataStream;
 import org.opensearch.core.Assertions;
@@ -194,6 +196,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.UncheckedIOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.NoSuchFileException;
@@ -2323,11 +2326,77 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, boolean syncFromRemote)
             + recoveryState.getRecoverySource()
             + "] but got "
             + getRetentionLeases();
+
+        /* ActionListener<Void> listener = new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                try {
+                    synchronized (engineMutex) {
+                        assert currentEngineReference.get() == null : "engine is running";
+                        verifyNotClosed();
+
+                        if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
+                            if (syncFromRemote) {
+                                syncRemoteTranslogAndUpdateGlobalCheckpoint();
+                            } else {
+                                // we will enter this block when we do not want to recover from remote translog.
+                                // currently only during snapshot restore, we are coming into this block.
+                                // here, as while initializing remote translog we cannot skip downloading translog files,
+                                // so before that step, we are deleting the translog files present in remote store.
+                                deleteTranslogFilesFromRemoteTranslog();
+                            }
+                        }
+                        // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
+                        final Engine newEngine = engineFactory.newReadWriteEngine(config);
+                        onNewEngine(newEngine);
+                        currentEngineReference.set(newEngine);
+                        // We set active because we are now writing operations to the engine; this way,
+                        // we can flush if we go idle after some time and become inactive.
+                        active.set(true);
+                    }
+
+                    // time elapses after the engine is created above (pulling the config settings) until we set the engine
+                    // reference, during which settings changes could possibly have happened, so here we forcefully push any
+                    // config changes to the new engine.
+                    onSettingsChanged();
+                    assert assertSequenceNumbersInCommit();
+                    recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
+                } catch (IOException e) {
+                    onFailure(e);
+                    throw new UncheckedIOException(e);
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+
+            }
+        };
+
+        CountDownLatch latch = new CountDownLatch(1);
+        ActionListener<Void> latchedListener = new LatchedActionListener<>(listener, latch);
+
+        synchronized (engineMutex) {
+            if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
+                syncSegmentsFromRemoteSegmentStore(false, true, true, latchedListener);
+            } else {
+                listener.onResponse(null);
+            }
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        */
         synchronized (engineMutex) {
             assert currentEngineReference.get() == null : "engine is running";
             verifyNotClosed();
             if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
-                syncSegmentsFromRemoteSegmentStore(false, true, true);
+                syncSegmentsFromRemoteSegmentStore(false, true, true, new PlainActionFuture<>());
             }
             if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
                 if (syncFromRemote) {
@@ -4508,6 +4577,58 @@ void resetEngineToGlobalCheckpoint() throws IOException {
         SetOnce<Engine> newEngineReference = new SetOnce<>();
         final long globalCheckpoint = getLastKnownGlobalCheckpoint();
         assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
+
+        // ActionListener<Void> listener = new ActionListener<>() {
+        //     @Override
+        //     public void onResponse(Void unused) {
+        //         try {
+        //             if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
+        //                 syncRemoteTranslogAndUpdateGlobalCheckpoint();
+        //             }
+        //             newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
+        //             onNewEngine(newEngineReference.get());
+        //
+        //             final TranslogRecoveryRunner translogRunner = (snapshot) -> runTranslogRecovery(
+        //                 newEngineReference.get(),
+        //                 snapshot,
+        //                 Engine.Operation.Origin.LOCAL_RESET,
+        //                 () -> {
+        //                     // TODO: add a dedicated recovery stats for the reset translog
+        //                 }
+        //             );
+        //
+        //             // When the new engine is created, translogs are synced from remote store onto local. Since remote store
+        //             // is the source of truth for translog, we play all translogs that exist locally. Otherwise, the
+        //             // recoverUpto happens upto global checkpoint.
+        //             // We also replay all local translog ops with Segment replication, because on engine swap our local
+        //             // translog may hold more ops than the global checkpoint.
+        //             long recoverUpto = isRemoteTranslogEnabled() || indexSettings().isSegRepEnabled() ? Long.MAX_VALUE : globalCheckpoint;
+        //             newEngineReference.get()
+        //                 .translogManager()
+        //                 .recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), recoverUpto);
+        //             newEngineReference.get().refresh("reset_engine");
+        //             synchronized (engineMutex) {
+        //                 verifyNotClosed();
+        //                 IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
+        //                 // We set active because we are now writing operations to the engine; this way,
+        //                 // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+        //                 active.set(true);
+        //             }
+        //             // time elapses after the engine is created above (pulling the config settings) until we set the engine
+        //             // reference, during which settings changes could possibly have happened, so here we forcefully push any
+        //             // config changes to the new engine.
+        //             onSettingsChanged();
+        //         } catch (IOException e) {
+        //             throw new UncheckedIOException(e);
+        //         }
+        //     }
+        //
+        //     @Override
+        //     public void onFailure(Exception e) {
+        //
+        //     }
+        // };
+
         synchronized (engineMutex) {
             verifyNotClosed();
             // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
@@ -4555,7 +4676,7 @@ public void close() throws IOException {
             };
             IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
             if (indexSettings.isRemoteStoreEnabled()) {
-                syncSegmentsFromRemoteSegmentStore(false, true, true);
+                syncSegmentsFromRemoteSegmentStore(false, true, true, new PlainActionFuture<>());
             }
             if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
                 syncRemoteTranslogAndUpdateGlobalCheckpoint();
@@ -4619,8 +4740,13 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
      * @param shouldCommit if the shard requires committing the changes after sync from remote.
      * @throws IOException if exception occurs while reading segments from remote store
      */
-    public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
-        throws IOException {
+    public void syncSegmentsFromRemoteSegmentStore(
+        boolean overrideLocal,
+        boolean refreshLevelSegmentSync,
+        boolean shouldCommit,
+        // This is unused. TODO: Remove if not needed going forward
+        ActionListener<Void> listener
+    ) throws IOException {
         assert indexSettings.isRemoteStoreEnabled();
         logger.info("Downloading segments from remote segment store");
         RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
@@ -4647,8 +4773,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync,
             } else {
                 storeDirectory = store.directory();
             }
+
+            // At this point, we need all the files to be downloaded before we can move ahead.
             Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
-            copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);
+            CountDownLatch latch = new CountDownLatch(1);
+            ActionListener<Void> latchedListener = new LatchedActionListener<>(new PlainActionFuture<>(), latch);
+            copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, latchedListener);
+            latch.await();
 
             if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
                 try (
@@ -4696,10 +4827,86 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync,
             }
         } catch (IOException e) {
             throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         } finally {
             store.decRef();
             remoteStore.decRef();
         }
+
+        /*
+        ActionListener<Void> internalListener = new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                try {
+                    Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
+                    if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
+                        try (
+                            ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
+                                new ByteArrayIndexInput("Snapshot of SegmentInfos", remoteSegmentMetadata.getSegmentInfosBytes())
+                            );
+                        ) {
+                            SegmentInfos infosSnapshot = SegmentInfos.readCommit(
+                                store.directory(),
+                                indexInput,
+                                remoteSegmentMetadata.getGeneration()
+                            );
+                            if (shouldCommit) {
+                                long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
+                                // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation
+                                // differs with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot
+                                // has generation N, after commit, there would be 2 files that would be created: segments_N+1 and
+                                // segments_N+2. With the policy of preserving only the latest commit, we will delete segments_N+1
+                                // which in fact is part of the latest commit.
+                                Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
+                                    .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
+                                    .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
+                                if (localMaxSegmentInfos.isPresent()
+                                    && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(
+                                        localMaxSegmentInfos.get()
+                                    ) - 1) {
+                                    // If remote translog is not enabled, local translog will be created with different UUID.
+                                    // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos
+                                    // needs to be same. Following code block makes sure to have the same UUID.
+                                    if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
+                                        SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
+                                        Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
+                                        userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
+                                        infosSnapshot.setUserData(userData, false);
+                                    }
+                                    storeDirectory.deleteFile(localMaxSegmentInfos.get());
+                                }
+                                store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
+                            } else {
+                                finalizeReplication(infosSnapshot);
+                            }
+                        }
+                    }
+                    listener.onResponse(unused);
+                } catch (Exception ex) {
+                    Exception exception = new IndexShardRecoveryException(
+                        shardId,
+                        "Exception while copying segment files from remote segment store",
+                        ex
+                    );
+                    onFailure(exception);
+                    listener.onFailure(exception);
+                } finally {
+                    store.decRef();
+                    remoteStore.decRef();
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+
+            }
+        };
+
+        copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, internalListener);
+        */
     }
 
     /**
@@ -4729,13 +4936,15 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
         final Directory storeDirectory = store.directory();
         store.incRef();
 
+        // TODO: Convert this into a listener pattern
         try {
             String segmentsNFile = copySegmentFiles(
                 storeDirectory,
                 sourceRemoteDirectory,
                 remoteDirectory,
                 uploadedSegments,
-                overrideLocal
+                overrideLocal,
+                null
             );
             if (segmentsNFile != null) {
                 try (
@@ -4768,8 +4977,10 @@ private String copySegmentFiles(
         RemoteSegmentStoreDirectory sourceRemoteDirectory,
         RemoteSegmentStoreDirectory targetRemoteDirectory,
         Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments,
-        boolean overrideLocal
+        boolean overrideLocal,
+        ActionListener<Void> listener
     ) throws IOException {
+        List<String> toDownloadSegments = new ArrayList<>();
         List<String> downloadedSegments = new ArrayList<>();
         List<String> skippedSegments = new ArrayList<>();
         String segmentNFile = null;
@@ -4783,8 +4994,32 @@ private String copySegmentFiles(
             for (String file : uploadedSegments.keySet()) {
                 long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
                 if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
-                    storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT);
-                    downloadedSegments.add(file);
+                    toDownloadSegments.add(file);
+                    ActionListener<Void> actionListener = new ActionListener<>() {
+                        @Override
+                        public void onResponse(Void unused) {
+                            try {
+                                storeDirectory.sync(Collections.singleton(file));
+                                downloadedSegments.add(file);
+                                if (toDownloadSegments.size() == downloadedSegments.size()) {
+                                    listener.onResponse(unused);
+                                }
+                            } catch (IOException e) {
+                                onFailure(e);
+                                throw new UncheckedIOException(e);
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            logger.error("Exception when downloading segment", e);
+                            // TODO: Add retry mechanism here
+                            listener.onFailure(e);
+                        }
+                    };
+                    sourceRemoteDirectory.copyTo(storeDirectory, file, IOContext.DEFAULT, actionListener);
+//                    storeDirectory.sync(Collections.singleton(file));
+//                    downloadedSegments.add(file);
                 } else {
                     skippedSegments.add(file);
                 }
@@ -4797,6 +5032,9 @@ private String copySegmentFiles(
                 }
             }
         } finally {
+            if (toDownloadSegments.isEmpty()) {
+                listener.onResponse(null);
+            }
             logger.info("Downloaded segments here: {}", downloadedSegments);
             logger.info("Skipped download for segments here: {}", skippedSegments);
         }
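A note on the completion check in copySegmentFiles: downloadedSegments is a plain ArrayList mutated from concurrent download callbacks, so `toDownloadSegments.size() == downloadedSegments.size()` can race or fire more than once, and the finally block invokes `listener` even when it is null (as on the syncSegmentsFromGivenRemoteSegmentStore path). A sketch of the same fan-in using OpenSearch's org.opensearch.action.support.GroupedActionListener, which counts once up front:

    // Collect the files to fetch first, then report each copyTo completion into one group.
    List<String> toDownload = new ArrayList<>();
    for (String file : uploadedSegments.keySet()) {
        long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
        if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
            toDownload.add(file);
        }
    }
    if (toDownload.isEmpty()) {
        listener.onResponse(null);
    } else {
        GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
            ActionListener.wrap(responses -> listener.onResponse(null), listener::onFailure),
            toDownload.size() // group size fixed before any download starts
        );
        for (String file : toDownload) {
            sourceRemoteDirectory.copyTo(storeDirectory, file, IOContext.DEFAULT, groupedListener);
        }
    }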
diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
index d4e779c83644f..2d27418eb0c9e 100644
--- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -45,6 +45,7 @@
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.StepListener;
+import org.opensearch.action.support.PlainActionFuture;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.MappingMetadata;
 import org.opensearch.cluster.routing.RecoverySource;
@@ -73,6 +74,7 @@
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -529,8 +531,40 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
         store.incRef();
         remoteStore.incRef();
         try {
+            // ActionListener<Void> listener = new ActionListener<>() {
+            //     @Override
+            //     public void onResponse(Void unused) {
+            //         try {
+            //             if (store.directory().listAll().length == 0) {
+            //                 store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
+            //             }
+            //             if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
+            //                 indexShard.syncTranslogFilesFromRemoteTranslog();
+            //             } else {
+            //                 bootstrap(indexShard, store);
+            //             }
+            //
+            //             assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
+            //             indexShard.recoveryState().getIndex().setFileDetailsComplete();
+            //             indexShard.openEngineAndRecoverFromTranslog();
+            //             indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
+            //             indexShard.finalizeRecovery();
+            //             indexShard.postRecovery("post recovery from remote_store");
+            //         } catch (IOException e) {
+            //             throw new UncheckedIOException(e);
+            //         } finally {
+            //             store.decRef();
+            //             remoteStore.decRef();
+            //         }
+            //     }
+            //
+            //     @Override
+            //     public void onFailure(Exception e) {
+            //
+            //     }
+            // };
+
             // Download segments from remote segment store
-            indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
+            indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true, new PlainActionFuture<>());
 
             if (store.directory().listAll().length == 0) {
                 store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index d3e8d961337cc..810270c054d63 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -27,6 +27,7 @@
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
 import org.opensearch.common.blobstore.exception.CorruptFileException;
+import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
@@ -398,6 +399,76 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListener<Void> listener)
         }
     }
 
+    /**
+     * Provides an async mechanism to copy a file from a remote directory to the provided directory utilizing
+     * multi-stream downloads
+     * @param to directory where the file has to be copied
+     * @param src name of the file
+     * @param context context for the IO operation
+     * @param listener listener to be notified when download is complete
+     * @throws IOException if the download could not be initiated
+     */
+    public void copyTo(Directory to, String src, IOContext context, ActionListener<Void> listener) throws IOException {
+        try {
+            downloadBlob(to, src, context, listener);
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
+    private void downloadBlob(Directory to, String localFileName, IOContext ioContext, ActionListener<Void> listener)
+        throws Exception {
+
+        final String remoteFileName = getExistingRemoteFilename(localFileName);
+        RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+            localFileName,
+            remoteFileName,
+            remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer
+        );
+
+        ActionListener<Void> completionListener = ActionListener.wrap(resp -> {
+            try {
+                postDownload();
+                listener.onResponse(null);
+            } catch (Exception e) {
+                logger.error(() -> new ParameterizedMessage("Exception in segment postDownload for file [{}]", remoteFileName), e);
+                listener.onFailure(e);
+            }
+        }, ex -> {
+            logger.error(() -> new ParameterizedMessage("Failed to download blob {}", remoteFileName), ex);
+            IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex);
+            if (corruptIndexException != null) {
+                listener.onFailure(corruptIndexException);
+                return;
+            }
+            Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class);
+            if (throwable != null) {
+                CorruptFileException corruptFileException = (CorruptFileException) throwable;
+                listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName()));
+                return;
+            }
+            listener.onFailure(ex);
+        });
+
+        completionListener = ActionListener.runBefore(completionListener, () -> {
+            try {
+                remoteTransferContainer.close();
+            } catch (Exception e) {
+                logger.warn("Error occurred while closing streams", e);
+            }
+        });
+
+        ReadContext readContext = remoteTransferContainer.createReadContext();
+        // TODO: Change this logic to parallel streams + temp directory
+        readContext.setLocalDirectory(to);
+        ((VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer()).asyncBlobDownload(readContext, completionListener);
+    }
+
+    private void postDownload() {
+        // TODO: Logic to merge smaller files from stream downloads
+    }
+
     private void uploadBlob(Directory from, String src, String remoteFileName, IOContext ioContext, ActionListener<Void> listener)
         throws Exception {
         long expectedChecksum = calculateChecksumOfChecksum(from, src);
@@ -411,7 +482,11 @@ private void uploadBlob(Directory from, String src, String remoteFileName, IOContext ioContext,
             contentLength,
             true,
             WritePriority.NORMAL,
-            (size, position) -> new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position),
+            (size, position) -> new OffsetRangeIndexInputStream(
+                from.openInput(src, ioContext),
+                size,
+                position
+            ),
             expectedChecksum,
             remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer
         );
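On the "temp directory" TODO in downloadBlob: one way to keep readers from ever observing a partially written segment file is to download under a temporary name inside the same Lucene Directory, sync, and then rename into place. A minimal sketch, assuming Lucene's Directory.rename contract (atomic within a directory); the helper name is illustrative:

    // Sketch: commit a fully downloaded temp file into its final name.
    private void commitDownloadedFile(Directory to, String tempName, String finalName) throws IOException {
        to.sync(Collections.singleton(tempName)); // make the downloaded bytes durable first
        to.rename(tempName, finalName);           // Directory.rename is specified to behave atomically
        to.syncMetaData();                        // persist the rename itself
    }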
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
index 974e8af42b939..7096dbe2bb42f 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
@@ -120,7 +120,11 @@ private void uploadBlob(
             contentLength,
             true,
             writePriority,
-            (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
+            (RemoteTransferContainer.OffsetRangeInputStreamSupplier) (size, position) -> new OffsetRangeFileInputStream(
+                fileSnapshot.getPath(),
+                size,
+                position
+            ),
             Objects.requireNonNull(fileSnapshot.getChecksum()),
             blobStore.blobContainer(blobPath) instanceof VerifyingMultiStreamBlobContainer
         );
diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
index a289c8f8a04b7..6aeea711bc4ad 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
@@ -41,6 +41,7 @@
 import org.opensearch.OpenSearchTimeoutException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.ActionRunnable;
+import org.opensearch.action.support.PlainActionFuture;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.ClusterStateObserver;
 import org.opensearch.cluster.metadata.IndexMetadata;
@@ -245,7 +246,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
                     indexShard.prepareForIndexRecovery();
                     final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
                     if (hasRemoteSegmentStore) {
-                        indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
+                        indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true, new PlainActionFuture<>());
                     }
                     final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
                     final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
index c5be7635782af..9c3d2235b3f16 100644
--- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
+++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
@@ -93,8 +93,19 @@ public void getSegmentFiles(
         ActionListener<GetSegmentFilesResponse> listener
     ) {
         try {
-            indexShard.syncSegmentsFromRemoteSegmentStore(false, true, false);
-            listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
+            ActionListener<Void> segmentsSyncListener = new ActionListener<>() {
+                @Override
+                public void onResponse(Void unused) {
+                    listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+            };
+            indexShard.syncSegmentsFromRemoteSegmentStore(false, true, false, segmentsSyncListener);
         } catch (Exception e) {
             listener.onFailure(e);
         }
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java
index cae15a2c53f3f..87ff8f03da595 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.index.shard;
 
+import org.opensearch.action.support.PlainActionFuture;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
@@ -36,7 +37,7 @@ public void testReplicaSyncingFromRemoteStore() throws IOException {
         assertDocs(primaryShard, "1", "2");
         flushShard(primaryShard);
 
-        replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
+        replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false, new PlainActionFuture<>());
         assertDocs(replicaShard, "1", "2");
         closeShards(primaryShard, replicaShard);
     }
diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java
index deae4eaca19e7..6ba7e692afb84 100644
--- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java
@@ -188,7 +188,12 @@ public void testGetSegmentFilesFailure() throws IOException {
         );
         Mockito.doThrow(new RuntimeException("testing"))
             .when(mockShard)
-            .syncSegmentsFromRemoteSegmentStore(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+            .syncSegmentsFromRemoteSegmentStore(
+                Mockito.anyBoolean(),
+                Mockito.anyBoolean(),
+                Mockito.anyBoolean(),
+                Mockito.any()
+            );
         assertThrows(ExecutionException.class, () -> {
             final PlainActionFuture res = PlainActionFuture.newFuture();
             replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), mockShard, res);
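One test-side note on the stubbing above: Mockito does not allow mixing argument matchers with concrete values in a single stubbing, so passing `new PlainActionFuture<>()` as the fourth argument alongside three `anyBoolean()` matchers would throw InvalidUseOfMatchersException at runtime; the fourth argument must also be a matcher, as shown:

    // All four arguments are matchers; a concrete PlainActionFuture here is invalid.
    Mockito.doThrow(new RuntimeException("testing"))
        .when(mockShard)
        .syncSegmentsFromRemoteSegmentStore(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any());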