Skip to content

Commit

Permalink
fix: 17218: Learner: ERROR "NullPointerException: Cannot invoke "com.…
Browse files Browse the repository at this point in the history
…swirlds.virtualmap.internal.cache.VirtualNodeCache$Mutation.isDeleted()" (#17237)

Fixes: #17218
Reviewed-by: Alex Kuzmin <[email protected]>, Ivan Malygin <[email protected]>
Signed-off-by: Artem Ananev <[email protected]>
  • Loading branch information
artemananiev authored Jan 7, 2025
1 parent 7e8fc98 commit 7229541
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2021-2024 Hedera Hashgraph, LLC
* Copyright (C) 2021-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1491,54 +1491,65 @@ private static <K, V> void filterMutations(
final Consumer<Mutation<K, V>> action = mutation -> {
// local variable is required because mutation.next can be changed by another thread to null
// see https://github.com/hashgraph/hedera-services/issues/7046 for the context
Mutation<K, V> nextMutation = mutation.next;
final Mutation<K, V> nextMutation = mutation.next;
if (nextMutation == null) {
return;
}
mutation.next = null;
if (nextMutation != null) {
assert !nextMutation.isFiltered();
// There may be older mutations being purged in parallel, they should not contribute
// to the "filtered" counter
if (!nextMutation.isFiltered() && (nextMutation.version > lastReleasedVersion)) {
nextMutation.setFiltered();
assert !nextMutation.isFiltered();
// There may be older mutations being purged in parallel, they should not contribute
// to the "filtered" counter
if (!nextMutation.isFiltered() && (nextMutation.version > lastReleasedVersion)) {
nextMutation.setFiltered();
filteredCounter.incrementAndGet();
}
if (!nextMutation.isNew()) {
return;
}
// nextMutation is to put a new element into a virtual map. The element doesn't
// exist in the data source. If this mutation is filtered, there must be a newer
// mutation for the same key. If that newer mutation has the "deleted" flag, the
// element should never be flushed to disk
final Mutation<K, V> latestMutation = index.get(mutation.key);
// If latestMutation is null, lookup() can handle it just fine
final Mutation<K, V> latestMutationUpToVersion = lookup(latestMutation, newestVersion);
if (latestMutationUpToVersion == null) {
// Mutations are processed on many threads, see array.parallelTraverse() call
// below. The key may be removed from the index or the latest mutation up to
// newestVersion may be removed in parallel on a different thread
return;
}
assert !latestMutationUpToVersion.isFiltered();
if (latestMutationUpToVersion.isDeleted()) {
if (!latestMutationUpToVersion.isFiltered()) {
latestMutationUpToVersion.setFiltered();
filteredCounter.incrementAndGet();
}
if (nextMutation.isNew()) {
// nextMutation is to put a new element into a virtual map. The element doesn't
// exist in the data source. If this mutation is filtered, there must be a newer
// mutation for the same key. If that newer mutation has the "deleted" flag, the
// element should never be flushed to disk
final Mutation<K, V> latestMutation = index.get(mutation.key);
assert latestMutation != null;
final Mutation<K, V> latestMutationUpToVersion = lookup(latestMutation, newestVersion);
assert latestMutationUpToVersion != null;
assert !latestMutationUpToVersion.isFiltered();
if (latestMutationUpToVersion.isDeleted()) {
if (!latestMutationUpToVersion.isFiltered()) {
latestMutationUpToVersion.setFiltered();
filteredCounter.incrementAndGet();
}
// If the latest mutation up to newestVersion is "deleted", and there are no
// newer mutations, the whole entry for the key can be removed from the index.
// It's safe to do so here, as there are no references to copies older than
// newestVersion and there are no mutations in versions newer than newestVersion
index.compute(mutation.key, (k, v) -> {
assert v != null;
if (v == latestMutationUpToVersion) {
return null;
}
Mutation<K, V> m = v;
while (m.next != latestMutationUpToVersion) {
m = m.next;
}
assert !m.isFiltered();
assert m.version > newestVersion;
m.next = null;
return v;
});
} else {
// Propagate the "new" flag to the newer mutation
latestMutationUpToVersion.setNew();
// If the latest mutation up to newestVersion is "deleted", and there are no
// newer mutations, the whole entry for the key can be removed from the index.
// It's safe to do so here, as there are no references to copies older than
// newestVersion and there are no mutations in versions newer than newestVersion
index.compute(mutation.key, (k, v) -> {
assert v != null;
if (v == latestMutationUpToVersion) {
return null;
}
}
Mutation<K, V> m = v;
while ((m != null) && (m.next != latestMutationUpToVersion)) {
m = m.next;
}
// m may be null, if latestMutationUpToVersion was removed from the list of
// mutations in a parallel thread
if (m != null) {
assert !m.isFiltered();
assert m.version > newestVersion;
m.next = null;
}
return v;
});
} else {
// Propagate the "new" flag to the newer mutation
latestMutationUpToVersion.setNew();
}
};
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2021-2024 Hedera Hashgraph, LLC
* Copyright (C) 2021-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1552,6 +1552,9 @@ public void setupWithOriginalNode(final MerkleNode originalNode) {
snapshotCache.prepareForFlush();
flush(snapshotCache, originalMap.state, this.dataSource);

// I assume an empty node cache can be used below rather than snapshotCache, since all the
// cache entries are flushed to the data source anyway. However, using snapshotCache may
// be slightly faster, because it's in memory
return new RecordAccessorImpl<>(reconnectState, snapshotCache, keySerializer, valueSerializer, dataSource);
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2021-2024 Hedera Hashgraph, LLC
* Copyright (C) 2021-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,8 @@
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -585,6 +587,65 @@ void inMemoryAddRemoveNoFlushTest() throws InterruptedException {
}
}

@Test
void inMemoryManyAddManyRemoveNoFlushTest() throws InterruptedException {
final Configuration configuration = new TestConfigBuilder()
.withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000)
.getOrCreateConfig();

VirtualRootNode<TestKey, TestValue> root = new VirtualRootNode<>(
TestKeySerializer.INSTANCE,
TestValueSerializer.INSTANCE,
new InMemoryBuilder(),
configuration.getConfigData(VirtualMapConfig.class));

final VirtualRootNode<TestKey, TestValue> copy0 = root;
VirtualMapState state = new VirtualMapState("label");
copy0.postInit(new VirtualStateAccessorImpl(state));

final int nCopies = 100;
final VirtualRootNode[] copies = new VirtualRootNode[nCopies];
copies[0] = root;

// Here is the test: every elemement is added in one copy and then removed in the
// next copy. Every copy will contain no more than 500 elements, therefore its
// effective size will be small, so none of the copies should be flushed to disk
for (int copyNo = 1; copyNo < nCopies; copyNo++) {
final VirtualRootNode<TestKey, TestValue> copy = root.copy();
copies[copyNo] = copy;
state = state.copy();
copy.postInit(new VirtualStateAccessorImpl(state));
root.release();
root = copy;
final int N = 1000;
final List<Integer> l = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
l.add(i);
}
Collections.shuffle(l);
for (int i = 0; i < N; i++) {
final int keyIndex = l.get(i);
final TestKey key = new TestKey(keyIndex);
if (i % 2 == copyNo % 2) { // add
final TestValue value = new TestValue(1000000 + keyIndex);
root.put(key, value);
} else { // remove
root.remove(key);
}
}
}

// The last two copies should not be checked: the last one is mutable, the one before is not
// mergeable until its next copy is immutable
for (int i = 0; i < nCopies - 2; i++) {
final VirtualRootNode<TestKey, TestValue> copy = copies[i];
// Copies must be merged, not flushed
assertEventuallyTrue(() -> copy.isMerged(), Duration.ofSeconds(16), "copy " + i + " should be merged");
}

root.release();
}

@Test
void inMemoryAddRemoveSomeFlushesTest() {
final Configuration configuration = new TestConfigBuilder()
Expand Down

0 comments on commit 7229541

Please sign in to comment.