Skip to content

Commit

Permalink
Introduce a unified download manager for remote store operations
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Jan 31, 2024
1 parent 16c5257 commit 5b9ad7d
Show file tree
Hide file tree
Showing 19 changed files with 398 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ public static final IndexShard newIndexShard(
null,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId,
null,
null
);
}
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.transfermanager.DownloadManager;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
Expand Down Expand Up @@ -605,7 +606,8 @@ public IndexService newIndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
RecoverySettings recoverySettings
RecoverySettings recoverySettings,
DownloadManager downloadManager
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -664,7 +666,8 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
clusterRemoteTranslogBufferIntervalSupplier,
recoverySettings
recoverySettings,
downloadManager
);
success = true;
return indexService;
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.SearchIndexNameMatcher;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.remote.transfermanager.DownloadManager;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -182,6 +183,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;
private final RecoverySettings recoverySettings;
private final DownloadManager downloadManager;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -217,7 +219,8 @@ public IndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
RecoverySettings recoverySettings
RecoverySettings recoverySettings,
DownloadManager downloadManager
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -295,6 +298,7 @@ public IndexService(
this.translogFactorySupplier = translogFactorySupplier;
this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
this.recoverySettings = recoverySettings;
this.downloadManager = downloadManager;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -528,7 +532,8 @@ public synchronized IndexShard createShard(
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier,
nodeEnv.nodeId(),
recoverySettings
recoverySettings,
downloadManager
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,74 @@
* compatible open source license.
*/

package org.opensearch.index.store;
package org.opensearch.index.remote.transfermanager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;

/**
* Helper class to downloads files from a {@link RemoteSegmentStoreDirectory}
* instance to a local {@link Directory} instance in parallel depending on thread
* pool size and recovery settings.
*
* @opensearch.api
*/
@PublicApi(since = "2.11.0")
public final class RemoteStoreFileDownloader {
private final Logger logger;
public class DownloadManager {

private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;
private static final Logger logger = LogManager.getLogger(DownloadManager.class);

public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) {
this.logger = Loggers.getLogger(RemoteStoreFileDownloader.class, shardId);
public DownloadManager(ThreadPool threadPool, RecoverySettings recoverySettings) {
this.threadPool = threadPool;
this.recoverySettings = recoverySettings;
}

public DownloadManager(DownloadManager other) {
this.threadPool = other.threadPool;
this.recoverySettings = other.recoverySettings;
}

public TranslogTransferMetadata fetchTranslogTransferMetadata(BlobContainer blobContainer, String fileName) throws IOException {
byte[] data = downloadFileFromRemoteStoreToMemory(blobContainer, fileName);
IndexInput indexInput = new ByteArrayIndexInput("metadata file", data);
return TranslogTransferManager.METADATA_STREAM_WRAPPER.readStream(indexInput);
}

public byte[] downloadFileFromRemoteStoreToMemory(BlobContainer blobContainer, String fileName) throws IOException {
// TODO: Add a circuit breaker check before putting all the data in heap
try (InputStream inputStream = blobContainer.readBlob(fileName)) {
return inputStream.readAllBytes();
}
}

public void downloadFileFromRemoteStore(BlobContainer blobContainer, String fileName, Path location) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
Files.deleteIfExists(filePath);

try (InputStream inputStream = blobContainer.readBlob(fileName)) {
Files.copy(inputStream, filePath);
}
}

/**
* Copies the given segments from the remote segment store to the given
* local directory.
Expand All @@ -55,14 +82,14 @@ public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, Recover
* @param toDownloadSegments The list of segment files to download
* @param listener Callback listener to be notified upon completion
*/
public void downloadAsync(
public void copySegmentsFromRemoteStoreAsync(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Collection<String> toDownloadSegments,
ActionListener<Void> listener
) {
downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener);
copySegmentsInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener);
}

/**
Expand All @@ -77,7 +104,7 @@ public void downloadAsync(
* Must be thread safe as this may be invoked concurrently from
* different threads.
*/
public void download(
public void copySegmentsFromRemoteStore(
Directory source,
Directory destination,
Directory secondDestination,
Expand All @@ -86,7 +113,7 @@ public void download(
) throws InterruptedException, IOException {
final CancellableThreads cancellableThreads = new CancellableThreads();
final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener);
copySegmentsInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener);
try {
listener.get();
} catch (ExecutionException e) {
Expand All @@ -105,7 +132,7 @@ public void download(
}
}

private void downloadInternal(
private void copySegmentsInternal(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Expand All @@ -126,11 +153,19 @@ private void downloadInternal(
logger.trace("Starting download of {} files with {} threads", queue.size(), threads);
final ActionListener<Void> allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads);
for (int i = 0; i < threads; i++) {
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
copyFileFromRemoteStoreDirectory(
cancellableThreads,
source,
destination,
secondDestination,
queue,
onFileCompletion,
allFilesListener
);
}
}

private void copyOneFile(
private void copyFileFromRemoteStoreDirectory(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Expand Down Expand Up @@ -160,7 +195,15 @@ private void copyOneFile(
listener.onFailure(e);
return;
}
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener);
copyFileFromRemoteStoreDirectory(
cancellableThreads,
source,
destination,
secondDestination,
queue,
onFileCompletion,
listener
);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.remote.transfermanager;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class StatsTrackingDownloadManager extends DownloadManager {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

public StatsTrackingDownloadManager(
ThreadPool threadPool,
RecoverySettings recoverySettings,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
) {
super(threadPool, recoverySettings);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
}

public StatsTrackingDownloadManager(DownloadManager downloadManager, RemoteTranslogTransferTracker remoteTranslogTransferTracker) {
super(downloadManager);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
}

@Override
public byte[] downloadFileFromRemoteStoreToMemory(BlobContainer blobContainer, String fileName) throws IOException {
// TODO: Rewrite stats logic to remove hard-wiring to translog transfer tracker
// (maybe make RemoteTranslogTransferTracker methods interface dependent?)
long downloadStartTime = System.nanoTime();
byte[] data = null;
try {
data = super.downloadFileFromRemoteStoreToMemory(blobContainer, fileName);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
if (data != null) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(data.length);
}
}
return data;
}

@Override
public void downloadFileFromRemoteStore(BlobContainer blobContainer, String fileName, Path location) throws IOException {
// TODO: Rewrite stats logic to remove hard-wiring to translog transfer tracker
boolean downloadStatus = false;
long bytesToRead = 0, downloadStartTime = System.nanoTime();
try {
super.downloadFileFromRemoteStore(blobContainer, fileName, location);
bytesToRead = Files.size(location.resolve(fileName));
downloadStatus = true;
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}

}
}
21 changes: 14 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.remote.transfermanager.DownloadManager;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.search.stats.ShardSearchStats;
import org.opensearch.index.seqno.ReplicationTracker;
Expand All @@ -161,7 +162,6 @@
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteStoreFileDownloader;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -344,7 +344,7 @@ Runnable getGlobalCheckpointSyncer() {
private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
private final RemoteStoreFileDownloader fileDownloader;
private final DownloadManager downloadManager;
private final RecoverySettings recoverySettings;

public IndexShard(
Expand Down Expand Up @@ -374,7 +374,8 @@ public IndexShard(
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
final String nodeId,
final RecoverySettings recoverySettings
final RecoverySettings recoverySettings,
DownloadManager downloadManager
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -471,7 +472,7 @@ public boolean shouldCache(Query query) {
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
this.recoverySettings = recoverySettings;
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
this.downloadManager = downloadManager;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -573,8 +574,8 @@ public RecoverySettings getRecoverySettings() {
return recoverySettings;
}

public RemoteStoreFileDownloader getFileDownloader() {
return fileDownloader;
public DownloadManager getDownloadManager() {
return downloadManager;
}

@Override
Expand Down Expand Up @@ -5009,7 +5010,13 @@ private String copySegmentFiles(

if (toDownloadSegments.isEmpty() == false) {
try {
fileDownloader.download(sourceRemoteDirectory, storeDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync);
downloadManager.copySegmentsFromRemoteStore(
sourceRemoteDirectory,
storeDirectory,
targetRemoteDirectory,
toDownloadSegments,
onFileSync
);
} catch (Exception e) {
throw new IOException("Error occurred when downloading segments from remote store", e);
}
Expand Down
Loading

0 comments on commit 5b9ad7d

Please sign in to comment.