GH-4554 Fix performance of memory overflow #4557
base: develop
Conversation
I have a problem regarding the detection of the right moment to switch to a disk-based model:
The first iteration works fine and the statements are written to disk. During the second iteration, GC somehow removes enough objects that memory usage does not increase constantly.
The reason seems to be this condition:

// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
        freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
    logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
    overflowToDisk();
}

If, for example,
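For reference, here is a rough sketch of how such a check can be driven from the Runtime API. Only freeToAllocateMemory, maxMemory, maxBlockSize and the condition itself appear in the snippet above; the other computations are assumptions for illustration, not the actual MemoryOverflowModel code:

// Hypothetical reconstruction; names not shown in the snippet above are illustrative.
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory(); // the -Xmx limit
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long freeToAllocateMemory = maxMemory - usedMemory; // headroom before hitting -Xmx

if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING
        || freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
    overflowToDisk();
}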
Impressive performance improvement!
if (approved != null) {
    sink.approveAll(approved, approvedContexts);
Model approvedLocal = approved;
if (approvedLocal != null) {
    sink.approveAll(approvedLocal, approvedContexts);
I had assumed that this already was safe since it's wrapped in a read lock.
I was not sure if it interferes with close, as the latter does not use the lock while changing the variables.
I don't think any of our code tries to read or write concurrently with close(). Or at least I hope not :-/
if (approved instanceof AutoCloseable) {
    try {
        ((AutoCloseable) approved).close();
    } catch (Exception e) {
        throw new SailException(e);
    }
}
approved = null;
if (deprecated instanceof AutoCloseable) {
    try {
        ((AutoCloseable) deprecated).close();
    } catch (Exception e) {
        throw new SailException(e);
    }
}
deprecated = null;
This is interesting. I presume that this is for closing the overflow model. I think the overflow model has some mechanism for automatically closing it once it's unreachable, but if we can explicitly close it then that is much more performant.
I would recommend using try-finally. Something like:
try {
if (approved instanceof AutoCloseable) {
try {
((AutoCloseable) approved).close();
} catch (Exception e) {
throw new SailException(e);
}
}
approved = null;
} finally {
if (deprecated instanceof AutoCloseable) {
try {
((AutoCloseable) deprecated).close();
} catch (Exception e) {
throw new SailException(e);
}
}
deprecated = null;
}
Maybe even introduce two local variables for approved and deprecated, so that the fields can be set to null first and the captured references closed after everything is set to null; see the sketch below.
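A rough sketch of that variant (illustrative only, assuming approved and deprecated are Model fields as in the diff above):

Model approvedLocal = approved;
Model deprecatedLocal = deprecated;
// Clear the fields first so nothing else can observe a half-closed model...
approved = null;
deprecated = null;
// ...then close the captured references.
try {
    if (approvedLocal instanceof AutoCloseable) {
        try {
            ((AutoCloseable) approvedLocal).close();
        } catch (Exception e) {
            throw new SailException(e);
        }
    }
} finally {
    if (deprecatedLocal instanceof AutoCloseable) {
        try {
            ((AutoCloseable) deprecatedLocal).close();
        } catch (Exception e) {
            throw new SailException(e);
        }
    }
}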
approved.addAll(approve);
approvedEmpty = approved == null || approved.isEmpty();
approvedEmpty = false;
Maybe approvedEmpty should be set to approve.isEmpty(), since it's technically allowed to pass in an empty set. Same for deprecatedEmpty.
approved.isEmpty() was the problem here, because it leads to flushing and committing the underlying transaction of MemoryOverflowModel. Can approved be empty at this point?
It used to be approvedEmpty = approved == null || approved.isEmpty();. Couldn't it instead be approvedEmpty = approve.isEmpty();? Notice the difference: one uses approved and the other just approve.
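For comparison, the variant that ended up in the Changeset quoted further below keeps the flag sticky instead of calling isEmpty() on the (possibly overflowed) model:

approved.get().addAll(approve);
// The flag only stays true while every batch added so far was empty,
// so isEmpty() is never invoked on the overflow model here.
approvedEmpty = approvedEmpty && approve.isEmpty();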
disk = new SailSourceModel(store) {

    @Override
    protected void finalize() throws Throwable {
        logger.debug("finalizing {}", dataDir);
        if (disk == this) {
            try {
                store.close();
            } catch (SailException e) {
                logger.error(e.toString(), e);
            } finally {
                FileUtil.deleteDir(dataDir);
                dataDir = null;
                store = null;
                disk = null;
            }
        }
        super.finalize();
    }
};
I would leave this code in, or copy the code from the MemoryOverflowIntoNativeStore that uses the Java 9 Cleaner. I had previously found a few issues with the code that handles closing of a ChangeSet; it's not exactly straightforward where and when a ChangeSet ends up being closed, and there might still be bugs with how it's handled.
Then I would propose implementing finalize on ChangeSet.
I would rely on the fact that every ChangeSet is closed. Everything else means additional overhead for tracking the objects when we do not want to use finalize.
The overhead of the Java 9 Cleaner is very low, especially compared to finalize. I have been using ConcurrentCleaner in a few places already. It distributes the load of registering between 128 Cleaners based on the thread id.
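For illustration, a minimal sketch of the java.lang.ref.Cleaner pattern under discussion; ConcurrentCleaner is the rdf4j-internal equivalent, and the surrounding class here is hypothetical:

import java.lang.ref.Cleaner;

class OverflowHandle implements AutoCloseable {
    private static final Cleaner CLEANER = Cleaner.create();

    // The cleanup action must not capture the owner, otherwise the owner
    // could never become unreachable.
    private static final class State implements Runnable {
        @Override
        public void run() {
            // last-resort cleanup if close() was never called
        }
    }

    private final Cleaner.Cleanable cleanable = CLEANER.register(this, new State());

    @Override
    public void close() {
        cleanable.clean(); // runs State.run() at most once
    }
}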
I would be wary of putting too much trust in the changeset getting closed. There may still be bugs with that.
public void close() throws IOException {
    if (disk != null) {
        logger.debug("closing {}", dataDir);
        disk.close();
If this method fails then the rest of the code is ignored.
I've wrapped it in a try-finally-block.
@@ -258,9 +246,9 @@ private synchronized void checkMemoryOverflow() {
  }

  // Sync if either the estimated size of the next block is larger than remaining memory, or
- // if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
+ // if less than 25% of the heap is still free (this last condition to avoid GC overhead limit)
I think this is too aggressive. If someone runs the NativeStore with 32 GB of memory, then it would overflow once there is less than 8 GB free. I think it would overflow fairly frequently then. Previously the Math.min(...) call ensured that we use a constant (maxBlockSize) once the user has a large amount of memory.
There must be a better way to handle this in general. Maybe we could have a largish array wrapped in a soft reference and assume that once it becomes GCed we should spill to disk?
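A sketch of that soft-reference idea (entirely hypothetical, nothing from this PR):

import java.lang.ref.SoftReference;

// The JVM only clears soft references under memory pressure, so a cleared
// "canary" buffer can serve as a spill-to-disk signal.
class MemoryPressureCanary {
    private SoftReference<byte[]> canary = new SoftReference<>(new byte[8 * 1024 * 1024]);

    boolean shouldOverflowToDisk() {
        return canary.get() == null;
    }

    void rearm() {
        // re-create the canary after overflowing to disk
        canary = new SoftReference<>(new byte[8 * 1024 * 1024]);
    }
}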
I asked ChatGPT-4 about this and it recommends trying something with the GarbageCollectorMXBean.
// Within a separate monitoring thread; assumes gcBeans, prevGcCountMap,
// prevGcDurationMap and GC_TIME_WARNING_THRESHOLD are initialized elsewhere,
// e.g. gcBeans = ManagementFactory.getGarbageCollectorMXBeans().
// Check GC load.
for (GarbageCollectorMXBean gcBean : gcBeans) {
    long count = gcBean.getCollectionCount();
    long time = gcBean.getCollectionTime();
    Long prevCount = prevGcCountMap.get(gcBean.getName());
    Long prevTime = prevGcDurationMap.get(gcBean.getName());
    if (prevCount != null && prevTime != null) {
        long countDiff = count - prevCount;
        long timeDiff = time - prevTime;
        // If the average GC time per collection exceeds the threshold...
        if (countDiff != 0 && ((double) timeDiff / countDiff) > GC_TIME_WARNING_THRESHOLD) {
            System.err.println("WARNING: High GC time: " + (timeDiff * 100 / countDiff) + "%");
        }
    }
    prevGcCountMap.put(gcBean.getName(), count);
    prevGcDurationMap.put(gcBean.getName(), time);
}
try {
    Thread.sleep(5000); // Check every 5 seconds.
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
I was thinking that maybe we should have a global class that monitors GC. Code that wants to be notified at certain GC thresholds can register a handler that the GC monitor will call if the threshold is reached. Once the handler is called it also gets removed.
import java.lang.management.*;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* GC Monitor for tracking and handling excessive garbage collection.
*/
public class GCMonitor {
private static final Thread monitorThread;
private static final Map<String, Long> prevGcDurationMap = new ConcurrentHashMap<>();
private static final Map<String, Long> prevGcCountMap = new ConcurrentHashMap<>();
private static final Map<HandlerReference, Double> handlers = new ConcurrentHashMap<>();
static {
// Initialize and start the monitoring thread.
monitorThread = new Thread(() -> {
// Get all garbage collector MXBeans.
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
// Initialize previous GC counts and durations for each GC bean.
for (GarbageCollectorMXBean gcBean : gcBeans) {
prevGcCountMap.put(gcBean.getName(), gcBean.getCollectionCount());
prevGcDurationMap.put(gcBean.getName(), gcBean.getCollectionTime());
}
while (true) {
try {
// For each GC bean, calculate the GC load and call any handlers that are above their threshold.
for (GarbageCollectorMXBean gcBean : gcBeans) {
long count = gcBean.getCollectionCount();
long time = gcBean.getCollectionTime();
Long prevCount = prevGcCountMap.get(gcBean.getName());
Long prevTime = prevGcDurationMap.get(gcBean.getName());
if (prevCount != null && prevTime != null) {
long countDiff = count - prevCount;
long timeDiff = time - prevTime;
if (countDiff != 0) {
double gcLoad = (double) timeDiff / countDiff;
// Remove any stale handlers or handlers that have crossed their threshold.
handlers.entrySet().removeIf(entry -> {
HandlerReference handlerReference = entry.getKey();
if (handlerReference.getReference().get() == null) {
return true;
} else if (gcLoad > entry.getValue()) {
try {
handlerReference.getHandler().accept(gcLoad);
return true;
} catch (Exception e) {
// ignore exceptions thrown by the handler
}
}
return false;
});
}
}
prevGcCountMap.put(gcBean.getName(), count);
prevGcDurationMap.put(gcBean.getName(), time);
}
Thread.sleep(5000);
} catch (InterruptedException e) {
// The thread should never be interrupted. Reset the interrupted status.
Thread.currentThread().interrupt();
} catch (Exception e) {
// Log any unexpected exceptions and continue execution.
System.err.println("Unexpected exception in GC monitoring thread: " + e.getMessage());
e.printStackTrace();
}
}
});
// The monitor thread should not prevent the JVM from exiting.
monitorThread.setDaemon(true);
monitorThread.start();
}
/**
* Register a new GC handler with a threshold and an associated reference object.
*
* @param handler The handler to be called when the GC load exceeds the threshold.
* @param threshold The GC load threshold.
* @param ref The reference object to be associated with the handler.
* @return An AutoCloseable object. Calling close on this object will unregister the handler.
*/
public static AutoCloseable registerHandler(Consumer<Double> handler, double threshold, Object ref) {
HandlerReference handlerReference = new HandlerReference(handler, ref);
handlers.put(handlerReference, threshold);
// Return an AutoCloseable object that can be used to unregister the handler.
return () -> handlers.remove(handlerReference);
}
/**
* Inner class to hold the handler and the weak reference.
*/
private static class HandlerReference {
private final Consumer<Double> handler;
private final WeakReference<Object> reference;
private HandlerReference(Consumer<Double> handler, Object ref) {
this.handler = handler;
this.reference = new WeakReference<>(ref);
}
private Consumer<Double> getHandler() {
return handler;
}
private WeakReference<Object> getReference() {
return reference;
}
}
}
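A hypothetical usage of that sketch from something like MemoryOverflowModel; the threshold value and overflowToDisk() are illustrative:

// Spill to disk once the average GC time per collection crosses the threshold.
AutoCloseable registration = GCMonitor.registerHandler(
        gcLoad -> overflowToDisk(),
        100.0, // threshold, in the same unit as the monitor's gcLoad
        this); // weakly referenced, so a GC'ed owner drops the handler

// ... later, when the model is closed:
try {
    registration.close(); // unregister explicitly
} catch (Exception ignored) {
}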
This is what ChatGPT gave me after some more detailed instructions from me.
Cool. I am often surprised by the answers.
This is probably the right direction. Maybe we should also allow computing the GC load on demand, for example after 10,000 inserted statements, as 5 seconds may be too long to react while quickly loading data.
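An on-demand variant could look roughly like this (names are illustrative, and the delta bookkeeping would mirror the monitor above; assumes the java.lang.management imports):

// Poll the GC beans every N inserted statements instead of on a timer.
private long statementsSinceCheck;

void afterStatementAdded() {
    if (++statementsSinceCheck >= 10_000) {
        statementsSinceCheck = 0;
        for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            // compare gcBean.getCollectionCount() and getCollectionTime()
            // against the previously recorded values, as in GCMonitor above
        }
    }
}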
I've integrated it into MemoryOverflowModel for now.
try {
    if (approved instanceof AutoCloseable) {
        try {
            ((AutoCloseable) approved).close();
        } catch (Exception e) {
            throw new SailException(e);
        }
    }
    approved = null;
} finally {
    if (deprecated instanceof AutoCloseable) {
        try {
            ((AutoCloseable) deprecated).close();
        } catch (Exception e) {
            throw new SailException(e);
        }
    }
    deprecated = null;
}
approvedContexts = null;
deprecatedContexts = null;
Could you wrap your try-finally in another try-finally and set all the fields to null in the outer finally? That way we are sure to always clean up any references that could slow down GC.
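Roughly like this (a sketch of the suggestion, using the same fields as the diff above):

try {
    try {
        if (approved instanceof AutoCloseable) {
            ((AutoCloseable) approved).close();
        }
    } finally {
        if (deprecated instanceof AutoCloseable) {
            ((AutoCloseable) deprecated).close();
        }
    }
} catch (Exception e) {
    throw new SailException(e);
} finally {
    // Always drop the references, even when close() throws, so a large
    // retained overflow model cannot keep slowing down GC.
    approved = null;
    deprecated = null;
    approvedContexts = null;
    deprecatedContexts = null;
}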
I've changed this.
Performance looks a lot better now. The ± is a bit big; it would be better if the ± was somewhere around 10%. Overall though this is very impressive @kenwenzel!
@hmottestad I had to add some more logic to correctly close changesets under all circumstances.
@hmottestad The Changeset code feels really ugly. Do you see any chance to get rid of shallowClone?
@hmottestad What are our next steps regarding this issue?
I need to take a good look at the code and test it out for myself. I also need to take a look at some benchmarks to see how your changes to the Changeset impact serializable transactions. I am fairly limited on time at the moment, but I'll do my best.
I also found that the changesets created by shallowClone() are not always closed. I don't know whether that matters or not. You can test it out yourself with the following Changeset code: set a breakpoint at the logger line at the bottom of the class and then debug the tests in the

/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.base;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.ModelFactory;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Triple;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.util.Statements;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.sail.SailConflictException;
import org.eclipse.rdf4j.sail.SailException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Set of changes applied to an {@link SailSourceBranch} awaiting to be flushed into its backing {@link SailSource}.
*
* @author James Leigh
*/
@InternalUseOnly
public abstract class Changeset implements SailSink, ModelFactory {
private static final Logger logger = LoggerFactory.getLogger(Changeset.class);
private final static ConcurrentCleaner cleaner = new ConcurrentCleaner();
private StackTraceElement[] closedAt;
static class CountedReference<T> {
final T referent;
final AtomicInteger count = new AtomicInteger(1);
CountedReference(T referent) {
this.referent = referent;
}
CountedReference<T> retain() {
count.incrementAndGet();
return this;
}
boolean release() {
return count.decrementAndGet() == 0;
}
T get() {
return referent;
}
}
final AdderBasedReadWriteLock readWriteLock = new AdderBasedReadWriteLock();
final AdderBasedReadWriteLock refBacksReadWriteLock = new AdderBasedReadWriteLock();
final Semaphore prependLock = new Semaphore(1);
/**
* Set of {@link SailDataset}s that are currently using this {@link Changeset} to derive the state of the
* {@link SailSource}.
*/
private List<SailDatasetImpl> refbacks;
/**
* {@link Changeset}s that have been {@link #flush()}ed to the same {@link SailSourceBranch}, since this object was
* {@link #flush()}ed.
*/
private Set<Changeset> prepend;
/**
* When in {@link IsolationLevels#SERIALIZABLE} this contains all the observed {@link StatementPattern}s that were
* observed by {@link ObservingSailDataset}.
*/
private volatile Set<SimpleStatementPattern> observed;
/**
* Statements that have been added as part of a transaction, but has not yet been committed.
* <p>
* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE
*/
private volatile CountedReference<Model> approved;
private volatile boolean approvedEmpty = true;
/**
* Explicit statements that have been removed as part of a transaction, but have not yet been committed.
* <p>
* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE
*/
private volatile CountedReference<Model> deprecated;
private volatile boolean deprecatedEmpty = true;
/**
* Set of contexts of the {@link #approved} statements.
*/
private Set<Resource> approvedContexts;
/**
* Set of contexts that were passed to {@link #clear(Resource...)}.
*/
private volatile Set<Resource> deprecatedContexts;
/**
* Additional namespaces added.
*/
private Map<String, String> addedNamespaces;
/**
* Namespace prefixes that were removed.
*/
private Set<String> removedPrefixes;
/**
* If all namespaces were removed, other than {@link #addedNamespaces}.
*/
private volatile boolean namespaceCleared;
/**
* If all statements were removed, other than {@link #approved}.
*/
private volatile boolean statementCleared;
private boolean closed;
private final AtomicBoolean atomicClosed = new AtomicBoolean(false);
private final Throwable throwable;
private final Cleaner.Cleanable register;
public Changeset() {
throwable = new Throwable();
Runnable runnable = new Cleanable(atomicClosed);
register = cleaner.register(this, runnable);
}
@Override
public void close() throws SailException {
this.closedAt = Thread.currentThread().getStackTrace();
atomicClosed.set(true);
register.clean();
closed = true;
refbacks = null;
prepend = null;
observed = null;
approvedContexts = null;
deprecatedContexts = null;
addedNamespaces = null;
removedPrefixes = null;
try {
if (approved != null && approved.release() && approved.get() instanceof AutoCloseable) {
((AutoCloseable) approved.get()).close();
}
} catch (Exception e) {
throw new SailException(e);
} finally {
approved = null;
if (deprecated != null && deprecated.release() && deprecated.get() instanceof AutoCloseable) {
try {
((AutoCloseable) deprecated.get()).close();
} catch (Exception e) {
throw new SailException(e);
} finally {
deprecated = null;
}
}
}
}
@Override
public void prepare() throws SailException {
assert !closed;
if (prepend != null && observed != null) {
for (SimpleStatementPattern p : observed) {
Resource subj = p.getSubject();
IRI pred = p.getPredicate();
Value obj = p.getObject();
Resource context = p.getContext();
Resource[] contexts;
if (p.isAllContexts()) {
contexts = new Resource[0];
} else {
contexts = new Resource[] { context };
}
for (Changeset changeset : prepend) {
if (changeset.hasApproved(subj, pred, obj, contexts)
|| (changeset.hasDeprecated(subj, pred, obj, contexts))) {
throw new SailConflictException("Observed State has Changed");
}
}
}
}
}
boolean hasApproved(Resource subj, IRI pred, Value obj, Resource[] contexts) {
assert !closed;
if (approved == null || approvedEmpty) {
return false;
}
boolean readLock = readWriteLock.readLock();
try {
return approved.get().contains(subj, pred, obj, contexts);
} finally {
readWriteLock.unlockReader(readLock);
}
}
boolean hasDeprecated(Resource subj, IRI pred, Value obj, Resource[] contexts) {
assert !closed;
if ((deprecated == null || deprecatedEmpty) && deprecatedContexts == null) {
return false;
}
boolean readLock = readWriteLock.readLock();
try {
if (deprecatedContexts != null) {
for (Resource context : contexts) {
if (deprecatedContexts.contains(context)) {
return true;
}
}
}
return deprecated.get().contains(subj, pred, obj, contexts);
} finally {
readWriteLock.unlockReader(readLock);
}
}
public void addRefback(SailDatasetImpl dataset) {
assert !closed;
long writeLock = refBacksReadWriteLock.writeLock();
try {
if (refbacks == null) {
refbacks = new ArrayList<>();
}
refbacks.add(dataset);
} finally {
refBacksReadWriteLock.unlockWriter(writeLock);
}
}
public void removeRefback(SailDatasetImpl dataset) {
if (refbacks != null) {
// assert !closed;
long writeLock = refBacksReadWriteLock.writeLock();
try {
if (refbacks != null) {
refbacks.removeIf(d -> d == dataset);
}
} finally {
refBacksReadWriteLock.unlockWriter(writeLock);
}
}
}
public boolean isRefback() {
assert !closed;
boolean readLock = refBacksReadWriteLock.readLock();
try {
return refbacks != null && !refbacks.isEmpty();
} finally {
refBacksReadWriteLock.unlockReader(readLock);
}
}
public void prepend(Changeset changeset) {
assert !closed;
prependLock.acquireUninterruptibly();
try {
if (prepend == null) {
prepend = Collections.newSetFromMap(new IdentityHashMap<>());
}
prepend.add(changeset);
} finally {
prependLock.release();
}
}
@Override
public void setNamespace(String prefix, String name) {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (removedPrefixes == null) {
removedPrefixes = new HashSet<>();
}
removedPrefixes.add(prefix);
if (addedNamespaces == null) {
addedNamespaces = new HashMap<>();
}
addedNamespaces.put(prefix, name);
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void removeNamespace(String prefix) {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (addedNamespaces != null) {
addedNamespaces.remove(prefix);
}
if (removedPrefixes == null) {
removedPrefixes = new HashSet<>();
}
removedPrefixes.add(prefix);
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void clearNamespaces() {
assert !closed;
namespaceCleared = true;
long writeLock = readWriteLock.writeLock();
try {
if (removedPrefixes != null) {
removedPrefixes.clear();
}
if (addedNamespaces != null) {
addedNamespaces.clear();
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void observe(Resource subj, IRI pred, Value obj, Resource... contexts)
throws SailConflictException {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (observed == null) {
observed = new HashSet<>();
}
if (contexts == null) {
observed.add(new SimpleStatementPattern(subj, pred, obj, null, false));
} else if (contexts.length == 0) {
observed.add(new SimpleStatementPattern(subj, pred, obj, null, true));
} else {
for (Resource ctx : contexts) {
observed.add(new SimpleStatementPattern(subj, pred, obj, ctx, false));
}
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void observe(Resource subj, IRI pred, Value obj, Resource context)
throws SailConflictException {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (observed == null) {
observed = new HashSet<>();
}
observed.add(new SimpleStatementPattern(subj, pred, obj, context, false));
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void observeAll(Set<SimpleStatementPattern> observed) {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (this.observed == null) {
this.observed = new HashSet<>(observed);
} else {
this.observed.addAll(observed);
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void clear(Resource... contexts) {
long writeLock = readWriteLock.writeLock();
try {
if (contexts != null && contexts.length == 0) {
statementCleared = true;
if (approved != null) {
approved.get().clear();
}
if (approvedContexts != null) {
approvedContexts.clear();
}
} else {
if (deprecatedContexts == null) {
deprecatedContexts = new HashSet<>();
}
if (approved != null) {
approved.get().remove(null, null, null, contexts);
}
if (approvedContexts != null && contexts != null) {
for (Resource resource : contexts) {
approvedContexts.remove(resource);
}
}
if (contexts != null) {
deprecatedContexts.addAll(Arrays.asList(contexts));
}
}
approvedEmpty = approved == null || approved.get().isEmpty();
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void approve(Statement statement) {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (deprecated != null) {
deprecated.get().remove(statement);
deprecatedEmpty = deprecated == null || deprecated.get().isEmpty();
}
if (approved == null) {
approved = new CountedReference<>(createEmptyModel());
}
approved.get().add(statement);
approvedEmpty = false;
if (statement.getContext() != null) {
if (approvedContexts == null) {
approvedContexts = new HashSet<>();
}
approvedContexts.add(statement.getContext());
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws SailException {
approve(Statements.statement(subj, pred, obj, ctx));
}
@Override
public void deprecate(Statement statement) {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (approved != null) {
approved.get().remove(statement);
approvedEmpty = approved == null || approved.get().isEmpty();
}
if (deprecated == null) {
deprecated = new CountedReference<>(createEmptyModel());
}
deprecated.get().add(statement);
deprecatedEmpty = false;
Resource ctx = statement.getContext();
if (approvedContexts != null && approvedContexts.contains(ctx)
&& !approved.get().contains(null, null, null, ctx)) {
approvedContexts.remove(ctx);
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (observed != null) {
sb.append(observed.size());
sb.append(" observations, ");
}
if (namespaceCleared) {
sb.append("namespaceCleared, ");
}
if (removedPrefixes != null) {
sb.append(removedPrefixes.size());
sb.append(" removedPrefixes, ");
}
if (addedNamespaces != null) {
sb.append(addedNamespaces.size());
sb.append(" addedNamespaces, ");
}
if (statementCleared) {
sb.append("statementCleared, ");
}
if (deprecatedContexts != null && !deprecatedContexts.isEmpty()) {
sb.append(deprecatedContexts.size());
sb.append(" deprecatedContexts, ");
}
if (deprecated != null) {
sb.append(deprecated.get().size());
sb.append(" deprecated, ");
}
if (approved != null) {
sb.append(approved.get().size());
sb.append(" approved, ");
}
if (sb.length() > 0) {
return sb.substring(0, sb.length() - 2);
} else {
return super.toString();
}
}
protected void setChangeset(Changeset from) {
assert !closed;
assert !from.closed;
this.observed = from.observed;
this.approved = from.approved != null ? from.approved.retain() : null;
this.approvedEmpty = from.approvedEmpty;
this.deprecated = from.deprecated != null ? from.deprecated.retain() : null;
this.deprecatedEmpty = from.deprecatedEmpty;
this.approvedContexts = from.approvedContexts;
this.deprecatedContexts = from.deprecatedContexts;
this.addedNamespaces = from.addedNamespaces;
this.removedPrefixes = from.removedPrefixes;
this.namespaceCleared = from.namespaceCleared;
this.statementCleared = from.statementCleared;
}
/**
* Create a shallow clone of this Changeset. The shallow clone does not clone the underlying data structures, this
* means that any changes made to the original will potentially be reflected in the clone and vice versa.
*
* @return a new Changeset that is a shallow clone of the current Changeset.
*/
public Changeset shallowClone() {
assert !closed;
Changeset changeset = new Changeset() {
@Override
public void flush() throws SailException {
throw new UnsupportedOperationException();
}
@Override
public Model createEmptyModel() {
return Changeset.this.createEmptyModel();
}
};
changeset.setChangeset(this);
return changeset;
}
public Set<SimpleStatementPattern> getObserved() {
assert !closed;
boolean readLock = readWriteLock.readLock();
try {
return observed == null ? null : Collections.unmodifiableSet(observed);
} finally {
readWriteLock.unlockReader(readLock);
}
}
/**
* @deprecated Use getObserved() instead!
*/
@Deprecated
public Set<StatementPattern> getObservations() {
assert !closed;
boolean readLock = readWriteLock.readLock();
try {
if (observed == null) {
return null;
}
return observed.stream()
.map(simpleStatementPattern -> new StatementPattern(
new Var("s", simpleStatementPattern.getSubject()),
new Var("p", simpleStatementPattern.getPredicate()),
new Var("o", simpleStatementPattern.getObject()),
simpleStatementPattern.isAllContexts() ? null
: new Var("c", simpleStatementPattern.getContext())
)
)
.collect(Collectors.toCollection(HashSet::new));
} finally {
readWriteLock.unlockReader(readLock);
}
}
public Set<Resource> getApprovedContexts() {
assert !closed;
boolean readLock = readWriteLock.readLock();
try {
return cloneSet(approvedContexts);
} finally {
readWriteLock.unlockReader(readLock);
}
}
public Set<Resource> getDeprecatedContexts() {
if (closed) {
throw new AssertionError();
}
if (deprecatedContexts == null) {
return null;
}
boolean readLock = readWriteLock.readLock();
try {
return cloneSet(deprecatedContexts);
} finally {
readWriteLock.unlockReader(readLock);
}
}
public boolean isStatementCleared() {
assert !closed;
return statementCleared;
}
public Map<String, String> getAddedNamespaces() {
assert !closed;
boolean readLock = readWriteLock.readLock();
try {
return addedNamespaces;
} finally {
readWriteLock.unlockReader(readLock);
}
}
public Set<String> getRemovedPrefixes() {
assert !closed;
boolean readLock = readWriteLock.readLock();
try {
return cloneSet(removedPrefixes);
} finally {
readWriteLock.unlockReader(readLock);
}
}
public boolean isNamespaceCleared() {
assert !closed;
return namespaceCleared;
}
public boolean hasDeprecated() {
assert !closed;
if (deprecatedContexts == null) {
return deprecated != null && !deprecatedEmpty;
} else {
boolean readLock = readWriteLock.readLock();
try {
return (deprecated != null && !deprecatedEmpty) || !deprecatedContexts.isEmpty();
} finally {
readWriteLock.unlockReader(readLock);
}
}
}
boolean isChanged() {
assert !closed;
return approved != null || deprecated != null || approvedContexts != null
|| deprecatedContexts != null || addedNamespaces != null
|| removedPrefixes != null || statementCleared || namespaceCleared
|| observed != null;
}
List<Statement> getDeprecatedStatements() {
assert !closed;
if (deprecated == null || deprecatedEmpty) {
return Collections.emptyList();
}
boolean readLock = readWriteLock.readLock();
try {
return new ArrayList<>(deprecated.get());
} finally {
readWriteLock.unlockReader(readLock);
}
}
List<Statement> getApprovedStatements() {
assert !closed;
if (approved == null || approvedEmpty) {
return Collections.emptyList();
}
boolean readLock = readWriteLock.readLock();
try {
return new ArrayList<>(approved.get());
} finally {
readWriteLock.unlockReader(readLock);
}
}
boolean hasDeprecated(Statement statement) {
assert !closed;
if ((deprecated == null || deprecatedEmpty) && deprecatedContexts == null) {
return false;
}
boolean readLock = readWriteLock.readLock();
try {
if (deprecatedContexts != null) {
if (deprecatedContexts.contains(statement.getContext())) {
return true;
}
}
if (deprecated != null) {
return deprecated.get().contains(statement);
} else {
return false;
}
} finally {
readWriteLock.unlockReader(readLock);
}
}
boolean hasApproved() {
assert !closed;
return approved != null && !approvedEmpty;
}
Iterable<Statement> getApprovedStatements(Resource subj, IRI pred, Value obj,
Resource[] contexts) {
assert !closed;
if (approved == null || approvedEmpty) {
return Collections.emptyList();
}
boolean readLock = readWriteLock.readLock();
try {
Iterable<Statement> statements = approved.get().getStatements(subj, pred, obj, contexts);
// This is a synchronized context, users of this method will be allowed to use the results at their leisure.
// We
// provide a copy of the data so that there will be no concurrent modification exceptions!
if (statements instanceof Collection) {
return new ArrayList<>((Collection<? extends Statement>) statements);
} else {
List<Statement> ret = List.of();
for (Statement statement : statements) {
if (ret.isEmpty()) {
ret = List.of(statement);
} else {
if (ret.size() == 1) {
ret = new ArrayList<>(ret);
}
ret.add(statement);
}
}
return ret;
}
} finally {
readWriteLock.unlockReader(readLock);
}
}
Iterable<Triple> getApprovedTriples(Resource subj, IRI pred, Value obj) {
assert !closed;
if (approved == null || approvedEmpty) {
return Collections.emptyList();
}
boolean readLock = readWriteLock.readLock();
try {
// TODO none of this is particularly well thought-out in terms of performance, but we are aiming
// for functionally complete first.
Stream<Triple> approvedSubjectTriples = approved.get()
.parallelStream()
.filter(st -> st.getSubject().isTriple())
.map(st -> (Triple) st.getSubject())
.filter(t -> {
if (subj != null && !subj.equals(t.getSubject())) {
return false;
}
if (pred != null && !pred.equals(t.getPredicate())) {
return false;
}
return obj == null || obj.equals(t.getObject());
});
Stream<Triple> approvedObjectTriples = approved.get()
.parallelStream()
.filter(st -> st.getObject().isTriple())
.map(st -> (Triple) st.getObject())
.filter(t -> {
if (subj != null && !subj.equals(t.getSubject())) {
return false;
}
if (pred != null && !pred.equals(t.getPredicate())) {
return false;
}
return obj == null || obj.equals(t.getObject());
});
return Stream.concat(approvedSubjectTriples, approvedObjectTriples).collect(Collectors.toList());
} finally {
readWriteLock.unlockReader(readLock);
}
}
void removeApproved(Statement next) {
assert !closed;
long writeLock = readWriteLock.writeLock();
try {
if (approved != null) {
approved.get().remove(next);
approvedEmpty = approved == null || approved.get().isEmpty();
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
private <T> Set<T> cloneSet(Set<T> set) {
assert !closed;
if (set == null) {
return null;
}
return new HashSet<>(set);
}
void sinkApproved(SailSink sink) {
if (approved == null || approvedEmpty) {
return;
}
boolean readLock = readWriteLock.readLock();
try {
if (approved != null) {
sink.approveAll(approved.get(), approvedContexts);
}
} finally {
readWriteLock.unlockReader(readLock);
}
}
void sinkDeprecated(SailSink sink) {
if (deprecated == null || deprecatedEmpty) {
return;
}
boolean readLock = readWriteLock.readLock();
try {
if (deprecated != null) {
sink.deprecateAll(deprecated.get());
}
} finally {
readWriteLock.unlockReader(readLock);
}
}
public void sinkObserved(SailSink sink) {
if (observed == null) {
return;
}
boolean readLock = readWriteLock.readLock();
try {
if (observed != null) {
sink.observeAll(observed);
}
} finally {
readWriteLock.unlockReader(readLock);
}
}
@Override
public void approveAll(Set<Statement> approve, Set<Resource> approveContexts) {
long writeLock = readWriteLock.writeLock();
try {
if (deprecated != null) {
deprecated.get().removeAll(approve);
}
if (approved == null) {
approved = new CountedReference<>(createEmptyModel());
}
approved.get().addAll(approve);
approvedEmpty = approvedEmpty && approve.isEmpty();
if (approveContexts != null) {
if (approvedContexts == null) {
approvedContexts = new HashSet<>();
}
approvedContexts.addAll(approveContexts);
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
@Override
public void deprecateAll(Set<Statement> deprecate) {
long writeLock = readWriteLock.writeLock();
try {
if (approved != null) {
approved.get().removeAll(deprecate);
approvedEmpty = approved == null || approved.get().isEmpty();
}
if (deprecated == null) {
deprecated = new CountedReference<>(createEmptyModel());
}
deprecated.get().addAll(deprecate);
deprecatedEmpty = deprecatedEmpty && deprecate.isEmpty();
for (Statement statement : deprecate) {
Resource ctx = statement.getContext();
if (approvedContexts != null && approvedContexts.contains(ctx)
&& !approved.get().contains(null, null, null, ctx)) {
approvedContexts.remove(ctx);
}
}
} finally {
readWriteLock.unlockWriter(writeLock);
}
}
private static class AdderBasedReadWriteLock {
StampedLock writeLock = new StampedLock();
// LongAdder for handling readers. When the count is equal then there are no active readers.
private final LongAdder readersLocked = new LongAdder();
private final LongAdder readersUnlocked = new LongAdder();
// do not use this directly, use the VarHandle instead
public boolean readLock() {
while (true) {
readersLocked.increment();
if (!writeLock.isWriteLocked()) {
// Everything is good! We have acquired a read-lock and there are no active writers.
return true;
} else {
// Release our read lock, so we don't block any writers.
readersUnlocked.increment();
while (writeLock.isWriteLocked()) {
Thread.onSpinWait();
}
}
}
}
public void unlockReader(boolean locked) {
if (locked) {
VarHandle.acquireFence();
readersUnlocked.increment();
} else {
throw new IllegalMonitorStateException();
}
}
public long writeLock() {
// Acquire a write-lock.
long stamp = writeLock.writeLock();
// Wait for active readers to finish.
while (true) {
// The order is important here.
long unlockedSum = readersUnlocked.sum();
long lockedSum = readersLocked.sum();
if (unlockedSum == lockedSum) {
// No active readers.
VarHandle.releaseFence();
return stamp;
} else {
Thread.onSpinWait();
}
}
}
public void unlockWriter(long stamp) {
writeLock.unlockWrite(stamp);
}
}
public static class SimpleStatementPattern {
final private Resource subject;
final private IRI predicate;
final private Value object;
final private Resource context;
// true if the context is the union of all contexts
final private boolean allContexts;
public SimpleStatementPattern(Resource subject, IRI predicate, Value object, Resource context,
boolean allContexts) {
this.subject = subject;
this.predicate = predicate;
this.object = object;
this.context = context;
this.allContexts = allContexts;
}
public Resource getSubject() {
return subject;
}
public IRI getPredicate() {
return predicate;
}
public Value getObject() {
return object;
}
public Resource getContext() {
return context;
}
public boolean isAllContexts() {
return allContexts;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SimpleStatementPattern that = (SimpleStatementPattern) o;
return allContexts == that.allContexts && Objects.equals(subject, that.subject)
&& Objects.equals(predicate, that.predicate) && Objects.equals(object, that.object)
&& Objects.equals(context, that.context);
}
@Override
public int hashCode() {
int result = 1;
result = 31 * result + (subject == null ? 0 : subject.hashCode());
result = 31 * result + (predicate == null ? 0 : ((Object) predicate).hashCode());
result = 31 * result + (object == null ? 0 : object.hashCode());
result = 31 * result + (context == null ? 0 : context.hashCode());
result = 31 * result + ((Object) allContexts).hashCode();
return result;
}
}
private static class Cleanable implements Runnable {
private final AtomicBoolean atomicClosed;
private final StackTraceElement[] stackTrace;
private final Throwable throwable;
public Cleanable(AtomicBoolean atomicClosed) {
this.atomicClosed = atomicClosed;
this.stackTrace = Thread.currentThread().getStackTrace();
this.throwable = new Throwable();
}
@Override
public void run() {
if (!atomicClosed.get()) {
logger.error("Changeset was never closed", throwable);
}
}
}
}
@hmottestad Thank you for your support :-). I'll try to take a look within the next few days. The best would be to get rid of shallowClone completely, but I am not sure if this is possible.
I think I introduced shallowClone so that I could optimise the close method. Without shallowClone() a Changeset would be used after it has been closed, which really shouldn't be the case anyway. You could split out the performance stuff into its own branch/PR and then focus on the close() stuff separately. I still think it's hard to trace down all the places where the Changeset should/shouldn't be closed, and that it's easier to call it best effort and instead use a Java 9 Cleaner (or preferably our own ConcurrentCleaner) to close any overflow model that doesn't get closed correctly by the Changeset.
This is a phenomenal change! I have been experiencing that issue as well. Great work @kenwenzel, I will be watching this PR.
@hmottestad Should we continue working on this?
Yeah sure. I've been testing out some other fixes here: #4974
…ry load of LinkedHashModel
- do not verify additions in Changeset via isEmpty()
- proactively close MemoryOverflowModel in Changeset by using the AutoCloseable interface
- use GarbageCollectorMXBean to monitor GC load
- do not isolate transactions in LmdbStore when used in MemoryOverflowModel

# Conflicts:
#	core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java
#	core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/MemoryOverflowModel.java
#	core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/SailSourceModel.java
#	core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/MemoryOverflowModel.java
#	core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/SailSourceModel.java
- some shallow copies of Changeset were not closed at all
- use reference counting for approved and deprecated models
GitHub issue resolved: #4554
Briefly describe the changes proposed in this PR:
Increase the performance of large transactions that require overflowing to disk in NativeStore and LmdbStore by avoiding unnecessary checks for the existence of statements.
PR Author Checklist (see the contributor guidelines for more details):
- Code is formatted (run mvn process-resources to format from the command line)