Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Aug 28, 2023
1 parent 628bbd0 commit 62f5929
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class S3BlobStore implements BlobStore {

private final StorageClass storageClass;

private final RepositoryMetadata repositoryMetadata;
private RepositoryMetadata repositoryMetadata;

private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();

Expand Down Expand Up @@ -105,6 +105,16 @@ class S3BlobStore implements BlobStore {
this.priorityExecutorBuilder = priorityExecutorBuilder;
}

@Override
public boolean isReloadable() {
return true;
}

@Override
public void reload(RepositoryMetadata repositoryMetadata) {
this.repositoryMetadata = repositoryMetadata;
}

public boolean isMultipartUploadEnabled() {
return multipartUploadEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class S3Repository extends MeteredBlobStoreRepository {

private final String cannedACL;

private final RepositoryMetadata repositoryMetadata;
private RepositoryMetadata repositoryMetadata;

private final AsyncTransferManager asyncUploadUtils;
private final S3AsyncService s3AsyncService;
Expand Down Expand Up @@ -383,6 +383,29 @@ public BlobPath basePath() {
return basePath;
}

@Override
public boolean isReloadable() {
return true;
}

@Override
public void reload(RepositoryMetadata newRepositoryMetadata, boolean compress) {
// Reload configs for S3Repository
super.reload(newRepositoryMetadata, COMPRESS_SETTING.get(newRepositoryMetadata.settings()));
repositoryMetadata = newRepositoryMetadata;

// Reload configs for S3RepositoryPlugin
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings, configPath);
service.refreshAndClearCache(clientsSettings);
s3AsyncService.refreshAndClearCache(clientsSettings);

// Reload configs for S3BlobStore
BlobStore blobStore = getBlobStore();
blobStore.reload(newRepositoryMetadata);
}



@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

package org.opensearch.common.blobstore;

import org.opensearch.cluster.metadata.RepositoryMetadata;

import java.io.Closeable;
import java.util.Collections;
import java.util.Map;
Expand All @@ -53,4 +55,17 @@ public interface BlobStore extends Closeable {
default Map<String, Long> stats() {
return Collections.emptyMap();
}

/**
* Checks if the blob store can be reloaded inplace or not
* @return true if the blob store can be reloaded inplace, false otherwise
*/
default boolean isReloadable() {
return false;
}

/**
* Reload the blob store inplace
*/
default void reload(RepositoryMetadata repositoryMetadata) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,18 +425,22 @@ public void applyClusterState(ClusterChangedEvent event) {
|| previousMetadata.settings().equals(repositoryMetadata.settings()) == false) {
// Previous version is different from the version in settings
logger.debug("updating repository [{}]", repositoryMetadata.name());
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
repository = null;
try {
repository = createRepository(repositoryMetadata, typesRegistry);
} catch (RepositoryException ex) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
logger.warn(
() -> new ParameterizedMessage("failed to change repository [{}]", repositoryMetadata.name()),
ex
);
if (repository.isReloadable()) {
repository.reload(repositoryMetadata);
} else {
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
repository = null;
try {
repository = createRepository(repositoryMetadata, typesRegistry);
} catch (RepositoryException ex) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
logger.warn(
() -> new ParameterizedMessage("failed to change repository [{}]", repositoryMetadata.name()),
ex
);
}
}
}
} else {
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/repositories/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,4 +423,17 @@ default void cloneRemoteStoreIndexShardSnapshot(
default Map<String, Object> adaptUserMetadata(Map<String, Object> userMetadata) {
return userMetadata;
}

/**
* Checks if the repository can be reloaded inplace or not
* @return true if the repository can be reloaded inplace, false otherwise
*/
default boolean isReloadable() {
return false;
}

/**
* Reload the repository inplace
*/
default void reload(RepositoryMetadata repositoryMetadata, boolean compress) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,17 +283,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Setting.Property.NodeScope);

protected final boolean supportURLRepo;
protected boolean supportURLRepo;

private final int maxShardBlobDeleteBatch;
private int maxShardBlobDeleteBatch;

private final Compressor compressor;
private Compressor compressor;

private final boolean cacheRepositoryData;
private boolean cacheRepositoryData;

private final RateLimiter snapshotRateLimiter;
private RateLimiter snapshotRateLimiter;

private final RateLimiter restoreRateLimiter;
private RateLimiter restoreRateLimiter;

private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();

Expand Down Expand Up @@ -334,7 +334,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
BlobStoreIndexShardSnapshots::fromXContent
);

private final boolean readOnly;
private boolean readOnly;

private final Object lock = new Object();

Expand Down Expand Up @@ -376,33 +376,39 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
/**
* IO buffer size hint for reading and writing to the underlying blob store.
*/
protected final int bufferSize;
protected int bufferSize;

/**
* Constructs new BlobStoreRepository
* @param metadata The metadata for this repository including name and settings
* @param repositoryMetadata The metadata for this repository including name and settings
* @param clusterService ClusterService
*/
protected BlobStoreRepository(
final RepositoryMetadata metadata,
final RepositoryMetadata repositoryMetadata,
final boolean compress,
final NamedXContentRegistry namedXContentRegistry,
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
this.metadata = metadata;
reload(repositoryMetadata, compress);
this.namedXContentRegistry = namedXContentRegistry;
this.threadPool = clusterService.getClusterApplierService().threadPool();
this.clusterService = clusterService;
this.recoverySettings = recoverySettings;
this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
}

@Override
public void reload(RepositoryMetadata repositoryMetadata, boolean compress) {
this.metadata = repositoryMetadata;

supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = READONLY_SETTING.get(metadata.settings());
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()) : CompressorRegistry.none();
compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()) : CompressorRegistry.none();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public MeteredBlobStoreRepository(
);
}

@Override
public void reload(RepositoryMetadata repositoryMetadata, boolean compress) {
super.reload(repositoryMetadata, compress);
// Note: repositoryInfo parameters cannot be changed
}

public RepositoryStatsSnapshot statsSnapshot() {
return new RepositoryStatsSnapshot(repositoryInfo, stats(), RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,14 @@ protected ByteSizeValue chunkSize() {
public BlobPath basePath() {
return basePath;
}

@Override
public boolean isReloadable() {
return true;
}

@Override
public void reload(RepositoryMetadata repositoryMetadata, boolean compress) {
// TODO
}
}

0 comments on commit 62f5929

Please sign in to comment.