Skip to content

Commit

Permalink
GH-2998 improve memory overflow performance of the LmdbStore
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Jun 5, 2024
1 parent 1932cff commit 1e16f58
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -75,7 +72,7 @@ class LmdbSailStore implements SailStore {
private volatile boolean asyncTransactionFinished;
private volatile boolean nextTransactionAsync;

private final boolean enableMultiThreading = true;
boolean enableMultiThreading = true;

private PersistentSetFactory<Long> setFactory;
private PersistentSet<Long> unusedIds, nextUnusedIds;
Expand Down Expand Up @@ -573,6 +570,51 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai
addStatement(subj, pred, obj, explicit, ctx);
}

/**
 * Adds every statement in {@code approved} while holding {@code sinkStoreAccessLock}, after a single call to
 * {@code startTransaction(true)}.
 * <p>
 * For each statement the subject, predicate, object and (when non-null) context are passed to
 * {@code valueStore.storeValue} and the returned ids are placed in an {@link AddQuadOperation}. That operation is
 * put on {@code opQueue} when {@code multiThreadingActive} is set, otherwise it is executed on the calling thread.
 *
 * @param approved         the statements to add
 * @param approvedContexts not read by this implementation
 * @throws SailException if an {@link IOException} is raised; {@code rollback()} is called before it is wrapped
 */
@Override
public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts) {

	sinkStoreAccessLock.lock();
	try {
		startTransaction(true);

		for (Statement st : approved) {
			final Resource subject = st.getSubject();
			final IRI predicate = st.getPredicate();
			final Value object = st.getObject();
			final Resource ctx = st.getContext();

			// Ids are requested in subject, predicate, object, context order.
			final AddQuadOperation op = new AddQuadOperation();
			op.s = valueStore.storeValue(subject);
			op.p = valueStore.storeValue(predicate);
			op.o = valueStore.storeValue(object);
			// A statement without a context gets context id 0.
			op.c = ctx != null ? valueStore.storeValue(ctx) : 0;
			op.context = ctx;
			op.explicit = explicit;

			if (!multiThreadingActive) {
				op.execute();
				continue;
			}

			// Keep retrying until the queue accepts the operation, bailing out as soon as a triple store
			// exception has been recorded.
			while (!opQueue.add(op)) {
				if (tripleStoreException != null) {
					throw wrapTripleStoreException();
				}
			}
		}
	} catch (IOException e) {
		rollback();
		throw new SailException(e);
	} catch (RuntimeException e) {
		rollback();
		logger.error("Encountered an unexpected problem while trying to add a statement", e);
		throw e;
	} finally {
		sinkStoreAccessLock.unlock();
	}
}

@Override
public void deprecate(Statement statement) throws SailException {
removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,11 @@ protected void initializeInternal() throws SailException {
backingStore = new LmdbSailStore(dataDir, config);
this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() {
// Creates the store that receives the model's statements once they no longer fit in memory.
@Override
protected SailStore createSailStore(File dataDir) throws IOException, SailException {
protected LmdbSailStore createSailStore(File dataDir) throws IOException, SailException {
// Model can't fit into memory, use another LmdbSailStore to store delta
return new LmdbSailStore(dataDir, config);
LmdbSailStore lmdbSailStore = new LmdbSailStore(dataDir, config);
// The overflow store is handed back with its enableMultiThreading flag switched off.
// NOTE(review): presumably so that writes to the overflow store run on the calling thread — confirm.
lmdbSailStore.enableMultiThreading = false;
return lmdbSailStore;
}
}) {

Expand Down Expand Up @@ -406,4 +408,5 @@ private boolean upgradeStore(File dataDir, String version) throws SailException
// Returns a supplier that builds a new MapDb3CollectionFactory on every call to get(). The threshold is read via
// getIterationCacheSyncThreshold() when the supplier is invoked, not when this method returns.
public Supplier<CollectionFactory> getCollectionFactory() {
return () -> new MapDb3CollectionFactory(getIterationCacheSyncThreshold());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/

package org.eclipse.rdf4j.sail.lmdb;

import java.io.File;
Expand All @@ -16,6 +17,7 @@
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
Expand All @@ -32,7 +34,6 @@
import org.eclipse.rdf4j.model.impl.LinkedHashModel;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.base.SailStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,21 +48,24 @@ abstract class MemoryOverflowModel extends AbstractModel {

private static final Runtime RUNTIME = Runtime.getRuntime();

private static final int LARGE_BLOCK = 10000;
private static final int LARGE_BLOCK = 5 * 1024;

private static volatile boolean overflow;

// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
// we have space for one more block. The limit is currently set at 32 MB
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
// we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps.
// Runtime.maxMemory() reports bytes, so the heap-size cut-off must be expressed in bytes too; comparing against a
// bare 1024 is true for every real heap and would always select the 128 MB reserve.
// NOTE(review): 1 GiB as the boundary between "small" and "large" heaps is an assumption — confirm.
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024L * 1024 * 1024
		? 128 * 1024 * 1024
		: 32 * 1024 * 1024;

final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);

private LinkedHashModel memory;
private volatile LinkedHashModel memory;

transient File dataDir;
private transient File dataDir;

transient SailStore store;
private transient LmdbSailStore store;

transient SailSourceModel disk;
private transient volatile SailSourceModel disk;

private long baseline = 0;

Expand All @@ -70,7 +74,7 @@ abstract class MemoryOverflowModel extends AbstractModel {
SimpleValueFactory vf = SimpleValueFactory.getInstance();

public MemoryOverflowModel() {
memory = new LinkedHashModel(LARGE_BLOCK);
memory = new LinkedHashModel(LARGE_BLOCK * 2);
}

public MemoryOverflowModel(Model model) {
Expand Down Expand Up @@ -137,6 +141,33 @@ public boolean add(Statement st) {
return getDelegate().add(st);
}

/**
 * Adds all statements in {@code c} to the current delegate.
 * <p>
 * When the model is already disk-backed, or {@code c} holds at most 1024 elements, the collection is passed to the
 * delegate in one call. Otherwise the statements are collected into sets of 1024 and each full set is added
 * separately, with {@code innerCheckMemoryOverflow()} run after every full set.
 *
 * @param c the statements to add
 * @return {@code true} if any call to the delegate's {@code addAll} returned {@code true}
 */
@Override
public boolean addAll(Collection<? extends Statement> c) {
	final int chunkSize = 1024;

	checkMemoryOverflow();

	// Nothing to gain from chunking: hand the whole collection to the delegate.
	if (disk != null || c.size() <= chunkSize) {
		return getDelegate().addAll(c);
	}

	boolean changed = false;
	Set<Statement> chunk = new HashSet<>();

	for (Statement statement : c) {
		chunk.add(statement);
		if (chunk.size() < chunkSize) {
			continue;
		}

		// Chunk is full: flush it, then re-evaluate memory before collecting the next one.
		if (getDelegate().addAll(chunk)) {
			changed = true;
		}
		chunk.clear();
		innerCheckMemoryOverflow();
	}

	// Flush whatever is left over from the last, partially filled chunk.
	if (!chunk.isEmpty()) {
		if (getDelegate().addAll(chunk)) {
			changed = true;
		}
		chunk.clear();
	}

	return changed;
}

@Override
public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
return getDelegate().remove(subj, pred, obj, contexts);
Expand Down Expand Up @@ -191,13 +222,27 @@ public synchronized void removeTermIteration(Iterator<Statement> iter, Resource
}
}

protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException;
protected abstract LmdbSailStore createSailStore(File dataDir) throws IOException, SailException;

// Returns the model currently backing this instance: the in-memory model while that field is non-null, otherwise
// the disk model. Throws IllegalStateException if both are null even when re-read under the instance monitor.
synchronized Model getDelegate() {
if (disk == null) {
private Model getDelegate() {
// Copy the volatile field into a local so the null check and the return use the same reference.
var memory = this.memory;
if (memory != null) {
return memory;
} else {
var disk = this.disk;
if (disk != null) {
return disk;
}
// Both reads came back null. overflowToDisk() is synchronized on this instance and clears memory before it
// assigns disk, so re-read both fields while holding the same monitor before reporting a broken state.
synchronized (this) {
if (this.memory != null) {
return this.memory;
}
if (this.disk != null) {
return this.disk;
}
throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state");
}
}
return disk;
}

private void writeObject(ObjectOutputStream s) throws IOException {
Expand Down Expand Up @@ -227,51 +272,84 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
}
}

private synchronized void checkMemoryOverflow() {
if (disk == null) {
int size = size();
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();
// Pre-check run before statements are added (addAll calls it first). Does nothing once getDelegate() hands back
// the disk model. Otherwise it runs the full memory check when the static overflow flag is set, and again whenever
// size() + 1 is a non-zero multiple of LARGE_BLOCK.
private void checkMemoryOverflow() {
if (disk == getDelegate()) {
return;
}

// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();
// overflow is a static flag set by overflowToDisk(), so it is shared by every MemoryOverflowModel instance.
if (overflow) {
innerCheckMemoryOverflow();
}
// NOTE(review): the +1 presumably counts the statement that is about to be added — confirm.
int size = size() + 1;
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
innerCheckMemoryOverflow();
}

// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();
}

// estimated memory used
long used = totalMemory - freeMemory;
// Estimates how much heap the JVM can still allocate and calls overflowToDisk() when that estimate drops below
// the thresholds tested further down. Returns immediately when the delegate is already the disk model.
private void innerCheckMemoryOverflow() {
if (disk == getDelegate()) {
return;
}

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;
// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();

if (baseline > 0) {
long blockSize = used - baseline;
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}
// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();

// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
overflowToDisk();
}
// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();

// estimated memory used
long used = totalMemory - freeMemory;

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;

// The thresholds are only evaluated once a previous call has recorded a baseline.
if (baseline > 0) {
// Track the largest growth in used memory seen between two consecutive checks.
long blockSize = used - baseline;
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}
// While the shared overflow flag is set, the required reserve is doubled.
if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) {
// stricter memory requirements to not overflow if other models are overflowing
logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
overflowToDisk();
// System.gc() is only a request to the JVM.
// NOTE(review): presumably meant to reclaim the heap held by the in-memory model — confirm.
System.gc();
} else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)

logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
overflowToDisk();
System.gc();
} else {
// Neither threshold was hit: clear the shared flag so the doubled reserve no longer applies.
if (overflow) {
overflow = false;
}
baseline = used;
}
}
baseline = used;
}

private synchronized void overflowToDisk() {
overflow = true;
if (memory == null) {
assert disk != null;
return;
}

try {
LinkedHashModel memory = this.memory;
this.memory = null;

assert disk == null;
dataDir = Files.createTempDirectory("model").toFile();
logger.debug("memory overflow using temp directory {}", dataDir);
store = createSailStore(dataDir);
disk = new SailSourceModel(store) {
disk = new SailSourceModel(store, memory) {

@Override
protected void finalize() throws Throwable {
Expand All @@ -291,8 +369,7 @@ protected void finalize() throws Throwable {
super.finalize();
}
};
disk.addAll(memory);
memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK);

logger.debug("overflow synced to disk");
} catch (IOException | SailException e) {
String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";
Expand Down
Loading

0 comments on commit 1e16f58

Please sign in to comment.