diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/graph/distributed/GraphFilesSplitter.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/distributed/GraphFilesSplitter.scala
new file mode 100644
index 00000000..d02c86ea
--- /dev/null
+++ b/cassovary-core/src/main/scala/com/twitter/cassovary/graph/distributed/GraphFilesSplitter.scala
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2015 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+ * file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.twitter.cassovary.graph.distributed
+
+import com.twitter.cassovary.graph.NodeIdEdgesMaxId
+import com.twitter.cassovary.util.BoundedFuturePool
+import com.twitter.cassovary.util.io.GraphReaderFromDirectory
+import com.twitter.logging.Logger
+import com.twitter.util.{Await, Future, FuturePool}
+import java.io._
+
+/**
+ * Splits a graph read by `graphReaderFromDirectory` to multiple subgraphs, each
+ * in a separate subdirectory, named "instance_i" for partition numbered i.
+ * Splitting is done as per `partitioner`.
+ */
+class GraphFilesSplitter[T](outputDir: String, partitioner: Partitioner,
+    graphReaderFromDirectory: GraphReaderFromDirectory[T]) {
+
+  private val futurePool = new BoundedFuturePool(FuturePool.unboundedPool,
+    graphReaderFromDirectory.parallelismLimit)
+  private val log = Logger.get("graphFilesSplitter")
+
+  /**
+   * Reads every input part and writes each node to the files of the instances
+   * chosen by `partitioner`. Blocks until all input parts have been processed.
+   */
+  def splitGraph(): Unit = {
+    // there are many parts of the original input graph
+    val inputParts = graphReaderFromDirectory.iterableSeq
+
+    // instanceWriters is a 2-D array indexed by input part# and instance#
+    val instanceWriters = setupPerInstanceSubdirectories(partitioner.numInstances,
+      inputParts.length)
+    val futures = Future.collect(inputParts.indices map { i =>
+      split(inputParts(i).iterator, instanceWriters(i))
+    })
+    Await.result(futures)
+  }
+
+  // Creates `dirName` (including missing parents) unless it already is a directory.
+  private def mkDirHelper(dirName: String): Unit = {
+    val dir = new File(dirName)
+    if (dir.isDirectory) {
+      log.info("Directory %s already exists.", dir)
+    } else if (dir.exists()) {
+      throw new FileNotFoundException(dir + " exists but is not a directory")
+    } else if (dir.mkdirs()) {
+      log.debug("Made new directory %s", dir)
+    } else {
+      throw new FileNotFoundException("Unable to create new directory " + dir)
+    }
+  }
+
+  // Opens `fileName` for writing in UTF-8; the file is created if it does not exist.
+  private def getBufferedWriter(fileName: String): BufferedWriter = {
+    try {
+      new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileName), "utf-8"))
+    } catch {
+      // keep the original exception as the cause so that its stack trace survives
+      case ex: IOException => throw new IOException("Unable to open " + fileName, ex)
+    }
+  }
+
+  // @return an array of arrays. The right index is of subgraph instance number and
+  // left index is of input seq number.
+  private def setupPerInstanceSubdirectories(numInstances: Int,
+      numInputParts: Int): Array[Array[BufferedWriter]] = {
+    mkDirHelper(outputDir)
+    val instanceWriters = Array.ofDim[BufferedWriter](numInputParts, numInstances)
+    (0 until numInstances) foreach { i =>
+      val subDirName = outputDir + "/instance_" + i
+      mkDirHelper(subDirName)
+      (0 until numInputParts) foreach { j =>
+        instanceWriters(j)(i) = getBufferedWriter(subDirName + "/" + j)
+      }
+    }
+    instanceWriters
+  }
+
+  // Writes the nodes of one input part to the instances picked by `partitioner`.
+  private def split(it: Iterator[NodeIdEdgesMaxId],
+      instanceWriters: Array[BufferedWriter]): Future[Unit] = futurePool {
+    try {
+      it foreach { origNode =>
+        partitioner.map(origNode) foreach { case (instance, node) =>
+          instanceWriters(instance).write(graphReaderFromDirectory.reverseParseNode(node))
+        }
+      }
+    } finally {
+      // close() also flushes; done in finally so a failure does not leak file handles
+      instanceWriters foreach { _.close() }
+    }
+  }
+}
diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala
index 38652f5f..cb85a3ac 100644
--- a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala
+++ b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReader.scala
@@ -126,6 +126,13 @@ class AdjacencyListGraphReader[T] (
   def oneShardReader(filename : String) : Iterable[NodeIdEdgesMaxId] = {
     new OneShardReader(filename, nodeNumberer)
   }
+
+  // note that we are assuming that n.id.toString does the right thing, which is
+  // true for int and long ids but might not be for a general T.
+  def reverseParseNode(n: NodeIdEdgesMaxId): String = { // "id count", then a line per edge
+    n.id + separator + n.edges.length + "\n" + n.edges.map(e => e + "\n").mkString
+  }
+
 }
 
 object AdjacencyListGraphReader {
diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala
index 64ba72b4..aedddaf4 100644
--- a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala
+++ b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/GraphReader.scala
@@ -51,6 +51,14 @@ trait GraphReader[T] {
 
   def parallelismLimit: Int = Runtime.getRuntime.availableProcessors
 
+  /**
+   * The reader knows the format as it knows how to read the file. This reverse parses
+   * the input `n` to a string in that same format.
+   * Implementations should return newline-terminated text, since callers such as
+   * `GraphFilesSplitter` append the result verbatim to an output file.
+   */
+  def reverseParseNode(n: NodeIdEdgesMaxId): String
+
   /**
    * Create an `ArrayBasedDirectedGraph`
    */
diff --git a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReader.scala b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReader.scala
index a1550bad..04469957 100644
--- a/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReader.scala
+++ b/cassovary-core/src/main/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReader.scala
@@ -136,6 +136,15 @@ class ListOfEdgesGraphReader[T](
   def oneShardReader(filename: String): Iterable[NodeIdEdgesMaxId] = {
     new OneShardReader(filename, nodeNumberer)
   }
+
+  // One "id neighbor" line per edge; a node without edges yields the empty string
+  // rather than a blank line.
+  def reverseParseNode(n: NodeIdEdgesMaxId): String = {
+    n.edges.map { neighbor =>
+      n.id + " " + neighbor + "\n"
+    }.mkString
+  }
+
 }
 
 object ListOfEdgesGraphReader {
diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/graph/distributed/GraphFilesSplitterSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/distributed/GraphFilesSplitterSpec.scala
new file mode 100644
index 00000000..9034b970
--- /dev/null
+++
b/cassovary-core/src/test/scala/com/twitter/cassovary/graph/distributed/GraphFilesSplitterSpec.scala
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2015 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+ * file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.twitter.cassovary.graph.distributed
+
+import java.io.File
+import java.util.concurrent.Executors
+
+import com.twitter.cassovary.util.io.AdjacencyListGraphReader
+import com.twitter.common.util.FileUtils
+import org.scalatest.{Matchers, WordSpec}
+
+// Verifies the directory/file layout that GraphFilesSplitter produces for a toy graph.
+class GraphFilesSplitterSpec extends WordSpec with Matchers {
+  val inputGraphDir = "cassovary-core/src/test/resources/graphs"
+  val reader = AdjacencyListGraphReader.forIntIds(inputGraphDir, "toy_6nodes_adj")
+  val tmpDir = "/tmp/test_graph_splitter"
+  val numInstances = 2
+  val partitioner = new HashSourceMapper(numInstances, i => i % numInstances)
+  val splitter = new GraphFilesSplitter[Int](tmpDir, partitioner, reader)
+  "splitter" should {
+    "make appropriate output files and directories" in {
+      val tmpd = new File(tmpDir)
+      try {
+        splitter.splitGraph()
+        val subdirs = tmpd.list()
+        val expectedSubDirs = (0 until numInstances).map(i => "instance_" + i).toList.sorted
+        subdirs.toList.sorted shouldEqual expectedSubDirs
+        val expectedFiles = (0 until reader.iterableSeq.length).map(_.toString).toList.sorted
+        subdirs foreach { s =>
+          val files = new File(tmpDir + "/" + s).list()
+          files.toList.sorted shouldEqual expectedFiles
+        }
+      } finally {
+        // always clean up, so that a failed assertion does not leave stale output behind
+        FileUtils.forceDeletePath(tmpd)
+      }
+    }
+  }
+}
diff --git
a/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReaderSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReaderSpec.scala
index 8c118d02..4e584b03 100644
--- a/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReaderSpec.scala
+++ b/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/AdjacencyListGraphReaderSpec.scala
@@ -13,7 +13,7 @@
  */
 package com.twitter.cassovary.util.io
 
-import com.twitter.cassovary.graph.{GraphBehaviours, Node}
+import com.twitter.cassovary.graph.{NodeIdEdgesMaxId, GraphBehaviours, Node}
 import com.twitter.cassovary.util.SequentialNodeNumberer
 import org.scalatest.{Matchers, WordSpec}
 
@@ -37,8 +37,9 @@ class AdjacencyListGraphReaderSpec extends WordSpec with Matchers with GraphBeha
   )
 
   trait GraphWithoutRenumberer {
-    val graph = AdjacencyListGraphReader.forIntIds(directory,
-      "toy_6nodes_adj").toSharedArrayBasedDirectedGraph()
+    val reader = AdjacencyListGraphReader.forIntIds(directory, // exposed so tests can call it
+      "toy_6nodes_adj")
+    val graph = reader.toSharedArrayBasedDirectedGraph()
   }
 
   trait GraphWithRenumberer {
@@ -73,6 +74,15 @@ class AdjacencyListGraphReaderSpec extends WordSpec with Matchers with GraphBeha
         behave like graphEquivalentToMap(graph, toy6nodeMap)
       }
     }
+
+    "reverse parse correctly" in {
+      new GraphWithoutRenumberer {
+        val node = NodeIdEdgesMaxId(10, Array(11, 12, 20))
+        val nodeStr = "10 3\n11\n12\n20\n" // "id edgeCount" line, then one neighbor per line
+        reader.reverseParseNode(node) shouldEqual nodeStr
+      }
+    }
+
   }
 
   "AdjacencyListReader renumbered" when {
diff --git a/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReaderSpec.scala b/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReaderSpec.scala
index d88d1a28..f8f43823 100644
--- a/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReaderSpec.scala
+++ b/cassovary-core/src/test/scala/com/twitter/cassovary/util/io/ListOfEdgesGraphReaderSpec.scala
@@ -13,7 +13,7
@@
  */
 package com.twitter.cassovary.util.io
 
-import com.twitter.cassovary.graph.{GraphBehaviours, Node}
+import com.twitter.cassovary.graph.{NodeIdEdgesMaxId, GraphBehaviours, Node}
 import com.twitter.cassovary.util.SequentialNodeNumberer
 import org.scalatest.{Matchers, WordSpec}
 
@@ -36,8 +36,9 @@ class ListOfEdgesGraphReaderSpec extends WordSpec with GraphBehaviours[Node] wit
   private val directory: String = "cassovary-core/src/test/resources/graphs/"
 
   trait GraphWithIntIds {
-    val graph = ListOfEdgesGraphReader.forIntIds(directory,
-      "toy_list5edges").toArrayBasedDirectedGraph()
+    val reader = ListOfEdgesGraphReader.forIntIds(directory,
+      "toy_list5edges")
+    val graph = reader.toArrayBasedDirectedGraph()
   }
 
   trait GraphWithStringIds {
@@ -67,7 +68,16 @@ class ListOfEdgesGraphReaderSpec extends WordSpec with GraphBehaviours[Node] wit
         behave like graphEquivalentToMap(graph, intGraphMap)
       }
     }
+
+    "reverse parse a node correctly" in {
+      new GraphWithIntIds {
+        val node = NodeIdEdgesMaxId(10, Array(11, 12, 13))
+        val nodeStr = "10 11\n10 12\n10 13\n" // one "source destination" line per edge
+        reader.reverseParseNode(node) shouldEqual nodeStr
+      }
+    }
   }
+
   "using String ids" should {
     "provide the correct graph properties" in {
       new GraphWithStringIds {
diff --git a/cassovary-examples/src/main/scala/cross-partitioning/CrossPartitioning.scala b/cassovary-examples/src/main/scala/cross-partitioning/CrossPartitioning.scala
new file mode 100644
index 00000000..e9e784ed
--- /dev/null
+++ b/cassovary-examples/src/main/scala/cross-partitioning/CrossPartitioning.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2015 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+ * file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+import com.twitter.app.Flags
+import com.twitter.cassovary.graph.distributed.{GraphFilesSplitter, HashSourceAndDestMapper}
+import com.twitter.cassovary.util.io.AdjacencyListGraphReader
+
+/**
+ * Example: splits the adjacency-list graph found in the input directory into `n`
+ * subgraphs, one output subdirectory per instance, using `HashSourceAndDestMapper`.
+ */
+object CrossPartitioning extends App {
+
+  val flags = new Flags("Cross Partitioning")
+  val numInstances = flags("n", 10, "Number of instances/shards")
+  val inputGraphDir = flags("i", "/tmp/input-graph", "Input graph directory")
+  val subgraphsDir = flags("o", "/tmp/output-graph", "Output subgraphs directory")
+  val helpFlag = flags("h", false, "Print usage")
+  flags.parseArgs(args)
+
+  // NOTE(review): the meaning of the trailing `null` argument is not visible here -- confirm
+  val reader = AdjacencyListGraphReader.forIntIds(inputGraphDir(), "toy_6nodes_adj", null)
+
+  def hashNodeFn(i: Int) = i
+
+  val partitioner = new HashSourceAndDestMapper(numInstances(), hashNodeFn)
+  val splitter = new GraphFilesSplitter[Int](subgraphsDir(), partitioner, reader)
+
+  if (helpFlag()) {
+    // -h was declared but never read; honor it instead of running the split
+    println(flags.usage)
+  } else {
+    println(s"Now splitting graph in ${inputGraphDir()} into ${numInstances()} subgraphs.")
+    splitter.splitGraph()
+    println("Split is complete.")
+  }
+}
diff --git a/cassovary-examples/src/main/scala/cross-partitioning/create_info_all_instances.sh b/cassovary-examples/src/main/scala/cross-partitioning/create_info_all_instances.sh
new file mode 100755
index 00000000..6ebd8d6e
--- /dev/null
+++ b/cassovary-examples/src/main/scala/cross-partitioning/create_info_all_instances.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Usage: create_info_all_instances.sh <input graph files prefix> <output graph dir>
+# Writes outdegrees.txt and indegrees.txt into <output graph dir>/all_instances,
+# computed from all files whose names start with the given prefix.
+
+INPUT_GRAPH_FILES_PREFIX=$1
+OUTPUT_GRAPH_DIR=$2
+
+if [ -z "$INPUT_GRAPH_FILES_PREFIX" ] || [ -z "$OUTPUT_GRAPH_DIR" ]; then
+  echo "Usage: $0 <input graph files prefix> <output graph dir>" >&2
+  exit 1
+fi
+
+ALL_INSTANCES_SUBDIR=$OUTPUT_GRAPH_DIR/all_instances
+OUTDEGREES=$ALL_INSTANCES_SUBDIR/outdegrees.txt
+INDEGREES=$ALL_INSTANCES_SUBDIR/indegrees.txt
+
+mkdir -p "$ALL_INSTANCES_SUBDIR"
+echo Creating Outdegrees file in "$OUTDEGREES" ...
+# keeps lines with a space between two characters -- presumably the "id count" header lines
+grep -h '. .' "$INPUT_GRAPH_FILES_PREFIX"* > "$OUTDEGREES"
+
+echo Creating Indegrees file in "$INDEGREES" ...
+# keeps lines holding a single number, then prints "<number> <occurrences>" for each
+grep -h '^[0-9][0-9]*$' "$INPUT_GRAPH_FILES_PREFIX"* | sort -S2G | uniq -c | perl -lane 'print $F[1]," ", $F[0]' > "$INDEGREES"
+echo Done everything.
+