Skip to content

Commit

Permalink
Merge branch 'main' into kp/integration-test-stalekeys
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 authored Apr 25, 2024
2 parents 931427c + db361ec commit e45119e
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 507 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.google.apis:google-api-services-compute` from v1-rev235-1.25.0 to v1-rev20240407-2.0.0 ([#13333](https://github.com/opensearch-project/OpenSearch/pull/13333))
- Bump `commons-cli:commons-cli` from 1.6.0 to 1.7.0 ([#13331](https://github.com/opensearch-project/OpenSearch/pull/13331))
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.11 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329))
- Bump `jakarta.enterprise:jakarta.enterprise.cdi-api` from 4.0.1 to 4.1.0 ([#13328](https://github.com/opensearch-project/OpenSearch/pull/13328))
- Bump `com.google.api.grpc:proto-google-iam-v1` from 0.12.0 to 1.33.0 ([#13332](https://github.com/opensearch-project/OpenSearch/pull/13332))

### Changed
- [BWC and API enforcement] Enforcing the presence of API annotations at build time ([#12872](https://github.com/opensearch-project/OpenSearch/pull/12872))
Expand All @@ -66,9 +68,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix from and size parameter can be negative when searching ([#13047](https://github.com/opensearch-project/OpenSearch/pull/13047))
- Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054))
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070)]
- Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812))
- Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012))
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))
- Fix mapper_parsing_exception when using flat_object fields with names longer than 11 characters ([#13259](https://github.com/opensearch-project/OpenSearch/pull/13259))

### Security

Expand Down
2 changes: 1 addition & 1 deletion plugins/repository-gcs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ dependencies {
api 'com.google.api-client:google-api-client:2.2.0'

api 'com.google.api.grpc:proto-google-common-protos:2.37.1'
api 'com.google.api.grpc:proto-google-iam-v1:0.12.0'
api 'com.google.api.grpc:proto-google-iam-v1:1.33.0'

api "com.google.auth:google-auth-library-credentials:${versions.google_auth}"
api "com.google.auth:google-auth-library-oauth2-http:${versions.google_auth}"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4766da92d1f36c8b612c1c142d5f3ace3774f098
2 changes: 1 addition & 1 deletion qa/wildfly/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ apply plugin: 'opensearch.internal-distribution-download'
testFixtures.useFixture()

dependencies {
providedCompile('jakarta.enterprise:jakarta.enterprise.cdi-api:4.0.1') {
providedCompile('jakarta.enterprise:jakarta.enterprise.cdi-api:4.1.0') {
exclude module: 'jakarta.annotation-api'
}
providedCompile 'jakarta.ws.rs:jakarta.ws.rs-api:3.1.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ private void parseToken(StringBuilder path, String currentFieldName) throws IOEx
// skip
} else if (this.parser.currentToken() == Token.START_OBJECT) {
parseToken(path, currentFieldName);
int dotIndex = path.lastIndexOf(DOT_SYMBOL);
if (dotIndex != -1) {
int dotIndex = path.lastIndexOf(DOT_SYMBOL, path.length());

if (dotIndex != -1 && path.length() > currentFieldName.length()) {
path.setLength(path.length() - currentFieldName.length() - 1);
}
} else {
Expand All @@ -117,8 +118,8 @@ private void parseToken(StringBuilder path, String currentFieldName) throws IOEx
parseValue(parsedFields);
this.valueList.add(parsedFields.toString());
this.valueAndPathList.add(path + EQUAL_SYMBOL + parsedFields);
int dotIndex = path.lastIndexOf(DOT_SYMBOL);
if (dotIndex != -1) {
int dotIndex = path.lastIndexOf(DOT_SYMBOL, path.length());
if (dotIndex != -1 && path.length() > currentFieldName.length()) {
path.setLength(path.length() - currentFieldName.length() - 1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.service.CacheService;
Expand Down Expand Up @@ -229,19 +230,16 @@ void clear(CacheEntity entity) {
public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notification) {
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
// shards as part of request cache.

// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
Key key = notification.getKey().key;
RemovalNotification<Key, BytesReference> newNotification = new RemovalNotification<>(
key,
notification.getValue(),
notification.getRemovalReason()
);

cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(newNotification));
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
);
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, newNotification);
}

private ICacheKey<Key> getICacheKey(Key key) {
Expand Down Expand Up @@ -285,10 +283,11 @@ BytesReference getOrCompute(
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
}
}
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
} else {
cacheEntity.onHit();
}

return value;
}

Expand Down Expand Up @@ -560,23 +559,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
}
ShardId shardId = indexShard.shardId();

cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
// decrement the stale key count
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
// If ShardId is not present or readerCacheKeyId is not present
// it should have already been accounted for and hence been removed from this map
// so decrement staleKeysCount
staleKeysCount.decrementAndGet();
int newValue = currentValue - 1;
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
return newValue == 0 ? null : newValue;
});
return keyCountMap;
// Return the current map
return readerCacheKeyMap;
} else {
// If it is in the map, it is not stale yet.
// Proceed to adjust the count for the readerCacheKeyId in the map
// but do not decrement the staleKeysCount
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
// this should never be null
assert (count != null && count >= 0);
// Reduce the count by 1
int newCount = count - 1;
if (newCount > 0) {
// Update the map with the new count
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
} else {
// Remove the readerCacheKeyId entry if new count is zero
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
}
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
}
});
}

/**
* Updates the count of stale keys in the cache.
* This method is called when a CleanupKey is added to the keysToClean set.
*
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
*
Expand All @@ -594,7 +611,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
ShardId shardId = indexShard.shardId();

// Using computeIfPresent to atomically operate on the countMap for a given shardId
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
if (cleanupKey.readerCacheKeyId == null) {
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
Expand All @@ -603,18 +620,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
return null;
} else {
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
staleKeysCount.addAndGet(v);
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
staleKeysCount.addAndGet(count);
// Return null to remove the key after updating staleKeysCount
return null;
});

// Check if countMap is empty after removal to decide if we need to remove the shardId entry
if (countMap.isEmpty()) {
return null; // Returning null removes the entry for shardId
// Returning null removes the entry for shardId
return null;
}
}
return countMap; // Return the modified countMap to keep the mapping
// Return the modified countMap to retain updates
return countMap;
});
}

Expand Down Expand Up @@ -740,6 +758,11 @@ public void close() {
this.cacheCleaner.close();
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

private final class IndicesRequestCacheCleaner implements Runnable, Releasable {

private final IndicesRequestCacheCleanupManager cacheCleanupManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,49 @@ public void testDocValue() throws Exception {
assertEquals(1, valueReaders.size());
}

public void testLongFieldNameWithHashArray() throws Exception {
String mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("test")
.startObject("properties")
.startObject("field")
.field("type", FIELD_TYPE)
.endObject()
.endObject()
.endObject()
.endObject()
.toString();
final DocumentMapper mapper = mapperService.documentMapperParser().parse("test", new CompressedXContent(mapping));

XContentBuilder json = XContentFactory.jsonBuilder()
.startObject()
.startObject("field")
.startObject("detail")
.startArray("fooooooooooo")
.startObject()
.field("name", "baz")
.endObject()
.startObject()
.field("name", "baz")
.endObject()
.endArray()
.endObject()
.endObject()
.endObject();

ParsedDocument d = mapper.parse(new SourceToParse("test", "1", BytesReference.bytes(json), MediaTypeRegistry.JSON));
writer.addDocument(d.rootDoc());
writer.commit();

IndexFieldData<?> fieldData = getForField("field");
List<LeafReaderContext> readers = refreshReader();
assertEquals(1, readers.size());

IndexFieldData<?> valueFieldData = getForField("field._value");
List<LeafReaderContext> valueReaders = refreshReader();
assertEquals(1, valueReaders.size());
}

@Override
protected String getFieldDataType() {
return FIELD_TYPE;
Expand Down
Loading

0 comments on commit e45119e

Please sign in to comment.