diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java index 15a38e54272..621c61a98d1 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java @@ -40,6 +40,8 @@ */ class SailSourceBranch implements SailSource { + private static final long BACKOFF_START = Runtime.getRuntime().maxMemory() / 1024 / 1024 / 1024 * 10; + private static final Logger logger = LoggerFactory.getLogger(SailSourceBranch.class); /** @@ -404,6 +406,31 @@ private boolean isChanged(Changeset change) { private SailDataset derivedFromSerializable(IsolationLevel level) throws SailException { try { semaphore.lock(); + + if (changes.size() > BACKOFF_START) { + int size = changes.size(); + Changeset peekLast = changes.peekLast(); + compressChanges(); + int backoff = Math.min(5, changes.size()); + while (peekLast == changes.peekLast() && backoff-- > 0) { + peekLast = changes.peekLast(); + try { + semaphore.unlock(); + if (semaphore.isHeldByCurrentThread()) { + System.out.println(); + } + System.out.println("Backing off: " + changes.size()); + Thread.sleep(10); + compressChanges(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); + } finally { + semaphore.lock(); + } + } + } + if (serializable == null && level.isCompatibleWith(IsolationLevels.SERIALIZABLE)) { serializable = backingSource.sink(level); }