Skip to content

Commit

Permalink
Merge pull request #2872 from eclipse/GH-2799-memoryoverflowmodel-oom
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad authored Apr 24, 2021
2 parents 0d33b7b + 508e9ee commit 62dacb6
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ abstract class MemoryOverflowModel extends AbstractModel {

private static final int LARGE_BLOCK = 10000;

// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
// we have space for one more block. The limit is currently set at 32 MB
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;

final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);

private LinkedHashModel memory;
Expand All @@ -62,7 +66,7 @@ abstract class MemoryOverflowModel extends AbstractModel {
private long maxBlockSize = 0;

public MemoryOverflowModel() {
memory = new LinkedHashModel();
memory = new LinkedHashModel(LARGE_BLOCK);
}

public MemoryOverflowModel(Model model) {
Expand All @@ -76,7 +80,7 @@ public MemoryOverflowModel(Set<Namespace> namespaces, Collection<? extends State
}

public MemoryOverflowModel(Set<Namespace> namespaces) {
memory = new LinkedHashModel(namespaces);
memory = new LinkedHashModel(namespaces, LARGE_BLOCK);
}

@Override
Expand Down Expand Up @@ -243,9 +247,11 @@ private synchronized void checkMemoryOverflow() {
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}

// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 10% of the heap is still free (this last condition to avoid GC overhead limit)
if (freeToAllocateMemory < Math.min(0.1 * maxMemory, maxBlockSize)) {
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
overflowToDisk();
}
Expand Down Expand Up @@ -282,7 +288,7 @@ protected void finalize() throws Throwable {
}
};
disk.addAll(memory);
memory = new LinkedHashModel(memory.getNamespaces());
memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK);
logger.debug("overflow synced to disk");
} catch (IOException | SailException e) {
String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*******************************************************************************
* Copyright (c) 2021 Eclipse RDF4J contributors.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*******************************************************************************/

package org.eclipse.rdf4j.sail.nativerdf.benchmark;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;
import org.assertj.core.util.Files;
import org.eclipse.rdf4j.IsolationLevels;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Logger;

/**
* @author Håvard Ottestad
*/
@State(Scope.Benchmark)
@Warmup(iterations = 0)
@BenchmarkMode({ Mode.AverageTime })
@Fork(value = 1, jvmArgs = { "-Xms500M", "-Xmx500M", "-XX:+UseParallelGC" })
@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class OverflowBenchmarkReal {

@Setup(Level.Trial)
public void setup() {
((Logger) (LoggerFactory
.getLogger("org.eclipse.rdf4j.sail.nativerdf.MemoryOverflowModel")))
.setLevel(ch.qos.logback.classic.Level.DEBUG);
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include("OverflowBenchmarkReal") // adapt to run other benchmark tests
.build();

new Runner(opt).run();
}

@Benchmark
public long loadLotsOfData() throws IOException {
File temporaryFolder = Files.newTemporaryFolder();
SailRepository sailRepository = null;
try {
sailRepository = new SailRepository(new NativeStore(temporaryFolder));

try (SailRepositoryConnection connection = sailRepository.getConnection()) {

connection.begin(IsolationLevels.READ_COMMITTED);
connection.add(
OverflowBenchmarkReal.class.getClassLoader().getResource("benchmarkFiles/datagovbe-valid.ttl"));
connection.commit();

return connection.size();
}

} finally {
try {
if (sailRepository != null) {
sailRepository.shutDown();
}
} finally {
FileUtils.deleteDirectory(temporaryFolder);
}
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*******************************************************************************
* Copyright (c) 2021 Eclipse RDF4J contributors.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*******************************************************************************/

package org.eclipse.rdf4j.sail.nativerdf.benchmark;

import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.commons.io.FileUtils;
import org.assertj.core.util.Files;
import org.eclipse.rdf4j.IsolationLevels;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.vocabulary.FOAF;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Logger;

/**
* @author Håvard Ottestad
*/
@State(Scope.Benchmark)
@Warmup(iterations = 0)
@BenchmarkMode({ Mode.AverageTime })
@Fork(value = 1, jvmArgs = { "-Xms64M", "-Xmx64M", "-XX:+UseG1GC" })
@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class OverflowBenchmarkSynthetic {

private final Random random = new Random(389012849);
private final String ns = "http://example.org/";

@Setup(Level.Trial)
public void setup() {
((Logger) (LoggerFactory
.getLogger("org.eclipse.rdf4j.sail.nativerdf.MemoryOverflowModel")))
.setLevel(ch.qos.logback.classic.Level.DEBUG);
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include("OverflowBenchmarkSynthetic") // adapt to run other benchmark tests
.build();

new Runner(opt).run();
}

@Benchmark
public long loadLotsOfDataEmptyStore() throws IOException {
File temporaryFolder = Files.newTemporaryFolder();
SailRepository sailRepository = null;
try {
sailRepository = new SailRepository(new NativeStore(temporaryFolder));

try (SailRepositoryConnection connection = sailRepository.getConnection()) {

connection.begin();
addData(connection, 4000);
connection.commit();

return connection.size();
}

} finally {
try {
if (sailRepository != null) {
sailRepository.shutDown();
}
} finally {
FileUtils.deleteDirectory(temporaryFolder);
}
}

}

@Benchmark
public long loadLotsOfDataNonEmptyStore() throws IOException {
File temporaryFolder = Files.newTemporaryFolder();
SailRepository sailRepository = null;
try {
sailRepository = new SailRepository(new NativeStore(temporaryFolder));

try (SailRepositoryConnection connection = sailRepository.getConnection()) {

connection.begin();
addData(connection, 1000);
connection.commit();

connection.begin(IsolationLevels.READ_COMMITTED);
addData(connection, 4000);
connection.commit();

return connection.size();
}

} finally {
try {
if (sailRepository != null) {
sailRepository.shutDown();
}
} finally {
FileUtils.deleteDirectory(temporaryFolder);
}
}

}

private void addData(SailRepositoryConnection connection, int upperLimit) {
ValueFactory vf = connection.getValueFactory();

IntStream
.range(0, upperLimit)
.mapToObj(String::valueOf)
.flatMap(i -> Stream.of(
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral(i)),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + "")),
vf.createStatement(vf.createIRI(ns, random.nextInt(upperLimit) + ""), FOAF.KNOWS,
vf.createIRI(ns, random.nextInt(upperLimit) + ""))
)
)
.forEach(connection::add);
}

}

0 comments on commit 62dacb6

Please sign in to comment.