Skip to content

Commit

Permalink
GH-2998 add all when overflowing instead of one statement at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed May 17, 2024
1 parent e7be2e8 commit ebb4249
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -50,9 +51,12 @@ abstract class MemoryOverflowModel extends AbstractModel {

private static final int LARGE_BLOCK = 10000;

private static volatile boolean overflow;

// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
// we have space for one more block. The limit is currently set at 32 MB
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;
// we have space for one more block. The limit is currently set at 32 MB for small heaps and 128 MB for large heaps.
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 ? 128 * 1024 * 1024
: 32 * 1024 * 1024;

final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);

Expand Down Expand Up @@ -138,6 +142,33 @@ public boolean add(Statement st) {
return getDelegate().add(st);
}

@Override
public boolean addAll(Collection<? extends Statement> c) {
checkMemoryOverflow();
if (disk != null || c.size() <= 1024) {
return getDelegate().addAll(c);
} else {
boolean ret = false;
HashSet<Statement> buffer = new HashSet<>();
for (Statement st : c) {
buffer.add(st);
if (buffer.size() >= 1024) {
ret |= getDelegate().addAll(buffer);
buffer.clear();
innerCheckMemoryOverflow();
}
}
if (!buffer.isEmpty()) {
ret |= getDelegate().addAll(buffer);
buffer.clear();
}

return ret;

}

}

@Override
public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
return getDelegate().remove(subj, pred, obj, contexts);
Expand Down Expand Up @@ -234,43 +265,69 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx

private synchronized void checkMemoryOverflow() {
if (disk == null) {
if (overflow) {
System.out.println("HERE: " + overflow);
innerCheckMemoryOverflow();
}
int size = size();
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();

// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();

// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();

// estimated memory used
long used = totalMemory - freeMemory;

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;

if (baseline > 0) {
long blockSize = used - baseline;
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}

// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
overflowToDisk();
}
innerCheckMemoryOverflow();
}
}
}

private void innerCheckMemoryOverflow() {
if (disk != null) {
return;
}

// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();

// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();

// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();

// estimated memory used
long used = totalMemory - freeMemory;

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;

if (baseline > 0) {
long blockSize = used - baseline;
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}
if (overflow && freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING * 2) {
// stricter memory requirements to not overflow if other models are overflowing
logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
overflowToDisk();
System.gc();
} else if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)

logger.debug("syncing at {} triples. max block size: {}", size(), maxBlockSize);
overflowToDisk();
System.gc();
} else {
if (overflow) {
System.out.println("DO NOT OVERFLOW ANYMORE");
overflow = false;
}
baseline = used;
}
}
baseline = used;
}

private synchronized void overflowToDisk() {
System.out.println("OVERFLOWING");
overflow = true;

try {
LinkedHashModel memory = this.memory;
this.memory = null;
Expand All @@ -279,8 +336,8 @@ private synchronized void overflowToDisk() {
dataDir = Files.createTempDirectory("model").toFile();
logger.debug("memory overflow using temp directory {}", dataDir);
store = createSailStore(dataDir);
disk = new SailSourceModel(store);
disk.addAll(memory);
disk = new SailSourceModel(store, memory);
// disk.addAll(memory);
logger.debug("overflow synced to disk");
} catch (IOException | SailException e) {
String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,44 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai
addStatement(subj, pred, obj, explicit, ctx);
}

@Override
public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts) {
sinkStoreAccessLock.lock();
startTriplestoreTransaction();

try {
for (Statement statement : approved) {
Resource subj = statement.getSubject();
IRI pred = statement.getPredicate();
Value obj = statement.getObject();
Resource context = statement.getContext();

int subjID = valueStore.storeValue(subj);
int predID = valueStore.storeValue(pred);
int objID = valueStore.storeValue(obj);

int contextID = 0;
if (context != null) {
contextID = valueStore.storeValue(context);
}

boolean wasNew = tripleStore.storeTriple(subjID, predID, objID, contextID, explicit);
if (wasNew && context != null) {
contextStore.increment(context);
}

}
} catch (IOException e) {
throw new SailException(e);
} catch (RuntimeException e) {
logger.error("Encountered an unexpected problem while trying to add a statement", e);
throw e;
} finally {
sinkStoreAccessLock.unlock();
}

}

@Override
public void deprecate(Statement statement) throws SailException {
removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit,
Expand Down
Loading

0 comments on commit ebb4249

Please sign in to comment.