Skip to content

Commit

Permalink
Optimization of concurrent fact indexer (#27)
Browse files Browse the repository at this point in the history
* Add fact indexer tests

* Possibly an optimization of the concurrent fact indexer

The concurrent fact indexer maintains a coarse and fine index of
facts, where the coarse index is on the predicate symbol and the fine
index is on the predicate symbol, argument index and constant at that
argument. The fact indexer returns an overapproximation of the set of
facts that might match the input query but tries to minimize the
returned fact set using a heuristic which picks from the fine index
the fact set maintained at the argument position for which the most
distinct constants have been seen so far. Under the assumption that
the fact sets are roughly of equal size for each constant this is a
sound strategy, but it is easy to imagine worst cases where fact sets
that are much too large are returned. E.g. consider

f(a, x, b)
f(b, x, b)
f(c, x1, b)
f(c, x2, b)
...
f(c, x10000, b)
f(c, x, d)

and the query f(c, _, d)?

Two fine indexes will be considered as candidates; the first and the
third. The first index has three distinct constants and the third has
only two. The candidate fact set returned is thus the 10,001 element
set

{ f(c, x1, b), ..., f(c, x10000, b), f(c, x, d) }

rather than the singleton set

{ f(c, x, d) }

The fact indexer cannot evaluate the size of the underlying container
since it has to work for all Iterable containers, including
ConcurrentLinkedBag which cannot implement the Container interface for
efficiency reasons. It is however quite easy to add a size method to
this collection type and pass it as a lambda to the fact indexer. The
fact indexer can then just pick the smallest set from the fine index
instead of relying on a heuristic.

* Use :: syntax for methods.

---------

Co-authored-by: Ulrik Rasmussen <[email protected]>
Co-authored-by: Aaron Bembenek <[email protected]>
  • Loading branch information
3 people authored Nov 7, 2024
1 parent 4e25a86 commit 10229d9
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ public class StratifiedNegationEvalManager implements EvalManager {

private final ConcurrentFactIndexer<ConcurrentLinkedBag<PositiveAtom>> facts =
new ConcurrentFactIndexer<>(
() -> new ConcurrentLinkedBag<>(),
(bag, atom) -> bag.add(atom),
() -> ConcurrentLinkedBag.emptyBag());
ConcurrentLinkedBag::new,
ConcurrentLinkedBag::add,
ConcurrentLinkedBag::emptyBag,
ConcurrentLinkedBag::size);
private final ConcurrentFactTrie trie = new ConcurrentFactTrie();

private final Map<PredicateSym, Set<Integer>> relevantStrataByPred = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand All @@ -63,6 +64,7 @@ public class ConcurrentFactIndexer<T extends Iterable<PositiveAtom>> implements
private final Supplier<T> generator;
private final BiConsumer<T, PositiveAtom> addFunc;
private final Supplier<T> empty;
private final Function<T, Integer> size;

private final ConcurrentMap<PredicateSym, AtomicReferenceArray<ConcurrentMap<Constant, T>>>
fineIdx = Utilities.createConcurrentMap();
Expand All @@ -73,9 +75,10 @@ public class ConcurrentFactIndexer<T extends Iterable<PositiveAtom>> implements
*
* @param generator an anonymous function that returns a container
* @param addFunc an anonymous function that adds a fact to a container
* @param size an anonymous function that gets the number of items in the container
*/
public ConcurrentFactIndexer(Supplier<T> generator, BiConsumer<T, PositiveAtom> addFunc) {
this(generator, addFunc, generator);
public ConcurrentFactIndexer(Supplier<T> generator, BiConsumer<T, PositiveAtom> addFunc, Function<T, Integer> size) {
this(generator, addFunc, generator, size);
}

/**
Expand All @@ -84,12 +87,14 @@ public ConcurrentFactIndexer(Supplier<T> generator, BiConsumer<T, PositiveAtom>
* @param generator an anonymous function that returns a container
* @param addFunc an anonymous function that adds a fact to a container
* @param empty an anonymous function that returns an empty container (such as a static instance)
* @param size an anonymous function that gets the number of items in the container
*/
public ConcurrentFactIndexer(
Supplier<T> generator, BiConsumer<T, PositiveAtom> addFunc, Supplier<T> empty) {
Supplier<T> generator, BiConsumer<T, PositiveAtom> addFunc, Supplier<T> empty, Function<T, Integer> size) {
this.generator = generator;
this.addFunc = addFunc;
this.empty = empty;
this.size = size;
}

/**
Expand Down Expand Up @@ -179,21 +184,20 @@ public T indexInto(PositiveAtom a, ConstOnlySubstitution s) {

int bestIdx = -1;
Term bestConst = null;
int maxKeySetSize = -1;
int minFactSetSize = Integer.MAX_VALUE;
Term[] args = a.getArgs();
for (int i = 0; i < args.length; ++i) {
Term t = args[i];
// if (!(t instanceof DummyTerm) && (t instanceof Constant || (s != null && (t =
// s.get((Variable) t)) != null))) {
if ((t = t.accept(tv, s)) != null) {
ConcurrentMap<Constant, T> byConstant = byPos.get(i);
if (byConstant != null) {
if (!byConstant.containsKey(t)) {
T collection = byConstant.get(t);
if (collection == null) {
return this.empty.get();
}
int keySetSize = byConstant.size();
if (keySetSize > maxKeySetSize) {
maxKeySetSize = keySetSize;
int factSetSize = size.apply(collection);
if (factSetSize < minFactSetSize) {
minFactSetSize = factSetSize;
bestIdx = i;
bestConst = t;
}
Expand Down Expand Up @@ -234,7 +238,7 @@ public ConcurrentFactIndexer<T> getCopy() {
// hand, that might end up creating a new fact indexer with an
// inconsistent state.
ConcurrentFactIndexer<T> r =
new ConcurrentFactIndexer<>(this.generator, this.addFunc, this.empty);
new ConcurrentFactIndexer<>(this.generator, this.addFunc, this.empty, this.size);
for (PredicateSym pred : this.coarseIdx.keySet()) {
r.addAll(this.indexInto(pred));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -44,6 +45,7 @@
*/
public class ConcurrentLinkedBag<T> implements Iterable<T> {
private AtomicReference<Node> head = new AtomicReference<>();
private AtomicInteger size = new AtomicInteger();

/** A node in the linked list. */
public class Node {
Expand Down Expand Up @@ -85,8 +87,18 @@ public void add(T e) {
h = this.head.get();
n = new Node(e, h);
} while (!this.head.compareAndSet(h, n));
// This is not updated atomically with the addition of the element and can thus only be considered a lower bound
// for the set size. However, consumers cannot tell the difference since there is no way to transactionally read
// the set size and traverse the set anyway.
size.incrementAndGet();
}

/**
* Returns the number of elements that have been added to the set.
* @return the size of the set
*/
public Integer size() { return size.get(); }

/**
* Returns the head of the linked-list that backs this bag, or null if there is no head.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private FactIndexerFactory() {}
*/
public static ConcurrentFactIndexer<Set<PositiveAtom>> createConcurrentSetFactIndexer() {
return new ConcurrentFactIndexer<>(
() -> Utilities.createConcurrentSet(), (set, fact) -> set.add(fact));
Utilities::createConcurrentSet, Set::add, Set::size);
}

/**
Expand All @@ -61,6 +61,6 @@ public static ConcurrentFactIndexer<Set<PositiveAtom>> createConcurrentSetFactIn
*/
public static ConcurrentFactIndexer<Queue<PositiveAtom>> createConcurrentQueueFactIndexer() {
return new ConcurrentFactIndexer<>(
() -> new ConcurrentLinkedQueue<>(), (queue, fact) -> queue.add(fact));
ConcurrentLinkedQueue::new, Queue::add, Queue::size);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package edu.harvard.seas.pl.abcdatalog.util.datastructures;

import edu.harvard.seas.pl.abcdatalog.ast.*;
import edu.harvard.seas.pl.abcdatalog.engine.AbstractTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

import java.util.function.Supplier;

@RunWith(Suite.class)
@Suite.SuiteClasses({
FactIndexerTest.SetTests.class,
FactIndexerTest.ConcurrentLinkedBagTests.class
})
public class FactIndexerTest {
public static class SetTests extends AbstractFactIndexerTests {
public SetTests() { super(FactIndexerFactory::createConcurrentSetFactIndexer); }
}

public static class ConcurrentLinkedBagTests extends AbstractFactIndexerTests {
public ConcurrentLinkedBagTests() { super(FactIndexerFactory::createConcurrentQueueFactIndexer); }
}

public static abstract class AbstractFactIndexerTests extends AbstractTests {
private final Supplier<FactIndexer> factIndexerFactory;

public AbstractFactIndexerTests(Supplier<FactIndexer> factIndexerFactory) {
super(() -> { throw new Error("Tests do not use engine="); });
this.factIndexerFactory = factIndexerFactory;
}

@Test
public void testSmallestFactSetIsReturnedFromFineIndex() {
FactIndexer indexer = factIndexerFactory.get();
indexer.addAll(parseFacts("f(a,x,b). f(b,x,b). f(c,x1,b). f(c,x2,b). f(c,x,d)."));

Iterable<PositiveAtom> result = indexer.indexInto(parseQuery("f(c,_,d)?"));
int size = 0;
for (PositiveAtom ignored : result) {
++size;
}
Assert.assertEquals(1, size);
}
}
}

0 comments on commit 10229d9

Please sign in to comment.