From d30113fc89a231cf2def7458867063aa9a6e4467 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 12 Sep 2023 16:25:27 +0530 Subject: [PATCH] WIP - 2 Signed-off-by: Bhumika Saini --- .../repositories/s3/S3Repository.java | 41 ++++++- .../repositories/s3/S3RepositoryPlugin.java | 3 +- .../repositories/RepositoriesService.java | 5 +- .../blobstore/BlobStoreRepository.java | 11 +- .../blobstore/MeteredBlobStoreRepository.java | 4 +- .../repositories/fs/FsRepository.java | 16 +-- .../fs/ReloadableFsRepository.java | 103 ++++++++++++++++++ 7 files changed, 156 insertions(+), 27 deletions(-) create mode 100644 server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index ba67b74d82dbb..88859f378ef4c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -44,6 +44,7 @@ import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.settings.SecureString; @@ -62,6 +63,7 @@ import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.threadpool.Scheduler; +import java.nio.file.Path; import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -224,6 +226,34 @@ class S3Repository extends MeteredBlobStoreRepository { private final boolean multipartUploadEnabled; private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; + private final Path pluginConfigPath; + + S3Repository( + final RepositoryMetadata metadata, + final NamedXContentRegistry namedXContentRegistry, + final S3Service service, + final ClusterService clusterService, + final RecoverySettings recoverySettings, + final AsyncTransferManager asyncUploadUtils, + final AsyncExecutorContainer priorityExecutorBuilder, + final AsyncExecutorContainer normalExecutorBuilder, + final S3AsyncService s3AsyncService, + final boolean multipartUploadEnabled + ) { + this( + metadata, + namedXContentRegistry, + service, + clusterService, + recoverySettings, + asyncUploadUtils, + priorityExecutorBuilder, + normalExecutorBuilder, + s3AsyncService, + multipartUploadEnabled, + Path.of("") + ); + } /** * Constructs an s3 backed repository @@ -238,7 +268,8 @@ class S3Repository extends MeteredBlobStoreRepository { final AsyncExecutorContainer priorityExecutorBuilder, final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, - final boolean multipartUploadEnabled + final boolean multipartUploadEnabled, + Path pluginConfigPath ) { super( metadata, @@ -251,6 +282,7 @@ class S3Repository extends MeteredBlobStoreRepository { this.service = service; this.s3AsyncService = s3AsyncService; this.multipartUploadEnabled = multipartUploadEnabled; + this.pluginConfigPath = pluginConfigPath; this.repositoryMetadata = metadata; this.asyncUploadUtils = asyncUploadUtils; @@ -391,11 +423,12 @@ public boolean isReloadable() { @Override public void reload(RepositoryMetadata newRepositoryMetadata, boolean compress) { // Reload configs for S3Repository - super.reload(newRepositoryMetadata, COMPRESS_SETTING.get(newRepositoryMetadata.settings())); + super.reload(newRepositoryMetadata, compress); repositoryMetadata = newRepositoryMetadata; // Reload configs for S3RepositoryPlugin - final Map clientsSettings = S3ClientSettings.load(settings, configPath); + Settings settings = clusterService.getSettings(); + final Map clientsSettings = S3ClientSettings.load(settings, pluginConfigPath); service.refreshAndClearCache(clientsSettings); s3AsyncService.refreshAndClearCache(clientsSettings); @@ -404,8 +437,6 @@ public void reload(RepositoryMetadata newRepositoryMetadata, boolean compress) { blobStore.reload(newRepositoryMetadata); } - - @Override protected ByteSizeValue chunkSize() { return chunkSize; diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index 6ef60474afe8c..a80ee0ca35fae 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -182,7 +182,8 @@ protected S3Repository createRepository( priorityExecutorBuilder, normalExecutorBuilder, s3AsyncService, - S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()) + S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()), + configPath ); } diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 74315e7191709..fa4cf99328ac2 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -83,6 +83,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.COMPRESS_SETTING; import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; /** @@ -457,8 +458,8 @@ public void applyClusterState(ClusterChangedEvent event) { || previousMetadata.settings().equals(repositoryMetadata.settings()) == false) { // Previous version is different from the version in settings logger.debug("updating repository [{}]", repositoryMetadata.name()); - if (repository.isReloadable()) { - repository.reload(repositoryMetadata); + if (repository.isSystemRepository() && repository.isReloadable()) { + repository.reload(repositoryMetadata, COMPRESS_SETTING.get(repositoryMetadata.settings())); } else { closeRepository(repository); archiveRepositoryStats(repository, state.version()); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 09a2e5c403b0e..5799d913d2342 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -307,9 +307,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private RateLimiter restoreRateLimiter; - private final RateLimiter remoteUploadRateLimiter; + private RateLimiter remoteUploadRateLimiter; - private final RateLimiter remoteDownloadRateLimiter; + private RateLimiter remoteDownloadRateLimiter; private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric(); @@ -356,7 +356,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private boolean readOnly; - private final boolean isSystemRepository; + private boolean isSystemRepository; private final Object lock = new Object(); @@ -364,7 +364,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final SetOnce blobStore = new SetOnce<>(); - private final ClusterService clusterService; + protected final ClusterService clusterService; private final RecoverySettings recoverySettings; @@ -413,6 +413,8 @@ protected BlobStoreRepository( final RecoverySettings recoverySettings ) { reload(repositoryMetadata, compress); + + isSystemRepository = SYSTEM_REPOSITORY_SETTING.get(metadata.settings()); this.namedXContentRegistry = namedXContentRegistry; this.threadPool = clusterService.getClusterApplierService().threadPool(); this.clusterService = clusterService; @@ -429,7 +431,6 @@ public void reload(RepositoryMetadata repositoryMetadata, boolean compress) { remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO); remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO); readOnly = READONLY_SETTING.get(metadata.settings()); - isSystemRepository = SYSTEM_REPOSITORY_SETTING.get(metadata.settings()); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings()); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java index 0dcc4f51db1ec..5f076b2c5fcfd 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -73,7 +73,9 @@ public MeteredBlobStoreRepository( @Override public void reload(RepositoryMetadata repositoryMetadata, boolean compress) { super.reload(repositoryMetadata, compress); - // Note: repositoryInfo parameters cannot be changed + + // Not adding any additional reload logic here is intentional as the constructor only + // initializes the repositoryInfo from the repo metadata, which cannot be changed. } public RepositoryStatsSnapshot statsSnapshot() { diff --git a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java index c67c547743658..dcd528dc15f01 100644 --- a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java @@ -101,11 +101,11 @@ public class FsRepository extends BlobStoreRepository { public static final Setting BASE_PATH_SETTING = Setting.simpleString("base_path"); - private final Environment environment; + protected final Environment environment; - private ByteSizeValue chunkSize; + protected ByteSizeValue chunkSize; - private final BlobPath basePath; + protected BlobPath basePath; /** * Constructs a shared file system repository. @@ -187,14 +187,4 @@ protected ByteSizeValue chunkSize() { public BlobPath basePath() { return basePath; } - - @Override - public boolean isReloadable() { - return true; - } - - @Override - public void reload(RepositoryMetadata repositoryMetadata, boolean compress) { - // TODO - } } diff --git a/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java b/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java new file mode 100644 index 0000000000000..563809b455059 --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.fs; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.RepositoryException; + +import java.nio.file.Path; + +public class ReloadableFsRepository extends FsRepository { + private static final Logger logger = LogManager.getLogger(ReloadableFsRepository.class); + + /** + * Constructs a shared file system repository. + * + * @param metadata + * @param environment + * @param namedXContentRegistry + * @param clusterService + * @param recoverySettings + */ + public ReloadableFsRepository( + RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + } + + @Override + public boolean isReloadable() { + return true; + } + + @Override + public void reload(RepositoryMetadata repositoryMetadata, boolean compress) { + super.reload(repositoryMetadata, compress); + metadata = repositoryMetadata; + + // TODO - deduplicate the below block + String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); + if (location.isEmpty()) { + logger.warn( + "the repository location is missing, it should point to a shared file system location" + + " that is available on all cluster-manager and data nodes" + ); + throw new RepositoryException(metadata.name(), "missing location"); + } + Path locationFile = environment.resolveRepoFile(location); + if (locationFile == null) { + if (environment.repoFiles().length > 0) { + logger.warn( + "The specified location [{}] doesn't start with any " + "repository paths specified by the path.repo setting: [{}] ", + location, + environment.repoFiles() + ); + throw new RepositoryException( + metadata.name(), + "location [" + location + "] doesn't match any of the locations specified by path.repo" + ); + } else { + logger.warn( + "The specified location [{}] should start with a repository path specified by" + + " the path.repo setting, but the path.repo setting was not set on this node", + location + ); + throw new RepositoryException( + metadata.name(), + "location [" + location + "] doesn't match any of the locations specified by path.repo because this setting is empty" + ); + } + } + + if (CHUNK_SIZE_SETTING.exists(metadata.settings())) { + chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings()); + } else { + this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(environment.settings()); + } + + final String path = BASE_PATH_SETTING.get(metadata.settings()); + if (Strings.hasLength(path)) { + basePath = new BlobPath().add(path); + } else { + this.basePath = BlobPath.cleanPath(); + } + } +}