Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic union graph #203

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ abstract class DynamicDirectedGraphHashMap(val storedGraphDir: StoredGraphDir)
* Get the total number of edges in the graph. It's a very slow function. Use it carefully.
*/
def edgeCount = iterator.foldLeft(0) {(accum, node) =>
accum + node.inboundCount + node.outboundCount
accum + (storedGraphDir match {
case OnlyOut | BothInOut => node.outboundCount
case OnlyIn => node.inboundCount
case Mutual => 2 * node.outboundCount // count each edge twice
})
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.twitter.cassovary.graph

import com.twitter.cassovary.graph.StoredGraphDir.StoredGraphDir
import com.twitter.cassovary.graph.node.DynamicNode

/**
* Wraps a directed graph and a dynamic directed graph to create a dynamic graph with the union of their nodes and edges. When
* edge are added, they are added to the underlying dynamic graph. Node or edge deletion is not supported.
*/
class DynamicDirectedGraphUnion(staticGraph: DirectedGraph[Node], dynamicGraph: DynamicDirectedGraph[DynamicNode])
extends DynamicDirectedGraph[DynamicNode] {
// Because computing nodeCount as an intersection is expensive, maintain nodeCount as a variable.
private var _nodeCount = if (dynamicGraph.nodeCount == 0)
staticGraph.nodeCount
else
((staticGraph map (_.id)).toSet ++ (dynamicGraph map (_.id)).toSet).size

override def getNodeById(id: Int): Option[DynamicNode] =
if (staticGraph.existsNodeId(id) || dynamicGraph.existsNodeId(id)) {
Some(new DynamicNodeUnion(
staticGraph.getNodeById(id),
dynamicGraph.getOrCreateNode(id)))
} else {
None
}

override def getOrCreateNode(id: Int): DynamicNode = {
if (!staticGraph.existsNodeId(id) && !dynamicGraph.existsNodeId(id)) {
_nodeCount += 1
}
new DynamicNodeUnion(staticGraph.getNodeById(id), dynamicGraph.getOrCreateNode(id))
}

/**
* Adds the given edge to the underlying dynamic graph. Note that for efficiency we don't check if the edge already exists,
* so if the edge already exists, a 2nd copy of it will be added.
*/
override def addEdge(srcId: Int, destId: Int): Unit = dynamicGraph.addEdge(srcId, destId)

/** Not supported. */
override def removeEdge(srcId: Int, destId: Int): (Option[Node], Option[Node]) = throw new UnsupportedOperationException()

override def edgeCount: Long = staticGraph.edgeCount + dynamicGraph.edgeCount

override def nodeCount: Int = _nodeCount

assert(staticGraph.storedGraphDir == dynamicGraph.storedGraphDir)
override val storedGraphDir: StoredGraphDir = dynamicGraph.storedGraphDir

override def iterator: Iterator[DynamicNode] = {
val staticGraphIds = staticGraph.iterator map (_.id)
val additionalDynamicGraphIds = dynamicGraph.iterator map (_.id) filter (!staticGraph.existsNodeId(_))
(staticGraphIds ++ additionalDynamicGraphIds) map (id => getNodeById(id).get)
}
}

/** Represents the union of two nodes. */
private class DynamicNodeUnion(staticNodeOption: Option[Node],
dynamicNode: DynamicNode) extends DynamicNode {
override val id: Int = dynamicNode.id

override def inboundNodes(): Seq[Int] = staticNodeOption match {
case Some(staticNode) => new IndexedSeqUnion(staticNode.inboundNodes(), dynamicNode.inboundNodes())
case None => dynamicNode.inboundNodes()
}
override def outboundNodes(): Seq[Int] = staticNodeOption match {
case Some(staticNode) => new IndexedSeqUnion(staticNode.outboundNodes(), dynamicNode.outboundNodes())
case None => dynamicNode.outboundNodes()
}

// To make sure an edge (u, v) is added to both u's out-neighbors and v's in-neighbors,
// mutations should happen through the graph.
override def addInBoundNodes(nodeIds: Seq[Int]): Unit = throw new UnsupportedOperationException()
override def addOutBoundNodes(nodeIds: Seq[Int]): Unit = throw new UnsupportedOperationException()
override def removeInBoundNode(nodeId: Int): Unit = throw new UnsupportedOperationException()
override def removeOutBoundNode(nodeId: Int): Unit = throw new UnsupportedOperationException()
}

/** Represents the concatenation of two IndexedSeqs. */
// TODO: We assume xs and ys have efficient random access (are effectively IndexedSeqs). Refactoring Node to return
// IndexedSeq would remove this assumption
private class IndexedSeqUnion[A](xs: Seq[A], ys: Seq[A]) extends IndexedSeq[A] {
override def length: Int = xs.size + ys.size

override def apply(i: Int): A =
if (i < xs.size) {
xs(i)
} else if (i - xs.size < ys.size) {
ys(i - xs.size)
} else {
throw new IndexOutOfBoundsException(s"Invalid index $i")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.twitter.cassovary.graph

import org.scalatest.{Matchers, WordSpec}

class DynamicDirectedGraphUnionSpec extends WordSpec with Matchers {
val staticGraph1 = ArrayBasedDirectedGraph.apply(
Iterable(
NodeIdEdgesMaxId(1, Array(2, 3)),
NodeIdEdgesMaxId(3, Array(1, 2)),
NodeIdEdgesMaxId(5, Array(1))
),
StoredGraphDir.BothInOut,
NeighborsSortingStrategy.LeaveUnsorted)

"A DynamicDirectedGraph" should {
" correctly combine two graphs" in {
staticGraph1.nodeCount shouldEqual (4)
staticGraph1.edgeCount shouldEqual(5)
val dynamicGraph = new SynchronizedDynamicGraph()
dynamicGraph.addEdge(5, 6)
val unionGraph = new DynamicDirectedGraphUnion(staticGraph1, dynamicGraph)
(unionGraph map (_.id)) should contain theSameElementsAs (Seq(1, 2, 3, 5, 6))
unionGraph.nodeCount shouldEqual (5)
unionGraph.edgeCount shouldEqual(6)
unionGraph.getNodeById(5).get.outboundNodes should contain theSameElementsAs (Seq(1, 6))
unionGraph.getNodeById(6).get.inboundNodes should contain theSameElementsAs (Seq(5))
unionGraph.getNodeById(5).get.outboundNodes should contain theSameElementsAs (Seq(1, 6))
// Make sure getNodeById doesn't create the node
unionGraph.getNodeById(4) should be (None)
unionGraph.nodeCount shouldEqual (5)
unionGraph.getOrCreateNode(4)
unionGraph.nodeCount shouldEqual (6)
unionGraph.addEdge(1, 4)
unionGraph.getNodeById(4).get.inboundNodes should contain theSameElementsAs (Seq(1))
unionGraph.addEdge(1, 6)
unionGraph.getOrCreateNode(1).outboundNodes should contain theSameElementsAs (Seq(2, 3, 4, 6))
unionGraph.edgeCount shouldEqual(8)
}

" throw an error given an invalid filename" in {
a[NullPointerException] should be thrownBy {
new DynamicDirectedGraphUnion(staticGraph1, null)
}
}
}
}