[Backport] [2.x] feat: add vertical scaling and SoftReference for snapshot repository data cache (#16489) #16624

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Increase segrep pressure checkpoint default limit to 30 ([#16577](https://github.com/opensearch-project/OpenSearch/pull/16577/files))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))

### Dependencies
- Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241021-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502), [#16548](https://github.com/opensearch-project/OpenSearch/pull/16548))
@@ -787,6 +787,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Snapshot related Settings
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,

// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,
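
As a quick reference (not part of this diff), a minimal sketch of how a node could override the threshold registered above; the setting key comes from the PR, while the builder usage and the 2mb value are only illustrative:

import org.opensearch.common.settings.Settings;

// Illustrative only: override the new node-scope setting at startup.
// The validator added in BlobStoreRepository accepts an absolute size or a heap percentage
// and rejects values outside roughly 500KB .. 1% of the node heap.
Settings nodeSettings = Settings.builder()
    .put("snapshot.repository_data.cache.threshold", "2mb") // hypothetical value
    .build();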
@@ -142,6 +142,7 @@
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.IndexMetaDataGenerations;
@@ -168,6 +169,7 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -197,6 +199,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
@@ -254,6 +257,23 @@
*/
public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

public static final String SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME = "snapshot.repository_data.cache.threshold";

public static final double SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE = 0.01;

public static final long CACHE_MIN_THRESHOLD = ByteSizeUnit.KB.toBytes(500);

public static final long CACHE_MAX_THRESHOLD = calculateMaxSnapshotRepositoryDataCacheThreshold();

public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold();

/**
* Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions.
* This ensures compatibility across various JDK versions. For a practical usage example,
* see this link: https://github.com/openjdk/jdk11u/blob/cee8535a9d3de8558b4b5028d68e397e508bef71/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ByteArrayChannel.java#L226
*/
private static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
* contents will not result in the repository being marked as corrupted.
@@ -276,6 +296,58 @@
Setting.Property.Deprecated
);

/**
* Sets the cache size for snapshot repository data: the valid range is within 500Kb ... 1% of the node heap memory.
*/
public static final Setting<ByteSizeValue> SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD = new Setting<>(
SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME,
CACHE_DEFAULT_THRESHOLD + "b",
(s) -> {
ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio(s, SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME);
long userDefinedLimitBytes = userDefinedLimit.getBytes();

if (userDefinedLimitBytes > CACHE_MAX_THRESHOLD) {
throw new IllegalArgumentException(
"["
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
+ "] cannot be larger than ["
+ CACHE_MAX_THRESHOLD
+ "] bytes."
);
}

if (userDefinedLimitBytes < CACHE_MIN_THRESHOLD) {
throw new IllegalArgumentException(
"["
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
+ "] cannot be smaller than ["
+ CACHE_MIN_THRESHOLD
+ "] bytes."
);
}

return userDefinedLimit;
},
Setting.Property.NodeScope
);

public static long calculateDefaultSnapshotRepositoryDataCacheThreshold() {
return Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2);
}

public static long calculateMaxSnapshotRepositoryDataCacheThreshold() {
long jvmHeapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
long defaultThresholdOfHeap = (long) (jvmHeapSize * SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);
long maxThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

return maxThreshold;
}

protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, long defaultAbsoluteThreshold) {
return Math.min(Math.max(defaultThresholdOfHeap, defaultAbsoluteThreshold), MAX_SAFE_ARRAY_SIZE);
}
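
As a rough sanity check of the sizing logic above, a sketch assuming a 4GB heap (the numbers are illustrative, not taken from the PR):

// Illustrative arithmetic, assuming a 4GB heap:
long heapBytes = 4L * 1024 * 1024 * 1024;
long onePercentOfHeap = (long) (heapBytes * 0.01);                               // ~41 MB
long maxThreshold = Math.min(Math.max(onePercentOfHeap, 500L * 1024), Integer.MAX_VALUE - 8);
long defaultThreshold = Math.max(500L * 1024, maxThreshold / 2);                 // ~20.5 MB

On very small heaps, where 1% of the heap falls below 500KB, both values collapse to the 500KB floor.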

/**
* Size hint for the IO buffer size to use when reading from and writing to the repository.
*/
@@ -462,6 +534,8 @@

private volatile boolean enableAsyncDeletion;

protected final long repositoryDataCacheThreshold;

/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -519,6 +593,7 @@
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
this.repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(clusterService.getSettings()).getBytes();
}

@Override
@@ -1157,7 +1232,8 @@
cached = null;
} else {
genToLoad = latestKnownRepoGen.get();
cached = latestKnownRepositoryData.get();
SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
cached = (softRef != null) ? softRef.get() : null;
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -3025,15 +3101,19 @@
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
private final AtomicReference<SoftReference<Tuple<Long, BytesReference>>> latestKnownRepositoryData = new AtomicReference<>(
new SoftReference<>(null)
);
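
A brief aside on the pattern (a sketch, not code from the PR): keeping the cached tuple behind a SoftReference lets the JVM reclaim the serialized repository data under heap pressure instead of holding it until an OutOfMemoryError, so every reader must tolerate a cleared referent:

import java.lang.ref.SoftReference;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the read path: the referent may have been collected at any time,
// in which case the caller falls back to reloading repository data from the blob store.
AtomicReference<SoftReference<byte[]>> cache = new AtomicReference<>(new SoftReference<>(null));
SoftReference<byte[]> ref = cache.get();
byte[] cached = (ref != null) ? ref.get() : null; // null => cache miss, reload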

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null));
return;
}
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;

// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
@@ -3082,7 +3162,8 @@
genToLoad = latestKnownRepoGen.get();
}
try {
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
@@ -3149,19 +3230,22 @@
try {
serialized = CompressorRegistry.defaultCompressor().compress(updated);
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
long cacheWarningThreshold = Math.min(repositoryDataCacheThreshold * 10, MAX_SAFE_ARRAY_SIZE);
if (len > repositoryDataCacheThreshold) {
logger.debug(
"Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
"Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
+ " serialized size",
len,
metadata.name()
metadata.name(),
repositoryDataCacheThreshold
);
if (len > ByteSizeUnit.MB.toBytes(5)) {
if (len > cacheWarningThreshold) {
logger.warn(
"Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh"
"Your repository metadata blob for repository [{}] is larger than [{}] bytes. Consider moving to a fresh"
+ " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable"
+ " repository behavior going forward.",
metadata.name()
metadata.name(),
cacheWarningThreshold
);
}
// Set empty repository data to not waste heap for an outdated cached value
@@ -3173,11 +3257,12 @@
logger.warn("Failed to serialize repository data", e);
return;
}
latestKnownRepositoryData.updateAndGet(known -> {
latestKnownRepositoryData.updateAndGet(knownRef -> {
Tuple<Long, BytesReference> known = (knownRef != null) ? knownRef.get() : null;
if (known != null && known.v1() > generation) {
return known;
return knownRef;
}
return new Tuple<>(generation, serialized);
return new SoftReference<>(new Tuple<>(generation, serialized));
});
}
}
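
To make the two logging thresholds in cacheRepositoryData concrete (an illustrative sketch; the byte value assumes the ~20.5MB default computed earlier, not a number stated in the PR):

// Illustrative relationship between the caching and warning thresholds:
//   len <= repositoryDataCacheThreshold               -> cached as compressed JSON
//   len >  repositoryDataCacheThreshold               -> not cached, logged at debug
//   len >  min(10 * threshold, Integer.MAX_VALUE - 8) -> additional warn suggesting repository cleanup
long threshold = 21_474_836L; // assumed default on a 4GB heap
long cacheWarningThreshold = Math.min(threshold * 10, Integer.MAX_VALUE - 8);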
@@ -34,13 +34,15 @@

import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;

import static org.hamcrest.Matchers.equalTo;
@@ -127,22 +129,75 @@ public void testIndicesFieldDataCacheSetting() {
);
}

public void testSnapshotRepositoryDataCacheSizeSetting() {
assertMemorySizeSettingInRange(
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,
"snapshot.repository_data.cache.threshold",
new ByteSizeValue(BlobStoreRepository.calculateDefaultSnapshotRepositoryDataCacheThreshold()),
ByteSizeUnit.KB.toBytes(500),
1.0
);
}

private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue) {
assertMemorySizeSetting(setting, settingKey, defaultValue, Settings.EMPTY);
}

private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue, Settings settings) {
assertMemorySizeSetting(setting, settingKey, defaultValue, 25.0, 1024, settings);
}

private void assertMemorySizeSetting(
Setting<ByteSizeValue> setting,
String settingKey,
ByteSizeValue defaultValue,
double availablePercentage,
long availableBytes,
Settings settings
) {
assertThat(setting, notNullValue());
assertThat(setting.getKey(), equalTo(settingKey));
assertThat(setting.getProperties(), hasItem(Property.NodeScope));
assertThat(setting.getDefault(settings), equalTo(defaultValue));
Settings settingWithPercentage = Settings.builder().put(settingKey, "25%").build();
Settings settingWithPercentage = Settings.builder().put(settingKey, percentageAsString(availablePercentage)).build();
assertThat(
setting.get(settingWithPercentage),
equalTo(new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.25)))
equalTo(
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * percentageAsFraction(availablePercentage)))
)
);
Settings settingWithBytesValue = Settings.builder().put(settingKey, "1024b").build();
assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(1024)));
Settings settingWithBytesValue = Settings.builder().put(settingKey, availableBytes + "b").build();
assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(availableBytes)));
}

private void assertMemorySizeSettingInRange(
Setting<ByteSizeValue> setting,
String settingKey,
ByteSizeValue defaultValue,
long minBytes,
double maxPercentage
) {
assertMemorySizeSetting(setting, settingKey, defaultValue, maxPercentage, minBytes, Settings.EMPTY);

assertThrows(IllegalArgumentException.class, () -> {
Settings settingWithTooSmallValue = Settings.builder().put(settingKey, minBytes - 1).build();
setting.get(settingWithTooSmallValue);
});

assertThrows(IllegalArgumentException.class, () -> {
double unavailablePercentage = maxPercentage + 0.1;
Settings settingWithPercentageExceedingLimit = Settings.builder()
.put(settingKey, percentageAsString(unavailablePercentage))
.build();
setting.get(settingWithPercentageExceedingLimit);
});
}

private double percentageAsFraction(double availablePercentage) {
return availablePercentage / 100.0;
}

private String percentageAsString(double availablePercentage) {
return availablePercentage + "%";
}
}
@@ -92,6 +92,7 @@
import java.util.stream.Collectors;

import static org.opensearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.calculateMaxWithinIntLimit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
@@ -653,4 +654,53 @@ public void testGetRestrictedSystemRepositorySettings() {
assertTrue(settings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY));
repository.close();
}

public void testSnapshotRepositoryDataCacheDefaultSetting() {
// given
BlobStoreRepository repository = setupRepo();
long maxThreshold = BlobStoreRepository.calculateMaxSnapshotRepositoryDataCacheThreshold();

// when
long expectedThreshold = Math.max(ByteSizeUnit.KB.toBytes(500), maxThreshold / 2);

// then
assertEquals(repository.repositoryDataCacheThreshold, expectedThreshold);
}

public void testHeapThresholdUsed() {
// given
long defaultThresholdOfHeap = ByteSizeUnit.GB.toBytes(1);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(defaultThresholdOfHeap, expectedThreshold);
}

public void testAbsoluteThresholdUsed() {
// given
long defaultThresholdOfHeap = ByteSizeUnit.KB.toBytes(499);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long result = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(defaultAbsoluteThreshold, result);
}

public void testThresholdCappedAtIntMax() {
// given
int maxSafeArraySize = Integer.MAX_VALUE - 8;
long defaultThresholdOfHeap = (long) maxSafeArraySize + 1;
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(maxSafeArraySize, expectedThreshold);
}
}
Loading