Skip to content

Commit

Permalink
WIP - 2
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Sep 12, 2023
1 parent 35b8e00 commit d30113f
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.settings.SecureString;
Expand All @@ -62,6 +63,7 @@
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.Scheduler;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -224,6 +226,34 @@ class S3Repository extends MeteredBlobStoreRepository {
private final boolean multipartUploadEnabled;
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final Path pluginConfigPath;

S3Repository(
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service,
final ClusterService clusterService,
final RecoverySettings recoverySettings,
final AsyncTransferManager asyncUploadUtils,
final AsyncExecutorContainer priorityExecutorBuilder,
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled
) {
this(
metadata,
namedXContentRegistry,
service,
clusterService,
recoverySettings,
asyncUploadUtils,
priorityExecutorBuilder,
normalExecutorBuilder,
s3AsyncService,
multipartUploadEnabled,
Path.of("")
);
}

/**
* Constructs an s3 backed repository
Expand All @@ -238,7 +268,8 @@ class S3Repository extends MeteredBlobStoreRepository {
final AsyncExecutorContainer priorityExecutorBuilder,
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled
final boolean multipartUploadEnabled,
Path pluginConfigPath
) {
super(
metadata,
Expand All @@ -251,6 +282,7 @@ class S3Repository extends MeteredBlobStoreRepository {
this.service = service;
this.s3AsyncService = s3AsyncService;
this.multipartUploadEnabled = multipartUploadEnabled;
this.pluginConfigPath = pluginConfigPath;

this.repositoryMetadata = metadata;
this.asyncUploadUtils = asyncUploadUtils;
Expand Down Expand Up @@ -391,11 +423,12 @@ public boolean isReloadable() {
@Override
public void reload(RepositoryMetadata newRepositoryMetadata, boolean compress) {
// Reload configs for S3Repository
super.reload(newRepositoryMetadata, COMPRESS_SETTING.get(newRepositoryMetadata.settings()));
super.reload(newRepositoryMetadata, compress);
repositoryMetadata = newRepositoryMetadata;

// Reload configs for S3RepositoryPlugin
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings, configPath);
Settings settings = clusterService.getSettings();
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings, pluginConfigPath);
service.refreshAndClearCache(clientsSettings);
s3AsyncService.refreshAndClearCache(clientsSettings);

Expand All @@ -404,8 +437,6 @@ public void reload(RepositoryMetadata newRepositoryMetadata, boolean compress) {
blobStore.reload(newRepositoryMetadata);
}



@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ protected S3Repository createRepository(
priorityExecutorBuilder,
normalExecutorBuilder,
s3AsyncService,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings())
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
configPath
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.COMPRESS_SETTING;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;

/**
Expand Down Expand Up @@ -457,8 +458,8 @@ public void applyClusterState(ClusterChangedEvent event) {
|| previousMetadata.settings().equals(repositoryMetadata.settings()) == false) {
// Previous version is different from the version in settings
logger.debug("updating repository [{}]", repositoryMetadata.name());
if (repository.isReloadable()) {
repository.reload(repositoryMetadata);
if (repository.isSystemRepository() && repository.isReloadable()) {
repository.reload(repositoryMetadata, COMPRESS_SETTING.get(repositoryMetadata.settings()));
} else {
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private RateLimiter restoreRateLimiter;

private final RateLimiter remoteUploadRateLimiter;
private RateLimiter remoteUploadRateLimiter;

private final RateLimiter remoteDownloadRateLimiter;
private RateLimiter remoteDownloadRateLimiter;

private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();

Expand Down Expand Up @@ -356,15 +356,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private boolean readOnly;

private final boolean isSystemRepository;
private boolean isSystemRepository;

private final Object lock = new Object();

private final SetOnce<BlobContainer> blobContainer = new SetOnce<>();

private final SetOnce<BlobStore> blobStore = new SetOnce<>();

private final ClusterService clusterService;
protected final ClusterService clusterService;

private final RecoverySettings recoverySettings;

Expand Down Expand Up @@ -413,6 +413,8 @@ protected BlobStoreRepository(
final RecoverySettings recoverySettings
) {
reload(repositoryMetadata, compress);

isSystemRepository = SYSTEM_REPOSITORY_SETTING.get(metadata.settings());
this.namedXContentRegistry = namedXContentRegistry;
this.threadPool = clusterService.getClusterApplierService().threadPool();
this.clusterService = clusterService;
Expand All @@ -429,7 +431,6 @@ public void reload(RepositoryMetadata repositoryMetadata, boolean compress) {
remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO);
remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = READONLY_SETTING.get(metadata.settings());
isSystemRepository = SYSTEM_REPOSITORY_SETTING.get(metadata.settings());
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public MeteredBlobStoreRepository(
@Override
public void reload(RepositoryMetadata repositoryMetadata, boolean compress) {
super.reload(repositoryMetadata, compress);
// Note: repositoryInfo parameters cannot be changed

// Not adding any additional reload logic here is intentional as the constructor only
// initializes the repositoryInfo from the repo metadata, which cannot be changed.
}

public RepositoryStatsSnapshot statsSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ public class FsRepository extends BlobStoreRepository {

public static final Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path");

private final Environment environment;
protected final Environment environment;

private ByteSizeValue chunkSize;
protected ByteSizeValue chunkSize;

private final BlobPath basePath;
protected BlobPath basePath;

/**
* Constructs a shared file system repository.
Expand Down Expand Up @@ -187,14 +187,4 @@ protected ByteSizeValue chunkSize() {
public BlobPath basePath() {
return basePath;
}

@Override
public boolean isReloadable() {
return true;
}

@Override
public void reload(RepositoryMetadata repositoryMetadata, boolean compress) {
// TODO
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.RepositoryException;

import java.nio.file.Path;

public class ReloadableFsRepository extends FsRepository {
private static final Logger logger = LogManager.getLogger(ReloadableFsRepository.class);

/**
* Constructs a shared file system repository.
*
* @param metadata
* @param environment
* @param namedXContentRegistry
* @param clusterService
* @param recoverySettings
*/
public ReloadableFsRepository(
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
}

@Override
public boolean isReloadable() {
return true;
}

@Override
public void reload(RepositoryMetadata repositoryMetadata, boolean compress) {
super.reload(repositoryMetadata, compress);
metadata = repositoryMetadata;

// TODO - deduplicate the below block
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {
logger.warn(
"the repository location is missing, it should point to a shared file system location"
+ " that is available on all cluster-manager and data nodes"
);
throw new RepositoryException(metadata.name(), "missing location");
}
Path locationFile = environment.resolveRepoFile(location);
if (locationFile == null) {
if (environment.repoFiles().length > 0) {
logger.warn(
"The specified location [{}] doesn't start with any " + "repository paths specified by the path.repo setting: [{}] ",
location,
environment.repoFiles()
);
throw new RepositoryException(
metadata.name(),
"location [" + location + "] doesn't match any of the locations specified by path.repo"
);
} else {
logger.warn(
"The specified location [{}] should start with a repository path specified by"
+ " the path.repo setting, but the path.repo setting was not set on this node",
location
);
throw new RepositoryException(
metadata.name(),
"location [" + location + "] doesn't match any of the locations specified by path.repo because this setting is empty"
);
}
}

if (CHUNK_SIZE_SETTING.exists(metadata.settings())) {
chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
} else {
this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(environment.settings());
}

final String path = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(path)) {
basePath = new BlobPath().add(path);
} else {
this.basePath = BlobPath.cleanPath();
}
}
}

0 comments on commit d30113f

Please sign in to comment.