diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java index fd8b3a54b68..144e4dac6de 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java @@ -416,7 +416,7 @@ public void approve(Statement statement) { approved = createEmptyModel(); } approved.add(statement); - approvedEmpty = approved == null || approved.isEmpty(); + approvedEmpty = false; if (statement.getContext() != null) { if (approvedContexts == null) { approvedContexts = new HashSet<>(); @@ -447,7 +447,7 @@ public void deprecate(Statement statement) { deprecated = createEmptyModel(); } deprecated.add(statement); - deprecatedEmpty = deprecated == null || deprecated.isEmpty(); + deprecatedEmpty = false; Resource ctx = statement.getContext(); if (approvedContexts != null && approvedContexts.contains(ctx) && !approved.contains(null, null, null, ctx)) { @@ -833,8 +833,9 @@ void sinkApproved(SailSink sink) { boolean readLock = readWriteLock.readLock(); try { - if (approved != null) { - sink.approveAll(approved, approvedContexts); + Model approvedLocal = approved; + if (approvedLocal != null) { + sink.approveAll(approvedLocal, approvedContexts); } } finally { readWriteLock.unlockReader(readLock); @@ -848,8 +849,9 @@ void sinkDeprecated(SailSink sink) { boolean readLock = readWriteLock.readLock(); try { - if (deprecated != null) { - sink.deprecateAll(deprecated); + Model deprecatedLocal = deprecated; + if (deprecatedLocal != null) { + sink.deprecateAll(deprecatedLocal); } } finally { readWriteLock.unlockReader(readLock); @@ -885,7 +887,7 @@ public void approveAll(Set approve, Set approveContexts) { approved = createEmptyModel(); } approved.addAll(approve); - approvedEmpty = approved == null || approved.isEmpty(); + approvedEmpty = false; if (approveContexts != null) { if (approvedContexts == null) { @@ -912,7 +914,7 @@ public void deprecateAll(Set deprecate) { deprecated = createEmptyModel(); } deprecated.addAll(deprecate); - deprecatedEmpty = deprecated == null || deprecated.isEmpty(); + deprecatedEmpty = false; for (Statement statement : deprecate) { Resource ctx = statement.getContext(); diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java index 953e9678953..a45c897f25a 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java @@ -250,7 +250,7 @@ protected void initializeInternal() throws SailException { FileUtils.writeStringToFile(versionFile, VERSION, StandardCharsets.UTF_8); } backingStore = new LmdbSailStore(dataDir, config); - this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() { + this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel(false) { @Override protected SailStore createSailStore(File dataDir) throws IOException, SailException { // Model can't fit into memory, use another LmdbSailStore to store delta diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java index f13cca44933..24aab5fabec 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java @@ -67,23 +67,17 @@ abstract class MemoryOverflowModel extends AbstractModel { private long maxBlockSize = 0; - SimpleValueFactory vf = SimpleValueFactory.getInstance(); + private final boolean verifyAdditions; - public MemoryOverflowModel() { - memory = new LinkedHashModel(LARGE_BLOCK); - } + private final SimpleValueFactory vf = SimpleValueFactory.getInstance(); - public MemoryOverflowModel(Model model) { - this(model.getNamespaces()); - addAll(model); - } - - public MemoryOverflowModel(Set namespaces, Collection c) { - this(namespaces); - addAll(c); + public MemoryOverflowModel(boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; + memory = new LinkedHashModel(LARGE_BLOCK); } - public MemoryOverflowModel(Set namespaces) { + public MemoryOverflowModel(Set namespaces, boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; memory = new LinkedHashModel(namespaces, LARGE_BLOCK); } @@ -255,7 +249,7 @@ private synchronized void checkMemoryOverflow() { // Sync if either the estimated size of the next block is larger than remaining memory, or // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit) if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING || - freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) { + freeToAllocateMemory < Math.min(0.3 * maxMemory, maxBlockSize)) { logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize); overflowToDisk(); } @@ -271,7 +265,7 @@ private synchronized void overflowToDisk() { dataDir = Files.createTempDirectory("model").toFile(); logger.debug("memory overflow using temp directory {}", dataDir); store = createSailStore(dataDir); - disk = new SailSourceModel(store) { + disk = new SailSourceModel(store, verifyAdditions) { @Override protected void finalize() throws Throwable { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java index dbe42069c1a..e326d02ba6a 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java @@ -45,6 +45,8 @@ class SailSourceModel extends AbstractModel { private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class); + private final boolean verifyAdditions; + private final class StatementIterator implements Iterator { final CloseableIteration stmts; @@ -97,16 +99,15 @@ public void remove() { SailSink sink; - private long size; - private final IsolationLevels level = IsolationLevels.NONE; - public SailSourceModel(SailStore store) { - this(store.getExplicitSailSource()); + public SailSourceModel(SailStore store, boolean verifyAdditions) { + this(store.getExplicitSailSource(), verifyAdditions); } - public SailSourceModel(SailSource source) { + public SailSourceModel(SailSource source, boolean verifyAdditions) { this.source = source; + this.verifyAdditions = verifyAdditions; } @Override @@ -147,21 +148,20 @@ public String toString() { @Override public synchronized int size() { - if (size < 0) { + long size = 0; + try { + CloseableIteration iter; + iter = dataset().getStatements(null, null, null); try { - CloseableIteration iter; - iter = dataset().getStatements(null, null, null); - try { - while (iter.hasNext()) { - iter.next(); - size++; - } - } finally { - iter.close(); + while (iter.hasNext()) { + iter.next(); + size++; } - } catch (SailException e) { - throw new ModelException(e); + } finally { + iter.close(); } + } catch (SailException e) { + throw new ModelException(e); } if (size > Integer.MAX_VALUE) { return Integer.MAX_VALUE; @@ -243,13 +243,10 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... throw new UnsupportedOperationException("Incomplete statement"); } try { - if (contains(subj, pred, obj, contexts)) { + if (verifyAdditions && contains(subj, pred, obj, contexts)) { logger.trace("already contains statement {} {} {} {}", subj, pred, obj, contexts); return false; } - if (size >= 0) { - size++; - } if (contexts == null || contexts.length == 0) { sink().approve(subj, pred, obj, null); } else { @@ -268,7 +265,6 @@ public synchronized boolean clear(Resource... contexts) { try { if (contains(null, null, null, contexts)) { sink().clear(contexts); - size = -1; return true; } } catch (SailException e) { @@ -279,25 +275,23 @@ public synchronized boolean clear(Resource... contexts) { @Override public synchronized boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { + boolean removed = false; try { - if (contains(subj, pred, obj, contexts)) { - size = -1; - CloseableIteration stmts; - stmts = dataset().getStatements(subj, pred, obj, contexts); - try { - while (stmts.hasNext()) { - Statement st = stmts.next(); - sink().deprecate(st); - } - } finally { - stmts.close(); + CloseableIteration stmts = dataset().getStatements(subj, pred, obj, + contexts); + try { + while (stmts.hasNext()) { + Statement st = stmts.next(); + sink().deprecate(st); + removed = true; } - return true; + } finally { + stmts.close(); } } catch (SailException e) { throw new ModelException(e); } - return false; + return removed; } @Override @@ -372,7 +366,6 @@ public synchronized void removeTermIteration(Iterator iter, Resource } finally { stmts.close(); } - size = -1; } catch (SailException e) { throw new ModelException(e); } diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java index 01ddf213bf0..e621259b03b 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModelTest.java @@ -61,7 +61,7 @@ protected SailSourceModel getNewModel() { LmdbSailStore store = new LmdbSailStore(Files.createTempDirectory("SailSourceModelTest-").toFile(), new LmdbStoreConfig("spoc")); stores.add(store); - return new SailSourceModel(store); + return new SailSourceModel(store, false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java index 0ccf1dcafca..4ebca65da33 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkSynthetic.java @@ -63,7 +63,7 @@ public class OverflowBenchmarkSynthetic { @Setup(Level.Trial) public void setup() { ((Logger) (LoggerFactory - .getLogger("org.eclipse.rdf4j.sail.lmdbrdf.MemoryOverflowModel"))) + .getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel"))) .setLevel(ch.qos.logback.classic.Level.DEBUG); } diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java index 001e25de584..43125c5d559 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java @@ -20,6 +20,7 @@ import java.util.Optional; import java.util.Set; +import org.eclipse.rdf4j.common.io.FileUtil; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.model.Namespace; @@ -39,8 +40,6 @@ * Model implementation that stores in a {@link LinkedHashModel} until more than 10KB statements are added and the * estimated memory usage is more than the amount of free memory available. Once the threshold is cross this * implementation seamlessly changes to a disk based {@link SailSourceModel}. - * - * @author James Leigh */ abstract class MemoryOverflowModel extends AbstractModel { @@ -56,35 +55,29 @@ abstract class MemoryOverflowModel extends AbstractModel { final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class); - private volatile LinkedHashModel memory; + private LinkedHashModel memory; - private transient File dataDir; + transient File dataDir; - private transient SailStore store; + transient SailStore store; - private transient volatile SailSourceModel disk; + transient SailSourceModel disk; private long baseline = 0; private long maxBlockSize = 0; - SimpleValueFactory vf = SimpleValueFactory.getInstance(); - - public MemoryOverflowModel() { - memory = new LinkedHashModel(LARGE_BLOCK); - } + private final boolean verifyAdditions; - public MemoryOverflowModel(Model model) { - this(model.getNamespaces()); - addAll(model); - } + private final SimpleValueFactory vf = SimpleValueFactory.getInstance(); - public MemoryOverflowModel(Set namespaces, Collection c) { - this(namespaces); - addAll(c); + public MemoryOverflowModel(boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; + memory = new LinkedHashModel(LARGE_BLOCK); } - public MemoryOverflowModel(Set namespaces) { + public MemoryOverflowModel(Set namespaces, boolean verifyAdditions) { + this.verifyAdditions = verifyAdditions; memory = new LinkedHashModel(namespaces, LARGE_BLOCK); } @@ -194,15 +187,11 @@ public synchronized void removeTermIteration(Iterator iter, Resource protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException; - private Model getDelegate() { - LinkedHashModel memory = this.memory; - if (memory != null) { + synchronized Model getDelegate() { + if (disk == null) { return memory; - } else { - synchronized (this) { - return disk; - } } + return disk; } private void writeObject(ObjectOutputStream s) throws IOException { @@ -272,15 +261,32 @@ private synchronized void checkMemoryOverflow() { private synchronized void overflowToDisk() { try { - LinkedHashModel memory = this.memory; - this.memory = null; - assert disk == null; dataDir = Files.createTempDirectory("model").toFile(); logger.debug("memory overflow using temp directory {}", dataDir); store = createSailStore(dataDir); - disk = new SailSourceModel(store); + disk = new SailSourceModel(store, verifyAdditions) { + + @Override + protected void finalize() throws Throwable { + logger.debug("finalizing {}", dataDir); + if (disk == this) { + try { + store.close(); + } catch (SailException e) { + logger.error(e.toString(), e); + } finally { + FileUtil.deleteDir(dataDir); + dataDir = null; + store = null; + disk = null; + } + } + super.finalize(); + } + }; disk.addAll(memory); + memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK); logger.debug("overflow synced to disk"); } catch (IOException | SailException e) { String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)"; diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java index 6f5ff9100d6..ea1441f41a0 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeStore.java @@ -72,6 +72,10 @@ public class NativeStore extends AbstractNotifyingSail implements FederatedServi final static class MemoryOverflowIntoNativeStore extends MemoryOverflowModel { private static final long serialVersionUID = 1L; + public MemoryOverflowIntoNativeStore() { + super(false); + } + /** * The class is static to avoid taking a pointer which might make it hard to get a phantom reference. */ diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java index ccf9e112d13..96205c416f6 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java @@ -40,13 +40,13 @@ /** * A {@link Model} that keeps the {@link Statement}s in an {@link SailSource}. - * - * @author James Leigh */ class SailSourceModel extends AbstractModel { private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class); + private final boolean verifyAdditions; + private final class StatementIterator implements Iterator { final CloseableIteration stmts; @@ -99,16 +99,15 @@ public void remove() { SailSink sink; - private long size; - private final IsolationLevels level = IsolationLevels.NONE; - public SailSourceModel(SailStore store) { - this(store.getExplicitSailSource()); + public SailSourceModel(SailStore store, boolean verifyAdditions) { + this(store.getExplicitSailSource(), verifyAdditions); } - public SailSourceModel(SailSource source) { + public SailSourceModel(SailSource source, boolean verifyAdditions) { this.source = source; + this.verifyAdditions = verifyAdditions; } @Override @@ -149,21 +148,20 @@ public String toString() { @Override public synchronized int size() { - if (size < 0) { + long size = 0; + try { + CloseableIteration iter; + iter = dataset().getStatements(null, null, null); try { - CloseableIteration iter; - iter = dataset().getStatements(null, null, null); - try { - while (iter.hasNext()) { - iter.next(); - size++; - } - } finally { - iter.close(); + while (iter.hasNext()) { + iter.next(); + size++; } - } catch (SailException e) { - throw new ModelException(e); + } finally { + iter.close(); } + } catch (SailException e) { + throw new ModelException(e); } if (size > Integer.MAX_VALUE) { return Integer.MAX_VALUE; @@ -245,13 +243,10 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource... throw new UnsupportedOperationException("Incomplete statement"); } try { - if (contains(subj, pred, obj, contexts)) { + if (verifyAdditions && contains(subj, pred, obj, contexts)) { logger.trace("already contains statement {} {} {} {}", subj, pred, obj, contexts); return false; } - if (size >= 0) { - size++; - } if (contexts == null || contexts.length == 0) { sink().approve(subj, pred, obj, null); } else { @@ -270,7 +265,6 @@ public synchronized boolean clear(Resource... contexts) { try { if (contains(null, null, null, contexts)) { sink().clear(contexts); - size = -1; return true; } } catch (SailException e) { @@ -281,25 +275,23 @@ public synchronized boolean clear(Resource... contexts) { @Override public synchronized boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) { + boolean removed = false; try { - if (contains(subj, pred, obj, contexts)) { - size = -1; - CloseableIteration stmts; - stmts = dataset().getStatements(subj, pred, obj, contexts); - try { - while (stmts.hasNext()) { - Statement st = stmts.next(); - sink().deprecate(st); - } - } finally { - stmts.close(); + CloseableIteration stmts = dataset().getStatements(subj, pred, obj, + contexts); + try { + while (stmts.hasNext()) { + Statement st = stmts.next(); + sink().deprecate(st); + removed = true; } - return true; + } finally { + stmts.close(); } } catch (SailException e) { throw new ModelException(e); } - return false; + return removed; } @Override @@ -374,7 +366,6 @@ public synchronized void removeTermIteration(Iterator iter, Resource } finally { stmts.close(); } - size = -1; } catch (SailException e) { throw new ModelException(e); } diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java index b91c09dc24d..7cf1f420b33 100644 --- a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModelTest.java @@ -51,7 +51,7 @@ protected SailSourceModel getNewModel() { try { NativeSailStore store = new NativeSailStore(Files.createTempDirectory("SailSourceModelTest-").toFile(), "spoc"); - return new SailSourceModel(store); + return new SailSourceModel(store, false); } catch (Exception e) { throw new RuntimeException(e); }