Skip to content

Commit

Permalink
Merge pull request #177 from twitter/cleanup_executorservice
Browse files Browse the repository at this point in the history
Remove obsolete threadpool and executor service.
  • Loading branch information
pankajgupta committed Mar 18, 2015

Verified

This commit was signed with the committer’s verified signature.
Meulengracht Philip Meulengracht
2 parents 70519e1 + 418ec13 commit b2df01f
Showing 9 changed files with 24 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ import com.twitter.cassovary.graph._
import com.twitter.cassovary.util.io.{AdjacencyListGraphReader, ListOfEdgesGraphReader}
import com.twitter.app.Flags
import com.twitter.util.Stopwatch
import java.util.concurrent.Executors
import java.io.File
import scala.collection.mutable.ListBuffer

@@ -113,16 +112,11 @@ object PerformanceBenchmark extends App with GzipGraphDownloader {
if (helpFlag()) {
println(flags.usage)
} else {
/**
* Thread pool used for reading graphs. Only useful if multiple files with the same prefix name are present.
*/
val graphReadingThreadPool = Executors.newFixedThreadPool(4)

def readGraph(path : String, filename : String, adjacencyList: Boolean) : DirectedGraph[Node] = {
if (adjacencyList) {
AdjacencyListGraphReader.forIntIds(path, filename, graphReadingThreadPool).toArrayBasedDirectedGraph()
AdjacencyListGraphReader.forIntIds(path, filename).toArrayBasedDirectedGraph()
} else
ListOfEdgesGraphReader.forIntIds(path, filename, graphReadingThreadPool).toArrayBasedDirectedGraph()
ListOfEdgesGraphReader.forIntIds(path, filename).toArrayBasedDirectedGraph()
}

if (benchmarks.isEmpty) {
@@ -143,7 +137,6 @@ object PerformanceBenchmark extends App with GzipGraphDownloader {
printf("\tAvg time over %d repetitions: %s.\n", reps(), duration)
}
}
graphReadingThreadPool.shutdown()
}

def cacheRemoteFile(url : String) : (String, String) = {
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ import com.twitter.cassovary.graph.NodeIdEdgesMaxId
import com.twitter.cassovary.util.NodeNumberer
import com.twitter.util.NonFatal
import java.io.IOException
import java.util.concurrent.ExecutorService
import scala.io.Source

/**
@@ -130,9 +129,7 @@ class AdjacencyListGraphReader[T] (
}

object AdjacencyListGraphReader {
def forIntIds(directory: String, prefixFileNames: String = "", threadPool: ExecutorService,
def forIntIds(directory: String, prefixFileNames: String = "",
nodeNumberer: NodeNumberer[Int] = new NodeNumberer.IntIdentity()) =
new AdjacencyListGraphReader[Int](directory, prefixFileNames, nodeNumberer, _.toInt) {
override val executorService = threadPool
}
new AdjacencyListGraphReader[Int](directory, prefixFileNames, nodeNumberer, _.toInt)
}
Original file line number Diff line number Diff line change
@@ -13,11 +13,9 @@
*/
package com.twitter.cassovary.util.io

import com.google.common.util.concurrent.MoreExecutors
import com.twitter.cassovary.graph.StoredGraphDir.StoredGraphDir
import com.twitter.cassovary.graph._
import com.twitter.cassovary.util.NodeNumberer
import java.util.concurrent.ExecutorService

/**
* Trait that classes should implement to read in graphs that nodes have
@@ -51,11 +49,6 @@ trait GraphReader[T] {
*/
def storedGraphDir: StoredGraphDir = StoredGraphDir.OnlyOut

/**
* Override to use multiple threads
*/
def executorService: ExecutorService = MoreExecutors.sameThreadExecutor()

def parallelismLimit: Int = Runtime.getRuntime.availableProcessors

/**
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@ import com.twitter.logging.Logger
import it.unimi.dsi.fastutil.ints.{Int2IntArrayMap, Int2ObjectMap, Int2ObjectLinkedOpenHashMap}
import scala.io.Source
import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.ExecutorService
import com.twitter.util.NonFatal
import java.io.IOException

@@ -140,9 +139,8 @@ class ListOfEdgesGraphReader[T](
}

object ListOfEdgesGraphReader {
def forIntIds(directory: String, prefixFileNames: String = "", threadPool: ExecutorService,
def forIntIds(directory: String, prefixFileNames: String = "",
nodeNumberer: NodeNumberer[Int] = new NodeNumberer.IntIdentity()) =
new ListOfEdgesGraphReader[Int](directory, prefixFileNames, new NodeNumberer.IntIdentity(), _.toInt) {
override val executorService = threadPool
}
new ListOfEdgesGraphReader[Int](directory, prefixFileNames,
new NodeNumberer.IntIdentity(), _.toInt)
}
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@ package com.twitter.cassovary.util.io

import com.twitter.cassovary.graph.{GraphBehaviours, Node}
import com.twitter.cassovary.util.SequentialNodeNumberer
import java.util.concurrent.Executors
import org.scalatest.{Matchers, WordSpec}

class AdjacencyListGraphReaderSpec extends WordSpec with Matchers with GraphBehaviours[Node] {
@@ -38,30 +37,26 @@ class AdjacencyListGraphReaderSpec extends WordSpec with Matchers with GraphBeha
)

trait GraphWithoutRenumberer {
val graph = AdjacencyListGraphReader.forIntIds(directory, "toy_6nodes_adj",
Executors.newFixedThreadPool(2)).toSharedArrayBasedDirectedGraph()
val graph = AdjacencyListGraphReader.forIntIds(directory,
"toy_6nodes_adj").toSharedArrayBasedDirectedGraph()
}

trait GraphWithRenumberer {
val seqRenumberer = new SequentialNodeNumberer[Int]()
val graph = AdjacencyListGraphReader.forIntIds(directory, "toy_6nodes_adj",
Executors.newFixedThreadPool(2), seqRenumberer).toSharedArrayBasedDirectedGraph()
seqRenumberer).toSharedArrayBasedDirectedGraph()
}

trait GraphWithStringIds {
val seqNumberer = new SequentialNodeNumberer[String]()
val graph = new AdjacencyListGraphReader[String](directory, "toy_7nodes_adj_StringIds", seqNumberer,
idReader = identity){
override val executorService = Executors.newFixedThreadPool(2)
}.toSharedArrayBasedDirectedGraph()
val graph = new AdjacencyListGraphReader[String](directory, "toy_7nodes_adj_StringIds",
seqNumberer, idReader = identity).toSharedArrayBasedDirectedGraph()
}

trait GraphWithLongIds {
val seqNumberer = new SequentialNodeNumberer[Long]()
val graph = new AdjacencyListGraphReader[Long](directory, "toy_7nodes_adj_LongIds", seqNumberer,
idReader = _.toLong){
override val executorService = Executors.newFixedThreadPool(2)
}.toSharedArrayBasedDirectedGraph()
idReader = _.toLong).toSharedArrayBasedDirectedGraph()
}

"AdjacencyListReader" should {
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@ package com.twitter.cassovary.util.io

import com.twitter.cassovary.graph.{GraphBehaviours, Node}
import com.twitter.cassovary.util.SequentialNodeNumberer
import java.util.concurrent.Executors
import org.scalatest.{Matchers, WordSpec}

class ListOfEdgesGraphReaderSpec extends WordSpec with GraphBehaviours[Node] with Matchers {
@@ -37,24 +36,20 @@ class ListOfEdgesGraphReaderSpec extends WordSpec with GraphBehaviours[Node] wit
private val directory: String = "cassovary-core/src/test/resources/graphs/"

trait GraphWithIntIds {
val graph = ListOfEdgesGraphReader.forIntIds(directory, "toy_list5edges",
Executors.newFixedThreadPool(2)).toArrayBasedDirectedGraph()
val graph = ListOfEdgesGraphReader.forIntIds(directory,
"toy_list5edges").toArrayBasedDirectedGraph()
}

trait GraphWithStringIds {
val seqNumberer = new SequentialNodeNumberer[String]()
val graph = new ListOfEdgesGraphReader(directory, "toy_6nodes_list_StringIds", seqNumberer,
idReader = identity){
override val executorService = Executors.newFixedThreadPool(2)
}.toSharedArrayBasedDirectedGraph()
idReader = identity).toSharedArrayBasedDirectedGraph()
}

trait GraphWithLongIds {
val seqNumberer = new SequentialNodeNumberer[Long]()
val graph = new ListOfEdgesGraphReader(directory, "toy_6nodes_list_LongIds", seqNumberer,
idReader = _.toLong){
override val executorService = Executors.newFixedThreadPool(2)
}.toSharedArrayBasedDirectedGraph()
idReader = _.toLong).toSharedArrayBasedDirectedGraph()
}

"ListOfEdgesReader" when {
8 changes: 2 additions & 6 deletions cassovary-examples/src/main/scala/HelloLoadGraph.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 Twitter, Inc.
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
@@ -22,20 +22,16 @@
*/

import com.twitter.cassovary.util.io.{AdjacencyListGraphReader, LabelsReader}
import java.util.concurrent.Executors

object HelloLoadGraph {
def main(args: Array[String]) {
val threadPool = Executors.newFixedThreadPool(2)
val dir = "../cassovary-core/src/test/resources/graphs"
val graph = AdjacencyListGraphReader.forIntIds(dir, "toy_6nodes_adj",
threadPool).toArrayBasedDirectedGraph()
val graph = AdjacencyListGraphReader.forIntIds(dir, "toy_6nodes_adj").toArrayBasedDirectedGraph()
graph.nodeLabels = new LabelsReader(dir, "toy_6nodelabels").read(graph.maxNodeId)

printf("\nHello Graph!\n\tA graph loaded from two adjacency list files " +
"with %s nodes has %s directed edges.\n", graph.nodeCount, graph.edgeCount)
printf("\tLabels of node 10 are label1(%d) and label2(%d)\n",
graph.labelOfNode[Int](10, "label1").get, graph.labelOfNode[Int](10, "label2").get)
threadPool.shutdown()
}
}
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ class ListOfEdgesToAdjacencyListConverter(
) {
def apply() {
def outputFileName(chunkNumber: Int) = outputFileNamesPrefix + "_" + chunkNumber + "." + outputFileNamesExtension
val graph = ListOfEdgesGraphReader.forIntIds(inputDirectory, inputFileNamesPrefix, threadPool)
val graph = ListOfEdgesGraphReader.forIntIds(inputDirectory, inputFileNamesPrefix)
.toArrayBasedDirectedGraph()
val outputWriters = Seq.tabulate(numberOfOutputChunks)(n => new File(outputDirectory, outputFileName(n)))
.map(file => new FileWriter(file))
14 changes: 4 additions & 10 deletions cassovary-examples/src/main/scala/RenumberedGraph.scala
Original file line number Diff line number Diff line change
@@ -13,18 +13,16 @@
*/

/**
* Generates a directed Erdos-Renyi random graph file with n log(n) edges
* and node ids distributed uniformly throughout the space of 0..MaxNodeId.
* Generates a directed Erdos-Renyi random graph file with n log(n) edges
* and node ids distributed uniformly throughout the space of 0..MaxNodeId.
* Loads the graph file both with a SequentialNodeNumberer and without,
* and compares approximate representation sizes of the two.
*/

import com.google.common.util.concurrent.MoreExecutors
import com.twitter.cassovary.util.io.AdjacencyListGraphReader
import com.twitter.cassovary.util.{Sampling, SequentialNodeNumberer}
import com.twitter.cassovary.graph.TestGraphs
import java.io.{File,PrintWriter}
import scala.math
import scala.util.Random

object RenumberedGraph {
@@ -59,9 +57,7 @@ object RenumberedGraph {

// Read graph file into memory with renumbering.
val readGraph = new AdjacencyListGraphReader[Int](renumGraphDirName, renumGraphFileName,
new SequentialNodeNumberer[Int](), _.toInt) {
override val executorService = MoreExecutors.sameThreadExecutor()
}.toArrayBasedDirectedGraph()
new SequentialNodeNumberer[Int](), _.toInt).toArrayBasedDirectedGraph()

val rgComplexity = readGraph.approxStorageComplexity
printf("A renumbered graph with %d nodes (min id: %d, max id: %d) and %d directed edges has an approx. storage complexity of %d bytes.\n",
@@ -70,9 +66,7 @@ object RenumberedGraph {

// Read graph file into memory without renumbering.
val readGraph2 = new AdjacencyListGraphReader[Int](renumGraphDirName, renumGraphFileName,
new SequentialNodeNumberer[Int](), _.toInt) {
override val executorService = MoreExecutors.sameThreadExecutor()
}.toArrayBasedDirectedGraph()
new SequentialNodeNumberer[Int](), _.toInt).toArrayBasedDirectedGraph()
val rg2Complexity = readGraph2.approxStorageComplexity
printf("An unrenumbered graph with %d nodes (min id: %d, max id: %d) and %d directed edges has an approx. storage complexity of %d bytes.\n",
readGraph2.nodeCount, readGraph2.map{_.id}.min, readGraph2.map{_.id}.max, readGraph2.edgeCount, rg2Complexity)

0 comments on commit b2df01f

Please sign in to comment.