Skip to content

Commit

Permalink
GH-2998 improve memory overflow performance of the NativeStore
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Jun 5, 2024
1 parent 7aaffd1 commit 1932cff
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -48,11 +49,14 @@ abstract class MemoryOverflowModel extends AbstractModel {

private static final Runtime RUNTIME = Runtime.getRuntime();

private static final int LARGE_BLOCK = 10000;
private static final int LARGE_BLOCK = 1024 * 5;

private static volatile boolean overflow;

// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
// we have space for one more block. The limit is currently set at 32 MB
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
// we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps.
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024
: 32 * 1024 * 1024;

final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);

Expand All @@ -71,7 +75,7 @@ abstract class MemoryOverflowModel extends AbstractModel {
SimpleValueFactory vf = SimpleValueFactory.getInstance();

public MemoryOverflowModel() {
memory = new LinkedHashModel(LARGE_BLOCK);
memory = new LinkedHashModel(LARGE_BLOCK * 2);
}

public MemoryOverflowModel(Model model) {
Expand Down Expand Up @@ -138,6 +142,33 @@ public boolean add(Statement st) {
return getDelegate().add(st);
}

@Override
public boolean addAll(Collection<? extends Statement> c) {
checkMemoryOverflow();
if (disk != null || c.size() <= 1024) {
return getDelegate().addAll(c);
} else {
boolean ret = false;
HashSet<Statement> buffer = new HashSet<>();
for (Statement st : c) {
buffer.add(st);
if (buffer.size() >= 1024) {
ret |= getDelegate().addAll(buffer);
buffer.clear();
innerCheckMemoryOverflow();
}
}
if (!buffer.isEmpty()) {
ret |= getDelegate().addAll(buffer);
buffer.clear();
}

return ret;

}

}

@Override
public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
return getDelegate().remove(subj, pred, obj, contexts);
Expand Down Expand Up @@ -195,13 +226,23 @@ public synchronized void removeTermIteration(Iterator<Statement> iter, Resource
protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException;

private Model getDelegate() {
LinkedHashModel memory = this.memory;
var memory = this.memory;
if (memory != null) {
return memory;
} else {
synchronized (this) {
var disk = this.disk;
if (disk != null) {
return disk;
}
synchronized (this) {
if (this.memory != null) {
return this.memory;
}
if (this.disk != null) {
return this.disk;
}
throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state");
}
}
}

Expand Down Expand Up @@ -232,45 +273,76 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
}
}

private synchronized void checkMemoryOverflow() {
if (disk == null) {
int size = size();
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();

// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();

// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();

// estimated memory used
long used = totalMemory - freeMemory;

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;

if (baseline > 0) {
long blockSize = used - baseline;
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}

// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
overflowToDisk();
}
private void checkMemoryOverflow() {
if (disk == getDelegate()) {
return;
}

if (overflow) {
innerCheckMemoryOverflow();
}
int size = size() + 1;
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
innerCheckMemoryOverflow();
}

}

private void innerCheckMemoryOverflow() {
if (disk == getDelegate()) {
return;
}

// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();

// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();

// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();

// estimated memory used
long used = totalMemory - freeMemory;

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;

if (baseline > 0) {
long blockSize = used - baseline;
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}
if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) {
// stricter memory requirements to not overflow if other models are overflowing
logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
overflowToDisk();
System.gc();
} else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)

logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
overflowToDisk();
System.gc();
} else {
if (overflow) {
overflow = false;
}
baseline = used;
}
}
baseline = used;
}

private synchronized void overflowToDisk() {
overflow = true;

if (memory == null) {
assert disk != null;
return;
}

try {
LinkedHashModel memory = this.memory;
this.memory = null;
Expand All @@ -279,8 +351,7 @@ private synchronized void overflowToDisk() {
dataDir = Files.createTempDirectory("model").toFile();
logger.debug("memory overflow using temp directory {}", dataDir);
store = createSailStore(dataDir);
disk = new SailSourceModel(store);
disk.addAll(memory);
this.disk = new SailSourceModel(store, memory);
logger.debug("overflow synced to disk");
} catch (IOException | SailException e) {
String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ CloseableIteration<? extends Statement> createStatementIterator(Resource subj, I
return tripleStore.cardinality(subjID, predID, objID, contextID);
}

public void disableTxnStatus() {
this.tripleStore.disableTxnStatus();
}

private final class NativeSailSource extends BackingSailSource {

private final boolean explicit;
Expand Down Expand Up @@ -444,6 +448,44 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai
addStatement(subj, pred, obj, explicit, ctx);
}

@Override
public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts) {
sinkStoreAccessLock.lock();
startTriplestoreTransaction();

try {
for (Statement statement : approved) {
Resource subj = statement.getSubject();
IRI pred = statement.getPredicate();
Value obj = statement.getObject();
Resource context = statement.getContext();

int subjID = valueStore.storeValue(subj);
int predID = valueStore.storeValue(pred);
int objID = valueStore.storeValue(obj);

int contextID = 0;
if (context != null) {
contextID = valueStore.storeValue(context);
}

boolean wasNew = tripleStore.storeTriple(subjID, predID, objID, contextID, explicit);
if (wasNew && context != null) {
contextStore.increment(context);
}

}
} catch (IOException e) {
throw new SailException(e);
} catch (RuntimeException e) {
logger.error("Encountered an unexpected problem while trying to add a statement", e);
throw e;
} finally {
sinkStoreAccessLock.unlock();
}

}

@Override
public void deprecate(Statement statement) throws SailException {
removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ private static final class OverFlowStoreCleaner implements Runnable {

private OverFlowStoreCleaner(NativeSailStore nativeSailStore, File dataDir) {
this.nativeSailStore = nativeSailStore;
nativeSailStore.disableTxnStatus();
this.dataDir = dataDir;
}

Expand Down
Loading

0 comments on commit 1932cff

Please sign in to comment.