From 4f67b3471ae920e5bd37878a1d5ca0b5ed2bbc32 Mon Sep 17 00:00:00 2001
From: Ken Wenzel
Date: Tue, 23 May 2023 11:30:06 +0200
Subject: [PATCH] GH-4554 Fix performance of memory overflow

- do not verify additions in Changeset via isEmpty()
- proactively close MemoryOverflowModel in Changeset by using AutoCloseable interface
- use GarbageCollectorMXBean to monitor GC load
- do not isolate transactions in LmdbStore when used in MemoryOverflowModel
---
 .../eclipse/rdf4j/sail/base/Changeset.java    |  28 +++-
 .../rdf4j/sail/lmdb/LmdbSailStore.java        |   4 +
 .../eclipse/rdf4j/sail/lmdb/LmdbStore.java    |   8 +-
 .../rdf4j/sail/lmdb/MemoryOverflowModel.java  | 152 ++++++++++-------
 .../rdf4j/sail/lmdb/SailSourceModel.java      |  80 +++++----
 .../eclipse/rdf4j/sail/lmdb/TripleStore.java  |  38 ++++-
 .../rdf4j/sail/lmdb/SailSourceModelTest.java  |   2 +-
 .../benchmark/OverflowBenchmarkSynthetic.java |   2 +-
 .../sail/nativerdf/MemoryOverflowModel.java   | 158 +++++++++++-------
 .../rdf4j/sail/nativerdf/NativeStore.java     |   4 +
 .../rdf4j/sail/nativerdf/SailSourceModel.java |  82 ++++-----
 .../sail/nativerdf/SailSourceModelTest.java   |   2 +-
 12 files changed, 348 insertions(+), 212 deletions(-)

diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java
index fd8b3a54b68..ec89eeae284 100644
--- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java
+++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java
@@ -127,7 +127,25 @@ public void close() throws SailException {
         refbacks = null;
         prepend = null;
         observed = null;
-        approved = null;
+        try {
+            if (approved instanceof AutoCloseable) {
+                try {
+                    ((AutoCloseable) approved).close();
+                } catch (Exception e) {
+                    throw new SailException(e);
+                }
+            }
+            approved = null;
+        } finally {
+            if (deprecated instanceof AutoCloseable) {
+                try {
+                    ((AutoCloseable) deprecated).close();
+                } catch (Exception e) {
+                    throw new SailException(e);
+                }
+            }
+            deprecated = null;
+        }
         deprecated = null;
         approvedContexts = null;
         deprecatedContexts = null;
@@ -416,7 +434,7 @@ public void approve(Statement statement) {
             approved = createEmptyModel();
         }
         approved.add(statement);
-        approvedEmpty = approved == null || approved.isEmpty();
+        approvedEmpty = false;
         if (statement.getContext() != null) {
             if (approvedContexts == null) {
                 approvedContexts = new HashSet<>();
             }
@@ -447,7 +465,7 @@ public void deprecate(Statement statement) {
             deprecated = createEmptyModel();
         }
         deprecated.add(statement);
-        deprecatedEmpty = deprecated == null || deprecated.isEmpty();
+        deprecatedEmpty = false;
         Resource ctx = statement.getContext();
         if (approvedContexts != null && approvedContexts.contains(ctx)
                 && !approved.contains(null, null, null, ctx)) {
@@ -885,7 +903,7 @@ public void approveAll(Set approve, Set approveContexts) {
             approved = createEmptyModel();
         }
         approved.addAll(approve);
-        approvedEmpty = approved == null || approved.isEmpty();
+        approvedEmpty = false;
         if (approveContexts != null) {
             if (approvedContexts == null) {
                 approvedContexts = new HashSet<>();
             }
@@ -912,7 +930,7 @@ public void deprecateAll(Set deprecate) {
             deprecated = createEmptyModel();
         }
         deprecated.addAll(deprecate);
-        deprecatedEmpty = deprecated == null || deprecated.isEmpty();
+        deprecatedEmpty = false;
 
         for (Statement statement : deprecate) {
             Resource ctx = statement.getContext();
diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java
index 239a40cfb16..3db0573fa51 100644
--- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java
+++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java
@@ -372,6 +372,10 @@ CloseableIteration createStatementIterator(
         }
     }
 
+    public void setTransactionIsolation(boolean transactionIsolation) {
+        this.tripleStore.setTransactionIsolation(transactionIsolation);
+    }
+
     private final class LmdbSailSource extends BackingSailSource {
 
         private final boolean explicit;
diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java
index 953e9678953..2c213d2e681 100644
--- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java
+++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java
@@ -250,11 +250,15 @@ protected void initializeInternal() throws SailException {
             FileUtils.writeStringToFile(versionFile, VERSION, StandardCharsets.UTF_8);
         }
         backingStore = new LmdbSailStore(dataDir, config);
-        this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() {
+        this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel(false) {
             @Override
             protected SailStore createSailStore(File dataDir) throws IOException, SailException {
                 // Model can't fit into memory, use another LmdbSailStore to store delta
-                return new LmdbSailStore(dataDir, config);
+                LmdbStoreConfig overflowConfig = new LmdbStoreConfig();
+                LmdbSailStore store = new LmdbSailStore(dataDir, overflowConfig);
+                // does not need to isolate transactions and therefore can optimize autogrow and others
+                store.setTransactionIsolation(false);
+                return store;
             }
         }) {
 
diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java
index f13cca44933..5c3671dbd30 100644
--- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java
+++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java
@@ -14,11 +14,15 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
 import java.nio.file.Files;
-import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.eclipse.rdf4j.common.io.FileUtil;
 import org.eclipse.rdf4j.model.IRI;
@@ -41,7 +45,7 @@
  * estimated memory usage is more than the amount of free memory available. Once the threshold is cross this
  * implementation seamlessly changes to a disk based {@link SailSourceModel}.
  */
-abstract class MemoryOverflowModel extends AbstractModel {
+abstract class MemoryOverflowModel extends AbstractModel implements AutoCloseable {
 
     private static final long serialVersionUID = 4119844228099208169L;
 
@@ -49,8 +53,8 @@ abstract class MemoryOverflowModel extends AbstractModel {
 
     private static final int LARGE_BLOCK = 10000;
 
-    // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
-    // we have space for one more block. The limit is currently set at 32 MB
+    // To reduce the chance of OOM we will always overflow once we get close to running out of memory.
+    // The limit is currently set at 32 MB
     private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
 
     final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);
@@ -63,27 +67,17 @@ abstract class MemoryOverflowModel extends AbstractModel {
 
     transient SailSourceModel disk;
 
-    private long baseline = 0;
+    private final boolean verifyAdditions;
 
-    private long maxBlockSize = 0;
+    private final SimpleValueFactory vf = SimpleValueFactory.getInstance();
 
-    SimpleValueFactory vf = SimpleValueFactory.getInstance();
-
-    public MemoryOverflowModel() {
+    public MemoryOverflowModel(boolean verifyAdditions) {
+        this.verifyAdditions = verifyAdditions;
         memory = new LinkedHashModel(LARGE_BLOCK);
     }
 
-    public MemoryOverflowModel(Model model) {
-        this(model.getNamespaces());
-        addAll(model);
-    }
-
-    public MemoryOverflowModel(Set namespaces, Collection c) {
-        this(namespaces);
-        addAll(c);
-    }
-
-    public MemoryOverflowModel(Set namespaces) {
+    public MemoryOverflowModel(Set namespaces, boolean verifyAdditions) {
+        this.verifyAdditions = verifyAdditions;
         memory = new LinkedHashModel(namespaces, LARGE_BLOCK);
     }
 
@@ -227,40 +221,71 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
         }
     }
 
+    static class GcInfo {
+        long count;
+        long time;
+    }
+
+    private final Map<String, GcInfo> prevGcInfo = new ConcurrentHashMap<>();
+
+    private synchronized boolean highGcLoad() {
+        boolean highLoad = false;
+
+        // get all garbage collector MXBeans.
+        List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+        for (GarbageCollectorMXBean gcBean : gcBeans) {
+            long count = gcBean.getCollectionCount();
+            long time = gcBean.getCollectionTime();
+
+            GcInfo prevInfo = prevGcInfo.get(gcBean.getName());
+            if (prevInfo != null) {
+                long countDiff = count - prevInfo.count;
+                long timeDiff = time - prevInfo.time;
+                if (countDiff != 0) {
+                    double gcLoad = (double) timeDiff / countDiff;
+                    // TODO find good threshold
+                    if (gcLoad > 30) {
+                        highLoad = true;
+                    }
+                }
+            } else {
+                prevInfo = new GcInfo();
+                prevGcInfo.put(gcBean.getName(), prevInfo);
+            }
+            prevInfo.count = count;
+            prevInfo.time = time;
+        }
+        return highLoad;
+    }
+
     private synchronized void checkMemoryOverflow() {
         if (disk == null) {
             int size = size();
             if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
-                // maximum heap size the JVM can allocate
-                long maxMemory = RUNTIME.maxMemory();
+                boolean overflow = highGcLoad();
+                if (!overflow) {
+                    // maximum heap size the JVM can allocate
+                    long maxMemory = RUNTIME.maxMemory();
 
-                // total currently allocated JVM memory
-                long totalMemory = RUNTIME.totalMemory();
+                    // total currently allocated JVM memory
+                    long totalMemory = RUNTIME.totalMemory();
 
-                // amount of memory free in the currently allocated JVM memory
-                long freeMemory = RUNTIME.freeMemory();
+                    // amount of memory free in the currently allocated JVM memory
+                    long freeMemory = RUNTIME.freeMemory();
 
-                // estimated memory used
-                long used = totalMemory - freeMemory;
+                    // estimated memory used
+                    long used = totalMemory - freeMemory;
 
-                // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
-                long freeToAllocateMemory = maxMemory - used;
+                    // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
+                    long freeToAllocateMemory = maxMemory - used;
 
-                if (baseline > 0) {
-                    long blockSize = used - baseline;
-                    if (blockSize > maxBlockSize) {
-                        maxBlockSize = blockSize;
-                    }
-
-                    // Sync if either the estimated size of the next block is larger than remaining memory, or
-                    // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
-                    if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
-                            freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
-                        logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
-                        overflowToDisk();
-                    }
+                    // try to prevent OOM
+                    overflow = freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING;
+                }
+                if (overflow) {
+                    logger.debug("syncing at {} triples.", size);
+                    overflowToDisk();
                 }
-                baseline = used;
             }
         }
     }
@@ -271,26 +296,7 @@ private synchronized void overflowToDisk() {
             dataDir = Files.createTempDirectory("model").toFile();
             logger.debug("memory overflow using temp directory {}", dataDir);
             store = createSailStore(dataDir);
-            disk = new SailSourceModel(store) {
-
-                @Override
-                protected void finalize() throws Throwable {
-                    logger.debug("finalizing {}", dataDir);
-                    if (disk == this) {
-                        try {
-                            store.close();
-                        } catch (SailException e) {
-                            logger.error(e.toString(), e);
-                        } finally {
-                            FileUtil.deleteDir(dataDir);
-                            dataDir = null;
-                            store = null;
-                            disk = null;
-                        }
-                    }
-                    super.finalize();
-                }
-            };
+            disk = new SailSourceModel(store, verifyAdditions);
             disk.addAll(memory);
             memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK);
             logger.debug("overflow synced to disk");
@@ -299,4 +305,22 @@ protected void finalize() throws Throwable {
             logger.error("Error while writing to overflow directory " + path, e);
         }
     }
+
+    @Override
+    public void close() throws IOException {
+        if (disk != null) {
+            logger.debug("closing {}", dataDir);
+            disk.close();
+            try {
+                store.close();
+            } catch (SailException e) {
+                logger.error(e.toString(), e);
+            } finally {
+                FileUtil.deleteDir(dataDir);
+                dataDir = null;
+                store = null;
+                disk = null;
+            }
+        }
+    }
 }
diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java
index dbe42069c1a..a18e866d1cc 100644
--- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java
+++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java
@@ -45,6 +45,8 @@ class SailSourceModel extends AbstractModel {
 
     private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class);
 
+    private final boolean verifyAdditions;
+
     private final class StatementIterator implements Iterator {
 
         final CloseableIteration stmts;
@@ -97,16 +99,15 @@ public void remove() {
 
     SailSink sink;
 
-    private long size;
-
     private final IsolationLevels level = IsolationLevels.NONE;
 
-    public SailSourceModel(SailStore store) {
-        this(store.getExplicitSailSource());
+    public SailSourceModel(SailStore store, boolean verifyAdditions) {
+        this(store.getExplicitSailSource(), verifyAdditions);
     }
 
-    public SailSourceModel(SailSource source) {
+    public SailSourceModel(SailSource source, boolean verifyAdditions) {
         this.source = source;
+        this.verifyAdditions = verifyAdditions;
     }
 
     @Override
@@ -147,21 +148,20 @@ public String toString() {
 
     @Override
     public synchronized int size() {
-        if (size < 0) {
+        long size = 0;
+        try {
+            CloseableIteration iter;
+            iter = dataset().getStatements(null, null, null);
             try {
-                CloseableIteration iter;
-                iter = dataset().getStatements(null, null, null);
-                try {
-                    while (iter.hasNext()) {
-                        iter.next();
-                        size++;
-                    }
-                } finally {
-                    iter.close();
+                while (iter.hasNext()) {
+                    iter.next();
+                    size++;
                 }
-            } catch (SailException e) {
-                throw new ModelException(e);
+            } finally {
+                iter.close();
             }
+        } catch (SailException e) {
+            throw new ModelException(e);
         }
         if (size > Integer.MAX_VALUE) {
             return Integer.MAX_VALUE;
@@ -243,13 +243,10 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource...
             throw new UnsupportedOperationException("Incomplete statement");
         }
         try {
-            if (contains(subj, pred, obj, contexts)) {
+            if (verifyAdditions && contains(subj, pred, obj, contexts)) {
                 logger.trace("already contains statement {} {} {} {}", subj, pred, obj, contexts);
                 return false;
             }
-            if (size >= 0) {
-                size++;
-            }
             if (contexts == null || contexts.length == 0) {
                 sink().approve(subj, pred, obj, null);
             } else {
@@ -268,7 +265,6 @@ public synchronized boolean clear(Resource... contexts) {
         try {
             if (contains(null, null, null, contexts)) {
                 sink().clear(contexts);
-                size = -1;
                 return true;
             }
         } catch (SailException e) {
@@ -279,25 +275,23 @@ public synchronized boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
+        boolean removed = false;
         try {
-            if (contains(subj, pred, obj, contexts)) {
-                size = -1;
-                CloseableIteration stmts;
-                stmts = dataset().getStatements(subj, pred, obj, contexts);
-                try {
-                    while (stmts.hasNext()) {
-                        Statement st = stmts.next();
-                        sink().deprecate(st);
-                    }
-                } finally {
-                    stmts.close();
+            CloseableIteration stmts = dataset().getStatements(subj, pred, obj,
+                    contexts);
+            try {
+                while (stmts.hasNext()) {
+                    Statement st = stmts.next();
+                    sink().deprecate(st);
+                    removed = true;
                 }
-                return true;
+            } finally {
+                stmts.close();
             }
         } catch (SailException e) {
             throw new ModelException(e);
         }
-        return false;
+        return removed;
     }
 
     @Override
@@ -372,7 +366,6 @@ public synchronized void removeTermIteration(Iterator iter, Resource
             } finally {
                 stmts.close();
             }
-            size = -1;
         } catch (SailException e) {
             throw new ModelException(e);
         }
@@ -404,6 +397,21 @@ private synchronized SailDataset dataset() throws SailException {
         return dataset;
     }
 
+    public void close() {
+        if (sink != null) {
+            try {
+                sink.flush();
+            } finally {
+                sink.close();
+                sink = null;
+            }
+        }
+        if (dataset != null) {
+            dataset.close();
+            dataset = null;
+        }
+    }
+
     private boolean contains(SailDataset dataset, Resource subj, IRI pred, Value obj, Resource... contexts)
             throws SailException {
         if (dataset == null) {
diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java
index 86c54b5c6b7..960396e4e08 100644
--- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java
+++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java
@@ -156,6 +156,8 @@ class TripleStore implements Closeable {
 
     private TxnRecordCache recordCache = null;
 
+    private boolean transactionIsolation = true;
+
     static final Comparator COMPARATOR = new Comparator() {
         @Override
         public int compare(ByteBuffer b1, ByteBuffer b2) {
@@ -684,9 +686,35 @@ protected TripleIndex getBestIndex(long subj, long pred, long obj, long context)
         return bestIndex;
     }
 
-    private boolean requiresResize() {
-        if (autoGrow) {
-            return LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0);
+    private boolean requiresResize() throws IOException {
+        if (autoGrow && LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) {
+            if (transactionIsolation) {
+                // caller has to handle resizing
+                return true;
+            } else {
+                // directly resize if isolation is not required
+                E(mdb_txn_commit(writeTxn));
+                StampedLock lock = txnManager.lock();
+                long stamp = lock.writeLock();
+                try {
+                    txnManager.deactivate();
+                    mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0);
+                    E(mdb_env_set_mapsize(env, mapSize));
+                    // restart write transaction
+                    try (MemoryStack stack = stackPush()) {
+                        PointerBuffer pp = stack.mallocPointer(1);
+                        mdb_txn_begin(env, NULL, 0, pp);
+                        writeTxn = pp.get(0);
+                    }
+                } finally {
+                    try {
+                        txnManager.activate();
+                    } finally {
+                        lock.unlockWrite(stamp);
+                    }
+                }
+                return false;
+            }
         } else {
             return false;
         }
@@ -919,6 +947,10 @@ private void storeProperties(File propFile) throws IOException {
         }
     }
 
+    public void setTransactionIsolation(boolean transactionIsolation) {
+        this.transactionIsolation = transactionIsolation;
+    }
+
     class TripleIndex {
 
         private final char[] fieldSeq;
diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java
index 01ddf213bf0..e621259b03b 100644
--- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java
+++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java
@@ -61,7 +61,7 @@ protected SailSourceModel getNewModel() {
             LmdbSailStore store = new LmdbSailStore(Files.createTempDirectory("SailSourceModelTest-").toFile(),
                     new LmdbStoreConfig("spoc"));
             stores.add(store);
-            return new SailSourceModel(store);
+            return new SailSourceModel(store, false);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java
index 0ccf1dcafca..4ebca65da33 100644
--- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java
+++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java
@@ -63,7 +63,7 @@ public class OverflowBenchmarkSynthetic {
     @Setup(Level.Trial)
     public void setup() {
         ((Logger) (LoggerFactory
-                .getLogger("org.eclipse.rdf4j.sail.lmdbrdf.MemoryOverflowModel")))
+                .getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel")))
                 .setLevel(ch.qos.logback.classic.Level.DEBUG);
     }
 
diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java
index 001e25de584..5356b805dad 100644
--- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java
+++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java
@@ -14,12 +14,17 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
 import java.nio.file.Files;
-import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.eclipse.rdf4j.common.io.FileUtil;
 import org.eclipse.rdf4j.model.IRI;
 import org.eclipse.rdf4j.model.Model;
 import org.eclipse.rdf4j.model.Namespace;
@@ -39,10 +44,8 @@
  * Model implementation that stores in a {@link LinkedHashModel} until more than 10KB statements are added and the
  * estimated memory usage is more than the amount of free memory available. Once the threshold is cross this
  * implementation seamlessly changes to a disk based {@link SailSourceModel}.
- *
- * @author James Leigh
  */
-abstract class MemoryOverflowModel extends AbstractModel {
+abstract class MemoryOverflowModel extends AbstractModel implements AutoCloseable {
 
     private static final long serialVersionUID = 4119844228099208169L;
 
@@ -50,41 +53,31 @@ abstract class MemoryOverflowModel extends AbstractModel {
 
     private static final int LARGE_BLOCK = 10000;
 
-    // To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
-    // we have space for one more block. The limit is currently set at 32 MB
+    // To reduce the chance of OOM we will always overflow once we get close to running out of memory.
+    // The limit is currently set at 32 MB
     private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
 
     final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);
 
-    private volatile LinkedHashModel memory;
-
-    private transient File dataDir;
+    private LinkedHashModel memory;
 
-    private transient SailStore store;
+    transient File dataDir;
 
-    private transient volatile SailSourceModel disk;
+    transient SailStore store;
 
-    private long baseline = 0;
+    transient SailSourceModel disk;
 
-    private long maxBlockSize = 0;
+    private final boolean verifyAdditions;
 
-    SimpleValueFactory vf = SimpleValueFactory.getInstance();
+    private final SimpleValueFactory vf = SimpleValueFactory.getInstance();
 
-    public MemoryOverflowModel() {
+    public MemoryOverflowModel(boolean verifyAdditions) {
+        this.verifyAdditions = verifyAdditions;
         memory = new LinkedHashModel(LARGE_BLOCK);
     }
 
-    public MemoryOverflowModel(Model model) {
-        this(model.getNamespaces());
-        addAll(model);
-    }
-
-    public MemoryOverflowModel(Set namespaces, Collection c) {
-        this(namespaces);
-        addAll(c);
-    }
-
-    public MemoryOverflowModel(Set namespaces) {
+    public MemoryOverflowModel(Set namespaces, boolean verifyAdditions) {
+        this.verifyAdditions = verifyAdditions;
         memory = new LinkedHashModel(namespaces, LARGE_BLOCK);
     }
 
@@ -194,15 +187,11 @@ public synchronized void removeTermIteration(Iterator iter, Resource
 
     protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException;
 
-    private Model getDelegate() {
-        LinkedHashModel memory = this.memory;
-        if (memory != null) {
+    synchronized Model getDelegate() {
+        if (disk == null) {
             return memory;
-        } else {
-            synchronized (this) {
-                return disk;
-            }
         }
+        return disk;
     }
 
     private void writeObject(ObjectOutputStream s) throws IOException {
@@ -232,59 +221,106 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
         }
     }
 
+    static class GcInfo {
+        long count;
+        long time;
+    }
+
+    private final Map<String, GcInfo> prevGcInfo = new ConcurrentHashMap<>();
+
+    private synchronized boolean highGcLoad() {
+        boolean highLoad = false;
+
+        // get all garbage collector MXBeans.
+        List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+        for (GarbageCollectorMXBean gcBean : gcBeans) {
+            long count = gcBean.getCollectionCount();
+            long time = gcBean.getCollectionTime();
+
+            GcInfo prevInfo = prevGcInfo.get(gcBean.getName());
+            if (prevInfo != null) {
+                long countDiff = count - prevInfo.count;
+                long timeDiff = time - prevInfo.time;
+                if (countDiff != 0) {
+                    double gcLoad = (double) timeDiff / countDiff;
+                    // TODO find good threshold
+                    if (gcLoad > 30) {
+                        highLoad = true;
+                    }
+                }
+            } else {
+                prevInfo = new GcInfo();
+                prevGcInfo.put(gcBean.getName(), prevInfo);
+            }
+            prevInfo.count = count;
+            prevInfo.time = time;
+        }
+        return highLoad;
+    }
+
     private synchronized void checkMemoryOverflow() {
         if (disk == null) {
             int size = size();
             if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
-                // maximum heap size the JVM can allocate
-                long maxMemory = RUNTIME.maxMemory();
+                boolean overflow = highGcLoad();
+                if (!overflow) {
+                    // maximum heap size the JVM can allocate
+                    long maxMemory = RUNTIME.maxMemory();
 
-                // total currently allocated JVM memory
-                long totalMemory = RUNTIME.totalMemory();
+                    // total currently allocated JVM memory
+                    long totalMemory = RUNTIME.totalMemory();
 
-                // amount of memory free in the currently allocated JVM memory
-                long freeMemory = RUNTIME.freeMemory();
+                    // amount of memory free in the currently allocated JVM memory
+                    long freeMemory = RUNTIME.freeMemory();
 
-                // estimated memory used
-                long used = totalMemory - freeMemory;
+                    // estimated memory used
+                    long used = totalMemory - freeMemory;
 
-                // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
-                long freeToAllocateMemory = maxMemory - used;
+                    // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
+                    long freeToAllocateMemory = maxMemory - used;
 
-                if (baseline > 0) {
-                    long blockSize = used - baseline;
-                    if (blockSize > maxBlockSize) {
-                        maxBlockSize = blockSize;
-                    }
-
-                    // Sync if either the estimated size of the next block is larger than remaining memory, or
-                    // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
-                    if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
-                            freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
-                        logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
-                        overflowToDisk();
-                    }
+                    // try to prevent OOM
+                    overflow = freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING;
+                }
+                if (overflow) {
+                    logger.debug("syncing at {} triples.", size);
+                    overflowToDisk();
                 }
-                baseline = used;
             }
         }
     }
 
     private synchronized void overflowToDisk() {
         try {
-            LinkedHashModel memory = this.memory;
-            this.memory = null;
-            assert disk == null;
             dataDir = Files.createTempDirectory("model").toFile();
             logger.debug("memory overflow using temp directory {}", dataDir);
             store = createSailStore(dataDir);
-            disk = new SailSourceModel(store);
+            disk = new SailSourceModel(store, verifyAdditions);
             disk.addAll(memory);
+            memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK);
             logger.debug("overflow synced to disk");
         } catch (IOException | SailException e) {
             String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";
            logger.error("Error while writing to overflow directory " + path, e);
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        if (disk != null) {
+            logger.debug("closing {}", dataDir);
+            disk.close();
+            try {
+                store.close();
+            } catch (SailException e) {
+                logger.error(e.toString(), e);
+            } finally {
+                FileUtil.deleteDir(dataDir);
+                dataDir = null;
+                store = null;
+                disk = null;
+            }
+        }
+    }
 }
diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java
index 6f5ff9100d6..ea1441f41a0 100644
--- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java
+++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java
@@ -72,6 +72,10 @@ public class NativeStore extends AbstractNotifyingSail implements FederatedServi
     final static class MemoryOverflowIntoNativeStore extends MemoryOverflowModel {
         private static final long serialVersionUID = 1L;
 
+        public MemoryOverflowIntoNativeStore() {
+            super(false);
+        }
+
         /**
          * The class is static to avoid taking a pointer which might make it hard to get a phantom reference.
          */
diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java
index ccf9e112d13..dfc19d65418 100644
--- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java
+++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java
@@ -40,13 +40,13 @@
 /**
  * A {@link Model} that keeps the {@link Statement}s in an {@link SailSource}.
- *
- * @author James Leigh
  */
 class SailSourceModel extends AbstractModel {
 
     private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class);
 
+    private final boolean verifyAdditions;
+
     private final class StatementIterator implements Iterator {
 
         final CloseableIteration stmts;
 
@@ -99,16 +99,15 @@ public void remove() {
 
     SailSink sink;
 
-    private long size;
-
     private final IsolationLevels level = IsolationLevels.NONE;
 
-    public SailSourceModel(SailStore store) {
-        this(store.getExplicitSailSource());
+    public SailSourceModel(SailStore store, boolean verifyAdditions) {
+        this(store.getExplicitSailSource(), verifyAdditions);
     }
 
-    public SailSourceModel(SailSource source) {
+    public SailSourceModel(SailSource source, boolean verifyAdditions) {
         this.source = source;
+        this.verifyAdditions = verifyAdditions;
     }
 
     @Override
@@ -149,21 +148,20 @@ public String toString() {
 
     @Override
     public synchronized int size() {
-        if (size < 0) {
+        long size = 0;
+        try {
+            CloseableIteration iter;
+            iter = dataset().getStatements(null, null, null);
             try {
-                CloseableIteration iter;
-                iter = dataset().getStatements(null, null, null);
-                try {
-                    while (iter.hasNext()) {
-                        iter.next();
-                        size++;
-                    }
-                } finally {
-                    iter.close();
+                while (iter.hasNext()) {
+                    iter.next();
+                    size++;
                 }
-            } catch (SailException e) {
-                throw new ModelException(e);
+            } finally {
+                iter.close();
             }
+        } catch (SailException e) {
+            throw new ModelException(e);
         }
         if (size > Integer.MAX_VALUE) {
             return Integer.MAX_VALUE;
@@ -245,13 +243,10 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource...
             throw new UnsupportedOperationException("Incomplete statement");
         }
         try {
-            if (contains(subj, pred, obj, contexts)) {
+            if (verifyAdditions && contains(subj, pred, obj, contexts)) {
                 logger.trace("already contains statement {} {} {} {}", subj, pred, obj, contexts);
                 return false;
            }
-            if (size >= 0) {
-                size++;
-            }
             if (contexts == null || contexts.length == 0) {
                 sink().approve(subj, pred, obj, null);
             } else {
@@ -270,7 +265,6 @@ public synchronized boolean clear(Resource... contexts) {
         try {
             if (contains(null, null, null, contexts)) {
                 sink().clear(contexts);
-                size = -1;
                 return true;
             }
         } catch (SailException e) {
@@ -281,25 +275,23 @@ public synchronized boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
+        boolean removed = false;
         try {
-            if (contains(subj, pred, obj, contexts)) {
-                size = -1;
-                CloseableIteration stmts;
-                stmts = dataset().getStatements(subj, pred, obj, contexts);
-                try {
-                    while (stmts.hasNext()) {
-                        Statement st = stmts.next();
-                        sink().deprecate(st);
-                    }
-                } finally {
-                    stmts.close();
+            CloseableIteration stmts = dataset().getStatements(subj, pred, obj,
+                    contexts);
+            try {
+                while (stmts.hasNext()) {
+                    Statement st = stmts.next();
+                    sink().deprecate(st);
+                    removed = true;
                 }
-                return true;
+            } finally {
+                stmts.close();
             }
         } catch (SailException e) {
             throw new ModelException(e);
         }
-        return false;
+        return removed;
     }
 
     @Override
@@ -374,7 +366,6 @@ public synchronized void removeTermIteration(Iterator iter, Resource
             } finally {
                 stmts.close();
             }
-            size = -1;
         } catch (SailException e) {
             throw new ModelException(e);
         }
@@ -406,6 +397,21 @@ private synchronized SailDataset dataset() throws SailException {
         return dataset;
     }
 
+    public void close() {
+        if (sink != null) {
+            try {
+                sink.flush();
+            } finally {
+                sink.close();
+                sink = null;
+            }
+        }
+        if (dataset != null) {
+            dataset.close();
+            dataset = null;
+        }
+    }
+
     private boolean contains(SailDataset dataset, Resource subj, IRI pred, Value obj, Resource... contexts)
             throws SailException {
         if (dataset == null) {
diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java
index b91c09dc24d..7cf1f420b33 100644
--- a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java
+++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java
@@ -51,7 +51,7 @@ protected SailSourceModel getNewModel() {
         try {
             NativeSailStore store = new NativeSailStore(Files.createTempDirectory("SailSourceModelTest-").toFile(),
                     "spoc");
-            return new SailSourceModel(store);
+            return new SailSourceModel(store, false);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }