diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java index 75ae8e497f2..4a7b8f431a4 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java @@ -11,6 +11,7 @@ package org.eclipse.rdf4j.sail.base; import java.lang.invoke.VarHandle; +import java.lang.ref.Reference; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -23,6 +24,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.StampedLock; import java.util.stream.Collectors; @@ -51,6 +53,28 @@ @InternalUseOnly public abstract class Changeset implements SailSink, ModelFactory { + static class CountedReference { + final T referent; + final AtomicInteger count = new AtomicInteger(1); + + CountedReference(T referent) { + this.referent = referent; + } + + CountedReference retain() { + count.incrementAndGet(); + return this; + } + + boolean release() { + return count.decrementAndGet() == 0; + } + + T get() { + return referent; + } + } + AdderBasedReadWriteLock readWriteLock = new AdderBasedReadWriteLock(); AdderBasedReadWriteLock refBacksReadWriteLock = new AdderBasedReadWriteLock(); Semaphore prependLock = new Semaphore(1); @@ -78,7 +102,7 @@ public abstract class Changeset implements SailSink, ModelFactory { *

* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE */ - private volatile Model approved; + private volatile CountedReference approved; private volatile boolean approvedEmpty = true; /** @@ -86,7 +110,7 @@ public abstract class Changeset implements SailSink, ModelFactory { *

* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE */ - private volatile Model deprecated; + private volatile CountedReference deprecated; private volatile boolean deprecatedEmpty = true; /** @@ -132,16 +156,16 @@ public void close() throws SailException { addedNamespaces = null; removedPrefixes = null; try { - if (approved instanceof AutoCloseable) { - ((AutoCloseable) approved).close(); + if (approved != null && approved.release() && approved.get() instanceof AutoCloseable) { + ((AutoCloseable) approved.get()).close(); } } catch (Exception e) { throw new SailException(e); } finally { approved = null; - if (deprecated instanceof AutoCloseable) { + if (deprecated != null && deprecated.release() && deprecated.get() instanceof AutoCloseable) { try { - ((AutoCloseable) deprecated).close(); + ((AutoCloseable) deprecated.get()).close(); } catch (Exception e) { throw new SailException(e); } finally { @@ -184,7 +208,7 @@ boolean hasApproved(Resource subj, IRI pred, Value obj, Resource[] contexts) { boolean readLock = readWriteLock.readLock(); try { - return approved.contains(subj, pred, obj, contexts); + return approved.get().contains(subj, pred, obj, contexts); } finally { readWriteLock.unlockReader(readLock); } @@ -206,7 +230,7 @@ boolean hasDeprecated(Resource subj, IRI pred, Value obj, Resource[] contexts) { } } - return deprecated.contains(subj, pred, obj, contexts); + return deprecated.get().contains(subj, pred, obj, contexts); } finally { readWriteLock.unlockReader(readLock); } @@ -389,7 +413,7 @@ public void clear(Resource... contexts) { statementCleared = true; if (approved != null) { - approved.clear(); + approved.get().clear(); } if (approvedContexts != null) { approvedContexts.clear(); @@ -399,7 +423,7 @@ public void clear(Resource... contexts) { deprecatedContexts = new HashSet<>(); } if (approved != null) { - approved.remove(null, null, null, contexts); + approved.get().remove(null, null, null, contexts); } if (approvedContexts != null && contexts != null) { for (Resource resource : contexts) { @@ -410,7 +434,7 @@ public void clear(Resource... contexts) { deprecatedContexts.addAll(Arrays.asList(contexts)); } } - approvedEmpty = approved == null || approved.isEmpty(); + approvedEmpty = approved == null || approved.get().isEmpty(); } finally { readWriteLock.unlockWriter(writeLock); } @@ -425,13 +449,13 @@ public void approve(Statement statement) { try { if (deprecated != null) { - deprecated.remove(statement); - deprecatedEmpty = deprecated == null || deprecated.isEmpty(); + deprecated.get().remove(statement); + deprecatedEmpty = deprecated == null || deprecated.get().isEmpty(); } if (approved == null) { - approved = createEmptyModel(); + approved = new CountedReference<>(createEmptyModel()); } - approved.add(statement); + approved.get().add(statement); approvedEmpty = false; if (statement.getContext() != null) { if (approvedContexts == null) { @@ -456,17 +480,17 @@ public void deprecate(Statement statement) { long writeLock = readWriteLock.writeLock(); try { if (approved != null) { - approved.remove(statement); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().remove(statement); + approvedEmpty = approved == null || approved.get().isEmpty(); } if (deprecated == null) { - deprecated = createEmptyModel(); + deprecated = new CountedReference<>(createEmptyModel()); } - deprecated.add(statement); + deprecated.get().add(statement); deprecatedEmpty = false; Resource ctx = statement.getContext(); if (approvedContexts != null && approvedContexts.contains(ctx) - && !approved.contains(null, null, null, ctx)) { + && !approved.get().contains(null, null, null, ctx)) { approvedContexts.remove(ctx); } } finally { @@ -501,11 +525,11 @@ public String toString() { sb.append(" deprecatedContexts, "); } if (deprecated != null) { - sb.append(deprecated.size()); + sb.append(deprecated.get().size()); sb.append(" deprecated, "); } if (approved != null) { - sb.append(approved.size()); + sb.append(approved.get().size()); sb.append(" approved, "); } if (sb.length() > 0) { @@ -520,9 +544,9 @@ protected void setChangeset(Changeset from) { assert !from.closed; this.observed = from.observed; - this.approved = from.approved; + this.approved = from.approved != null ? from.approved.retain() : null; this.approvedEmpty = from.approvedEmpty; - this.deprecated = from.deprecated; + this.deprecated = from.deprecated != null ? from.deprecated.retain() : null; this.deprecatedEmpty = from.deprecatedEmpty; this.approvedContexts = from.approvedContexts; this.deprecatedContexts = from.deprecatedContexts; @@ -689,7 +713,7 @@ List getDeprecatedStatements() { boolean readLock = readWriteLock.readLock(); try { - return new ArrayList<>(deprecated); + return new ArrayList<>(deprecated.get()); } finally { readWriteLock.unlockReader(readLock); } @@ -704,7 +728,7 @@ List getApprovedStatements() { boolean readLock = readWriteLock.readLock(); try { - return new ArrayList<>(approved); + return new ArrayList<>(approved.get()); } finally { readWriteLock.unlockReader(readLock); } @@ -725,7 +749,7 @@ boolean hasDeprecated(Statement statement) { } } if (deprecated != null) { - return deprecated.contains(statement); + return deprecated.get().contains(statement); } else { return false; } @@ -751,7 +775,7 @@ Iterable getApprovedStatements(Resource subj, IRI pred, Value obj, boolean readLock = readWriteLock.readLock(); try { - Iterable statements = approved.getStatements(subj, pred, obj, contexts); + Iterable statements = approved.get().getStatements(subj, pred, obj, contexts); // This is a synchronized context, users of this method will be allowed to use the results at their leisure. // We @@ -788,7 +812,8 @@ Iterable getApprovedTriples(Resource subj, IRI pred, Value obj) { try { // TODO none of this is particularly well thought-out in terms of performance, but we are aiming // for functionally complete first. - Stream approvedSubjectTriples = approved.parallelStream() + Stream approvedSubjectTriples = approved.get() + .parallelStream() .filter(st -> st.getSubject().isTriple()) .map(st -> (Triple) st.getSubject()) .filter(t -> { @@ -801,7 +826,8 @@ Iterable getApprovedTriples(Resource subj, IRI pred, Value obj) { return obj == null || obj.equals(t.getObject()); }); - Stream approvedObjectTriples = approved.parallelStream() + Stream approvedObjectTriples = approved.get() + .parallelStream() .filter(st -> st.getObject().isTriple()) .map(st -> (Triple) st.getObject()) .filter(t -> { @@ -825,8 +851,8 @@ void removeApproved(Statement next) { long writeLock = readWriteLock.writeLock(); try { if (approved != null) { - approved.remove(next); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().remove(next); + approvedEmpty = approved == null || approved.get().isEmpty(); } } finally { readWriteLock.unlockWriter(writeLock); @@ -850,7 +876,7 @@ void sinkApproved(SailSink sink) { boolean readLock = readWriteLock.readLock(); try { if (approved != null) { - sink.approveAll(approved, approvedContexts); + sink.approveAll(approved.get(), approvedContexts); } } finally { readWriteLock.unlockReader(readLock); @@ -865,7 +891,7 @@ void sinkDeprecated(SailSink sink) { boolean readLock = readWriteLock.readLock(); try { if (deprecated != null) { - sink.deprecateAll(deprecated); + sink.deprecateAll(deprecated.get()); } } finally { readWriteLock.unlockReader(readLock); @@ -895,12 +921,12 @@ public void approveAll(Set approve, Set approveContexts) { try { if (deprecated != null) { - deprecated.removeAll(approve); + deprecated.get().removeAll(approve); } if (approved == null) { - approved = createEmptyModel(); + approved = new CountedReference<>(createEmptyModel()); } - approved.addAll(approve); + approved.get().addAll(approve); approvedEmpty = approvedEmpty && approve.isEmpty(); if (approveContexts != null) { @@ -921,19 +947,19 @@ public void deprecateAll(Set deprecate) { try { if (approved != null) { - approved.removeAll(deprecate); - approvedEmpty = approved == null || approved.isEmpty(); + approved.get().removeAll(deprecate); + approvedEmpty = approved == null || approved.get().isEmpty(); } if (deprecated == null) { - deprecated = createEmptyModel(); + deprecated = new CountedReference<>(createEmptyModel()); } - deprecated.addAll(deprecate); + deprecated.get().addAll(deprecate); deprecatedEmpty = deprecatedEmpty && deprecate.isEmpty(); for (Statement statement : deprecate) { Resource ctx = statement.getContext(); if (approvedContexts != null && approvedContexts.contains(ctx) - && !approved.contains(null, null, null, ctx)) { + && !approved.get().contains(null, null, null, ctx)) { approvedContexts.remove(ctx); } } diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java index 15a38e54272..08204d7f796 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java @@ -477,7 +477,10 @@ private void flush(SailSink sink) throws SailException { && !isChanged((Changeset) sink)) { // one change to apply that is not in use to an empty Changeset Changeset dst = (Changeset) sink; - dst.setChangeset(changes.pop()); + Changeset src = changes.pop(); + dst.setChangeset(src); + // correctly close changeset + src.close(); } else { Iterator iter = changes.iterator(); while (iter.hasNext()) { @@ -517,6 +520,9 @@ private void flush(Changeset change, SailSink sink) throws SailException { change.sinkDeprecated(sink); change.sinkApproved(sink); + + // correctly close changeset + change.close(); } }