Skip to content

Commit

Permalink
Multithreaded code
Browse files Browse the repository at this point in the history
  • Loading branch information
pbloem committed Aug 9, 2016
1 parent 00b4203 commit ce88c57
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 133 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.github.Data2Semantics</groupId>
<artifactId>nodes</artifactId>
<version>v0.1.2</version>
<version>v0.1.4</version>
</dependency>
</dependencies>

Expand Down
26 changes: 21 additions & 5 deletions src/main/java/nl/peterbloem/motive/MotifModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import org.nodes.DGraph;
import org.nodes.DLink;
Expand Down Expand Up @@ -46,6 +47,7 @@

import nl.peterbloem.kit.FrequencyModel;
import nl.peterbloem.kit.Functions;
import nl.peterbloem.kit.Global;
import nl.peterbloem.kit.OnlineModel;
import nl.peterbloem.kit.Pair;
import nl.peterbloem.kit.Series;
Expand All @@ -63,6 +65,20 @@
*/
public class MotifModel
{
private static ExecutorService executor = null;

/**
* Sets the threadpool to use (for the beta model). If not set, the beta
* model will manage its own threads.
*
* @param executor
*/
public static void setExecutor(ExecutorService executor)
{
MotifModel.executor = executor;
}


public static <L> double size(Graph<L> graph, Graph<L> sub,
List<List<Integer>> occurrences, StructureModel<Graph<?>> nullModel, boolean resetWiring)
{
Expand Down Expand Up @@ -135,7 +151,7 @@ public static <L> double sizeBeta(Graph<L> graph, Graph<L> sub,
public static <L> double sizeBeta(DGraph<L> graph, DGraph<L> sub,
List<List<Integer>> occurrences, boolean resetWiring, int iterations, double alpha)
{
int numThreads = Runtime.getRuntime().availableProcessors();
int numThreads = Global.numThreads();

List<List<Integer>> wiring = new ArrayList<List<Integer>>();
Set<Integer> motifNodes = new HashSet<Integer>();
Expand All @@ -153,8 +169,8 @@ public static <L> double sizeBeta(DGraph<L> graph, DGraph<L> sub,
List<Double> samples = new ArrayList<Double>(iterations);
DSequenceEstimator<String> motifModel = new DSequenceEstimator<String>(sub);
DSequenceEstimator<String> subbedModel = new DSequenceEstimator<String>(degrees);
motifModel.nonuniform(iterations, numThreads);
subbedModel.nonuniform(iterations, numThreads);
motifModel.nonuniform(iterations, numThreads, executor);
subbedModel.nonuniform(iterations, numThreads, executor);

for(int i : series(iterations))
samples.add(motifModel.logSamples().get(i) + subbedModel.logSamples().get(i));
Expand Down Expand Up @@ -294,8 +310,8 @@ public static <L> double sizeBeta(UGraph<L> graph, UGraph<L> sub,
List<Double> samples = new ArrayList<Double>(iterations);
USequenceEstimator<String> motifModel = new USequenceEstimator<String>(sub);
USequenceEstimator<String> subbedModel = new USequenceEstimator<String>(degrees);
motifModel.nonuniform(iterations, numThreads);
subbedModel.nonuniform(iterations, numThreads);
motifModel.nonuniform(iterations, numThreads, executor);
subbedModel.nonuniform(iterations, numThreads, executor);

for(int i : series(iterations))
samples.add(motifModel.logSamples().get(i) + subbedModel.logSamples().get(i));
Expand Down
210 changes: 133 additions & 77 deletions src/main/java/nl/peterbloem/motive/exec/Compare.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.json.JSONObject;
import org.nodes.DGraph;
Expand Down Expand Up @@ -156,6 +160,21 @@ public static enum NullModel{ER, EDGELIST, BETA}

public void main() throws IOException
{

// * set up thread pools
double mix = 0.666; // relative number of threads devoted to sampling
// - concurrent threads for sampling
int sThreads = Math.max(1, (int)(Global.numThreads() * mix));
// - concurrent threads for computing scores
int mThreads = Math.max(1, Global.numThreads() - sThreads);

Global.log().info(sThreads + " for sampling, " + mThreads + " for computing motif scores.");

ExecutorService samplesExecutor = Executors.newFixedThreadPool(sThreads);
ExecutorService motifsExecutor = Executors.newFixedThreadPool(sThreads);

MotifModel.setExecutor(samplesExecutor);

Global.secureRandom(42);
Global.log().info("Threads available: " + NUM_THREADS);

Expand All @@ -177,128 +196,165 @@ public void main() throws IOException

Global.log().info("Computing motif code lengths");

List<? extends Graph<String>> subs;
List<Double> frequencies;
List<List<List<Integer>>> occurrences;
List<? extends Graph<String>> subsAll;
List<Double> frequenciesAll;

final List<List<List<Integer>>> occurrences;

if(directed)
{
DPlainMotifExtractor<String> ex
= new DPlainMotifExtractor<String>(
(DGraph<String>)data, motifSamples, motifMinSize, motifMaxSize, minFreq);

subs = new ArrayList<Graph<String>>(ex.subgraphs());
subsAll = new ArrayList<Graph<String>>(ex.subgraphs());

frequencies = new ArrayList<Double>(subs.size());
for(Graph<String> sub : subs)
frequencies.add((double)ex.occurrences((DGraph<String>)sub).size());
frequenciesAll = new ArrayList<Double>(subsAll.size());
for(Graph<String> sub : subsAll)
frequenciesAll.add((double)ex.occurrences((DGraph<String>)sub).size());

occurrences = new ArrayList<List<List<Integer>>>(subs.size());
for(Graph<String> sub : subs)
occurrences = new ArrayList<List<List<Integer>>>(subsAll.size());
for(Graph<String> sub : subsAll)
occurrences.add(ex.occurrences((DGraph<String>)sub));
} else
{
UPlainMotifExtractor<String> ex
= new UPlainMotifExtractor<String>(
(UGraph<String>)data, motifSamples, motifMinSize, motifMaxSize, minFreq);

subs = new ArrayList<Graph<String>>(ex.subgraphs());
frequencies = new ArrayList<Double>(subs.size());
for(Graph<String> sub : subs)
frequencies.add((double)ex.occurrences((UGraph<String>)sub).size());
subsAll = new ArrayList<Graph<String>>(ex.subgraphs());
frequenciesAll = new ArrayList<Double>(subsAll.size());
for(Graph<String> sub : subsAll)
frequenciesAll.add((double)ex.occurrences((UGraph<String>)sub).size());

occurrences = new ArrayList<List<List<Integer>>>(subs.size());
for(Graph<String> sub : subs)
occurrences = new ArrayList<List<List<Integer>>>(subsAll.size());
for(Graph<String> sub : subsAll)
occurrences.add(ex.occurrences((UGraph<String>)sub));
}

if(subs.size() > maxMotifs)

final List<? extends Graph<String>> subs;
final List<Double> frequencies;
if(subsAll.size() > maxMotifs)
{
subs = new ArrayList<Graph<String>>(subs.subList(0, maxMotifs));
frequencies = new ArrayList<Double>(frequencies.subList(0, maxMotifs));
subs = new ArrayList<Graph<String>>(subsAll.subList(0, maxMotifs));
frequencies = new ArrayList<Double>(frequenciesAll.subList(0, maxMotifs));
} else
{
subs = subsAll;
frequencies = frequenciesAll;
}
List<Double> factorsER = new ArrayList<Double>(subs.size());
List<Double> factorsEL = new ArrayList<Double>(subs.size());
List<Double> factorsBeta = new ArrayList<Double>(subs.size());
List<Double> maxFactors = new ArrayList<Double>(subs.size());

final Map<Graph<String>, Double> factorsERMap = new ConcurrentHashMap<Graph<String>, Double>();
final Map<Graph<String>, Double> factorsELMap = new ConcurrentHashMap<Graph<String>, Double>();
final Map<Graph<String>, Double> factorsBetaMap = new ConcurrentHashMap<Graph<String>, Double>();
final Map<Graph<String>, Double> maxFactorsMap = new ConcurrentHashMap<Graph<String>, Double>();

double baselineER = new ERSimpleModel(true).codelength(data);
double baselineEL = new EdgeListModel(Prior.ML).codelength(data);
double baselineBeta = new DegreeSequenceModel(betaIterations, betaAlpha, Prior.ML, Margin.LOWERBOUND).codelength(data);
final double baselineER = new ERSimpleModel(true).codelength(data);
final double baselineEL = new EdgeListModel(Prior.ML).codelength(data);
final double baselineBeta = new DegreeSequenceModel(betaIterations, betaAlpha, Prior.ML, Margin.LOWERBOUND).codelength(data);

for(int i : series(subs.size()))
for(final int i : series(subs.size()))
{
Graph<String> sub = subs.get(i);
List<List<Integer>> occs = occurrences.get(i);

Global.log().info("Analysing sub ("+ (i+1) +" of " + subs.size() + "): " + sub);
Global.log().info("freq: " + frequencies.get(i));

double max = Double.NEGATIVE_INFINITY;
Thread thread = new Thread(){
public void run(){
Graph<String> sub = subs.get(i);
List<List<Integer>> occs = occurrences.get(i);

Global.log().info("Analysing sub ("+ (i+1) +" of " + subs.size() + "): " + sub);
Global.log().info("freq: " + frequencies.get(i));

double max = Double.NEGATIVE_INFINITY;

Global.log().info("null model: ER");
{
double sizeER = MotifSearchModel.sizeER(data, sub, occs, resets);
double factorER = baselineER - sizeER;
factorsER.add(factorER);

max = Math.max(max, factorER);

Global.log().info("ER baseline: " + baselineER);
Global.log().info("ER motif code: " + sizeER);
Global.log().info("ER factor: " + factorER);
}
Global.log().info("null model: ER");
{
double sizeER = MotifSearchModel.sizeER(data, sub, occs, resets);
double factorER = baselineER - sizeER;
factorsERMap.put(sub, factorER);
max = Math.max(max, factorER);
Global.log().info("ER baseline: " + baselineER);
Global.log().info("ER motif code: " + sizeER);
Global.log().info("ER factor: " + factorER);
}

Global.log().info("null model: EL");
{
double sizeEL = MotifSearchModel.sizeEL(data, sub, occs, resets);

double factorEL = baselineEL - sizeEL;
factorsEL.add(factorEL);

max = Math.max(max, factorEL);

Global.log().info("EL baseline: " + baselineEL);
Global.log().info("EL motif code: " + sizeEL);
Global.log().info("EL factor: " + factorEL);
}
Global.log().info("null model: EL");
{
double sizeEL = MotifSearchModel.sizeEL(data, sub, occs, resets);
double factorEL = baselineEL - sizeEL;
factorsELMap.put(sub, factorEL);
max = Math.max(max, factorEL);
Global.log().info("EL baseline: " + baselineEL);
Global.log().info("EL motif code: " + sizeEL);
Global.log().info("EL factor: " + factorEL);
}

Global.log().info("null model: Beta");
{
Global.log().info("null model: Beta");
{

double sizeBeta = MotifSearchModel.sizeBeta(data, sub, occs, resets, betaIterations, betaAlpha, betaSearchDepth);
double factorBeta = baselineBeta - sizeBeta;
factorsBeta.add(factorBeta);

max = Math.max(max, factorBeta);

Global.log().info("Beta baseline: " + baselineBeta);
Global.log().info("Beta motif code: " + sizeBeta);
Global.log().info("Beta factor: " + factorBeta);
}
double sizeBeta = MotifSearchModel.sizeBeta(data, sub, occs, resets, betaIterations, betaAlpha, betaSearchDepth);
double factorBeta = baselineBeta - sizeBeta;
factorsBetaMap.put(sub, factorBeta);

max = Math.max(max, factorBeta);

Global.log().info("Beta baseline: " + baselineBeta);
Global.log().info("Beta motif code: " + sizeBeta);
Global.log().info("Beta factor: " + factorBeta);
}

maxFactorsMap.put(sub, max);
}
};

maxFactors.add(max);
motifsExecutor.execute(thread);
}

// * Execute all threads and wait until finished
motifsExecutor.shutdown();
try
{
motifsExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

List<Double> factorsER = new ArrayList<Double>(subsAll.size());
List<Double> factorsEL = new ArrayList<Double>(subsAll.size());
List<Double> factorsBeta = new ArrayList<Double>(subsAll.size());
List<Double> maxFactors = new ArrayList<Double>(subsAll.size());

for(int i : series(subsAll.size()))
{
Graph<String> sub = subsAll.get(i);
factorsER.add(factorsERMap.get(sub));
factorsEL.add(factorsELMap.get(sub));
factorsBeta.add(factorsBetaMap.get(sub));
maxFactors.add(maxFactorsMap.get(sub));
}

Comparator<Double> comp = Functions.natural();
Functions.sort(
factorsBeta, Collections.reverseOrder(comp),
(List) frequencies,
(List) frequenciesAll,
(List) factorsER,
(List) factorsEL,
(List) factorsBeta,
(List) subs);
(List) subsAll);

File numbersFile = new File("numbers.csv");

BufferedWriter numbersWriter = new BufferedWriter(new FileWriter(numbersFile));
for(int i : series(subs.size()))
numbersWriter.write(frequencies.get(i) + ", " + factorsER.get(i) + ", " + factorsEL.get(i) + ", " + factorsBeta.get(i) + "\n");
for(int i : series(subsAll.size()))
numbersWriter.write(frequenciesAll.get(i) + ", " + factorsER.get(i) + ", " + factorsEL.get(i) + ", " + factorsBeta.get(i) + "\n");
numbersWriter.close();

int i = 0;
for(Graph<String> sub : subs)
for(Graph<String> sub : subsAll)
{
File graphFile = new File(String.format("motif.%03d.edgelist", i));
Data.writeEdgeList(sub, graphFile);
Expand Down
Loading

0 comments on commit ce88c57

Please sign in to comment.