graph batch and unbatch
jermainewang committed Oct 3, 2018
1 parent 7d04c8c commit 2be55fb
Showing 6 changed files with 261 additions and 62 deletions.
20 changes: 17 additions & 3 deletions include/dgl/graph_op.h
@@ -31,14 +31,28 @@ class GraphOp {
/*!
* \brief Partition the graph into several subgraphs.
*
* The graph will be partitioned by the node ids. Edges between partitions
* will be ignored. This requires the given number of partitions to evenly
* This is the reverse operation of DisjointUnion. The graph will be partitioned
* into num graphs. This requires the given number of partitions to evenly
* divide the number of nodes in the graph.
*
* \param graph The graph to be partitioned.
* \param num The number of partitions.
* \return a list of partitioned graphs
*/
static std::vector<Graph> PartitionByNum(const Graph* graph, size_t num);
static std::vector<Graph> DisjointPartitionByNum(const Graph* graph, int64_t num);

/*!
* \brief Partition the graph into several subgraphs.
*
* This is the reverse operation of DisjointUnion. The graph will be partitioned
* based on the given sizes. This requires that the sum of the given sizes
* equals the number of nodes in the graph.
*
* \param graph The graph to be partitioned.
* \param sizes The size of each partition.
* \return a list of partitioned graphs
*/
static std::vector<Graph> DisjointPartitionBySizes(const Graph* graph, IdArray sizes);
};

} // namespace dgl
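To make the "reverse of DisjointUnion" relationship concrete, here is a dependency-free toy sketch in plain Python (illustrative (num_nodes, edge_list) pairs, not DGL's actual Graph class) of how a size-based disjoint partition undoes a disjoint union:

def disjoint_union(graphs):
    # concatenate node ranges; relabel each graph's nodes by a running offset
    nodes, edges, offset = 0, [], 0
    for n, es in graphs:
        edges += [(u + offset, v + offset) for u, v in es]
        offset += n
        nodes += n
    return nodes, edges

def disjoint_partition_by_sizes(g, sizes):
    # slice the node range at the cumulative size boundaries and shift each
    # slice's endpoints back to a 0-based local id space
    _, edges = g
    bounds = [0]
    for s in sizes:
        bounds.append(bounds[-1] + s)
    parts = []
    for lo, hi in zip(bounds, bounds[1:]):
        es = [(u - lo, v - lo) for u, v in edges if lo <= u < hi]
        parts.append((hi - lo, es))
    return parts

g1 = (2, [(0, 1)])
g2 = (3, [(0, 1), (1, 2)])
assert disjoint_partition_by_sizes(disjoint_union([g1, g2]), [2, 3]) == [g1, g2]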
69 changes: 40 additions & 29 deletions python/dgl/batch.py
@@ -8,6 +8,7 @@
from .graph import DGLGraph
from . import graph_index as gi
from . import backend as F
from . import utils

class BatchedDGLGraph(DGLGraph):
"""The batched DGL graph.
@@ -24,7 +25,6 @@ class BatchedDGLGraph(DGLGraph):
The edge attributes to also be batched.
"""
def __init__(self, graph_list, node_attrs, edge_attrs):
# TODO(minjie): handle the input is again a batched graph.
# create batched graph index
batched_index = gi.disjoint_union([g._graph for g in graph_list])
# create batched node and edge frames
@@ -43,9 +43,19 @@ def __init__(self, graph_list, node_attrs, edge_attrs):
edge_frame=batched_edge_frame)

# extra members
self._batch_size = len(graph_list)
self._batch_num_nodes = [gr.number_of_nodes() for gr in graph_list]
self._batch_num_edges = [gr.number_of_edges() for gr in graph_list]
self._batch_size = 0
self._batch_num_nodes = []
self._batch_num_edges = []
for gr in graph_list:
if isinstance(gr, BatchedDGLGraph):
# handle the case where the input is itself a batched graph.
self._batch_size += gr._batch_size
self._batch_num_nodes += gr._batch_num_nodes
self._batch_num_edges += gr._batch_num_edges
else:
self._batch_size += 1
self._batch_num_nodes.append(gr.number_of_nodes())
self._batch_num_edges.append(gr.number_of_edges())
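The loop above is what lets batching nest: when an input graph is itself a BatchedDGLGraph, its per-graph bookkeeping is spliced in instead of counting it as one graph. A hedged sketch of the intended behavior (the dgl.batch entry point and DGLGraph's add_nodes/add_edge method names are assumptions based on this era of the API; the star graphs are illustrative):

import dgl

def star(n):
    # a star graph: node 0 connected to nodes 1..n-1
    g = dgl.DGLGraph()
    g.add_nodes(n)
    for i in range(1, n):
        g.add_edge(0, i)
    return g

g1, g2, g3 = star(3), star(4), star(5)
bg = dgl.batch([g1, g2])      # batch_size == 2
bg2 = dgl.batch([bg, g3])     # one input is already a batch
assert bg2.batch_size == 3    # flattened to 3, not 2
assert bg2.batch_num_nodes == [3, 4, 5]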

@property
def batch_size(self):
@@ -78,10 +88,12 @@ def add_edges(self, u, v, reprs=None):
# new APIs
def __getitem__(self, idx):
"""Slice the batch and return the batch of graphs specified by the idx."""
# TODO
pass

def __setitem__(self, idx, val):
"""Set the value of the slice. The graph size cannot be changed."""
# TODO
pass

'''
@@ -114,37 +126,36 @@ def split(graph_batch, num_or_size_splits):
# TODO(minjie): could follow torch.split syntax
pass

def unbatch(graph_batch):
def unbatch(graph):
"""Unbatch the graph and return a list of subgraphs.
Parameters
----------
graph_batch : DGLGraph
graph : BatchedDGLGraph
The batched graph.
"""
assert False, "disabled for now"
graph_list = graph_batch.graph_list
num_graphs = len(graph_list)
# split and set node attrs
attrs = [{} for _ in range(num_graphs)] # node attr dict for each graph
for key in graph_batch.node_attr_schemes():
vals = F.unpack(graph_batch.pop_n_repr(key), graph_batch.num_nodes)
for attr, val in zip(attrs, vals):
attr[key] = val
for attr, g in zip(attrs, graph_list):
g.set_n_repr(attr)

# split and set edge attrs
attrs = [{} for _ in range(num_graphs)] # edge attr dict for each graph
for key in graph_batch.edge_attr_schemes():
vals = F.unpack(graph_batch.pop_e_repr(key), graph_batch.num_edges)
for attr, val in zip(attrs, vals):
attr[key] = val
for attr, g in zip(attrs, graph_list):
g.set_e_repr(attr)

return graph_list

assert isinstance(graph, BatchedDGLGraph)
bsize = graph.batch_size
bn = graph.batch_num_nodes
be = graph.batch_num_edges
pttns = gi.disjoint_partition(graph._graph, utils.toindex(bn))
# split the frames
node_frames = [FrameRef() for i in range(bsize)]
edge_frames = [FrameRef() for i in range(bsize)]
for attr, col in graph._node_frame.items():
# TODO: device context
col_splits = F.unpack(col, bn)
for i in range(bsize):
node_frames[i][attr] = col_splits[i]
for attr, col in graph._edge_frame.items():
# TODO: device context
col_splits = F.unpack(col, be)
for i in range(bsize):
edge_frames[i][attr] = col_splits[i]
return [DGLGraph(graph_data=pttns[i],
node_frame=node_frames[i],
edge_frame=edge_frames[i]) for i in range(bsize)]
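Structurally, unbatch is the inverse of batch: disjoint_partition splits the graph index by batch_num_nodes, and F.unpack slices each frame column by the same per-graph counts. A hedged round-trip sketch (the PyTorch backend and the set_n_repr/get_n_repr accessors are assumptions based on the surrounding code):

import torch
import dgl

g1 = dgl.DGLGraph()
g1.add_nodes(2)
g1.set_n_repr({'h': torch.zeros(2, 4)})
g2 = dgl.DGLGraph()
g2.add_nodes(3)
g2.set_n_repr({'h': torch.ones(3, 4)})

bg = dgl.batch([g1, g2])
u1, u2 = dgl.unbatch(bg)
assert u1.number_of_nodes() == 2 and u2.number_of_nodes() == 3
# node features come back split along the batch boundary
assert torch.equal(u2.get_n_repr()['h'], torch.ones(3, 4))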

def batch(graph_list, node_attrs=ALL, edge_attrs=ALL):
"""Batch a list of DGLGraphs into one single graph.
34 changes: 34 additions & 0 deletions python/dgl/graph_index.py
@@ -483,6 +483,40 @@ def disjoint_union(graphs):
handle = _CAPI_DGLDisjointUnion(inputs, len(graphs))
return GraphIndex(handle)

def disjoint_partition(graph, num_or_size_splits):
"""Partition the graph disjointly.
This is the reverse operation of DisjointUnion. If a number is given, the
graph will be partitioned into that many graphs, which requires the number
of partitions to evenly divide the number of nodes. If a size list is given,
the sum of the given sizes must equal the number of nodes in the graph.
Parameters
----------
graph : GraphIndex
The graph to be partitioned
num_or_size_splits : int or utils.Index
The number of partitions, or the size of each partition
Returns
-------
list of GraphIndex
The partitioned graphs
"""
if isinstance(num_or_size_splits, utils.Index):
rst = _CAPI_DGLDisjointPartitionBySizes(
graph._handle,
num_or_size_splits.todgltensor())
else:
rst = _CAPI_DGLDisjointPartitionByNum(
graph._handle,
int(num_or_size_splits))
graphs = []
for val in rst.asnumpy():
handle = ctypes.cast(int(val), ctypes.c_void_p)
graphs.append(GraphIndex(handle))
return graphs
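The dispatch above means callers choose the partition by type: a plain int routes to the ByNum C API, while a utils.Index of sizes routes to BySizes. A hedged usage sketch (the GraphIndex.add_nodes helper is an assumption based on this module; the bare 6-node graph is illustrative):

from dgl import graph_index as gi
from dgl import utils

g = gi.create_graph_index()
g.add_nodes(6)                                    # a 6-node disjoint union
even = gi.disjoint_partition(g, 3)                # int -> ByNum, sizes [2, 2, 2]
uneven = gi.disjoint_partition(g, utils.toindex([1, 2, 3]))  # Index -> BySizes
assert [p.number_of_nodes() for p in even] == [2, 2, 2]
assert [p.number_of_nodes() for p in uneven] == [1, 2, 3]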

def create_graph_index(graph_data=None):
"""Create a graph index object.
36 changes: 36 additions & 0 deletions src/graph/graph_apis.cc
@@ -7,6 +7,7 @@ using tvm::runtime::TVMArgs;
using tvm::runtime::TVMArgValue;
using tvm::runtime::TVMRetValue;
using tvm::runtime::PackedFunc;
using tvm::runtime::NDArray;

namespace dgl {

@@ -289,4 +290,39 @@ TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLDisjointUnion")
*rv = ghandle;
});

TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLDisjointPartitionByNum")
.set_body([] (TVMArgs args, TVMRetValue* rv) {
GraphHandle ghandle = args[0];
const Graph* gptr = static_cast<Graph*>(ghandle);
int64_t num = args[1];
std::vector<Graph>&& rst = GraphOp::DisjointPartitionByNum(gptr, num);
// return the pointer array as an integer array
const int64_t len = rst.size();
NDArray ptr_array = NDArray::Empty({len}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t* ptr_array_data = static_cast<int64_t*>(ptr_array->data);
for (size_t i = 0; i < rst.size(); ++i) {
Graph* ptr = new Graph();
*ptr = std::move(rst[i]);
ptr_array_data[i] = reinterpret_cast<std::intptr_t>(ptr);
}
*rv = ptr_array;
});

TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLDisjointPartitionBySizes")
.set_body([] (TVMArgs args, TVMRetValue* rv) {
GraphHandle ghandle = args[0];
const Graph* gptr = static_cast<Graph*>(ghandle);
const IdArray sizes = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
std::vector<Graph>&& rst = GraphOp::DisjointPartitionBySizes(gptr, sizes);
// return the pointer array as an integer array
const int64_t len = rst.size();
NDArray ptr_array = NDArray::Empty({len}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t* ptr_array_data = static_cast<int64_t*>(ptr_array->data);
for (size_t i = 0; i < rst.size(); ++i) {
Graph* ptr = new Graph();
*ptr = std::move(rst[i]);
ptr_array_data[i] = reinterpret_cast<std::intptr_t>(ptr);
}
*rv = ptr_array;
});
} // namespace dgl
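Both entry points share one marshalling convention: each partition is moved into a heap-allocated Graph, and the raw pointers are returned to Python packed in an int64 NDArray, which graph_index.py casts back into opaque handles. A minimal ctypes illustration of that address round-trip, independent of TVM and DGL:

import ctypes

buf = ctypes.create_string_buffer(b"graph")   # stands in for a heap-allocated Graph
addr = ctypes.addressof(buf)                  # the pointer as a plain integer
handle = ctypes.cast(addr, ctypes.c_void_p)   # recovered opaque handle, as in graph_index.py
assert handle.value == addr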
88 changes: 88 additions & 0 deletions src/graph/graph_op.cc
@@ -1,5 +1,6 @@
// Graph operation implementation
#include <dgl/graph_op.h>
#include <algorithm>

namespace dgl {

@@ -16,4 +17,91 @@ Graph GraphOp::DisjointUnion(std::vector<const Graph*> graphs) {
return rst;
}

std::vector<Graph> GraphOp::DisjointPartitionByNum(const Graph* graph, int64_t num) {
CHECK(num != 0 && graph->NumVertices() % num == 0)
<< "Number of partitions must evenly divide the number of nodes.";
IdArray sizes = IdArray::Empty({num}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t* sizes_data = static_cast<int64_t*>(sizes->data);
std::fill(sizes_data, sizes_data + num, graph->NumVertices() / num);
return DisjointPartitionBySizes(graph, sizes);
}

std::vector<Graph> GraphOp::DisjointPartitionBySizes(const Graph* graph, IdArray sizes) {
const int64_t len = sizes->shape[0];
const int64_t* sizes_data = static_cast<int64_t*>(sizes->data);
std::vector<int64_t> cumsum;
cumsum.push_back(0);
for (int64_t i = 0; i < len; ++i) {
cumsum.push_back(cumsum[i] + sizes_data[i]);
}
CHECK_EQ(cumsum[len], graph->NumVertices())
<< "Sum of the given sizes must equal to the number of nodes.";
dgl_id_t node_offset = 0, edge_offset = 0;
std::vector<Graph> rst(len);
for (int64_t i = 0; i < len; ++i) {
// copy adj
rst[i].adjlist_.insert(rst[i].adjlist_.end(),
graph->adjlist_.begin() + node_offset,
graph->adjlist_.begin() + node_offset + sizes_data[i]);
rst[i].reverse_adjlist_.insert(rst[i].reverse_adjlist_.end(),
graph->reverse_adjlist_.begin() + node_offset,
graph->reverse_adjlist_.begin() + node_offset + sizes_data[i]);
// relabel adjs
size_t num_edges = 0;
for (auto& elist : rst[i].adjlist_) {
for (size_t j = 0; j < elist.succ.size(); ++j) {
elist.succ[j] -= node_offset;
elist.edge_id[j] -= edge_offset;
}
num_edges += elist.succ.size();
}
for (auto& elist : rst[i].reverse_adjlist_) {
for (size_t j = 0; j < elist.succ.size(); ++j) {
elist.succ[j] -= node_offset;
elist.edge_id[j] -= edge_offset;
}
}
// copy edges
rst[i].all_edges_src_.reserve(num_edges);
rst[i].all_edges_dst_.reserve(num_edges);
rst[i].num_edges_ = num_edges;
for (size_t j = edge_offset; j < edge_offset + num_edges; ++j) {
rst[i].all_edges_src_.push_back(graph->all_edges_src_[j] - node_offset);
rst[i].all_edges_dst_.push_back(graph->all_edges_dst_[j] - node_offset);
}
// update offset
CHECK_EQ(rst[i].NumVertices(), sizes_data[i]);
CHECK_EQ(rst[i].NumEdges(), num_edges);
node_offset += sizes_data[i];
edge_offset += num_edges;
}
/*for (int64_t i = 0; i < len; ++i) {
rst[i].AddVertices(sizes_data[i]);
}
for (dgl_id_t eid = 0; eid < graph->num_edges_; ++eid) {
const dgl_id_t src = graph->all_edges_src_[eid];
const dgl_id_t dst = graph->all_edges_dst_[eid];
size_t src_select = 0, dst_select = 0;
for (size_t i = 1; i < cumsum.size(); ++i) { // TODO: replace with binary search
if (cumsum[i] > src) {
src_select = i;
break;
}
}
for (size_t i = 1; i < cumsum.size(); ++i) { // TODO: replace with binary search
if (cumsum[i] > dst) {
dst_select = i;
break;
}
}
if (src_select != dst_select) {
// the edge is ignored if across two partitions
continue;
}
const int64_t offset = cumsum[src_select - 1];
rst[src_select - 1].AddEdge(src - offset, dst - offset);
}*/
return rst;
}

} // namespace dgl
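The offset bookkeeping in DisjointPartitionBySizes can be checked by hand: under a disjoint-union layout each partition owns a contiguous node range and a contiguous run of edge ids, so both relabelings are plain subtractions. A toy check in plain Python (not the C++ types):

sizes = [2, 3]
edges = [(0, 1), (2, 3), (3, 4)]   # global (src, dst), grouped by partition
edge_ids = [0, 1, 2]

node_offset, edge_offset = 0, 0
for size in sizes:
    local = [(u - node_offset, v - node_offset, e - edge_offset)
             for (u, v), e in zip(edges, edge_ids)
             if node_offset <= u < node_offset + size]
    num_edges = len(local)
    # every relabeled id must land in the local range, as the CHECKs assert
    assert all(0 <= u < size and 0 <= v < size and 0 <= e < num_edges
               for u, v, e in local)
    node_offset += size
    edge_offset += num_edges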
