[Bugfix] Bug fixes in new dataloader (dmlc#3727)
* fixes

* fix

* more fixes

* update

* oops

* lint?

* temporarily revert - will fix in another PR

* more fixes

* skipping mxnet test

* address comments

* fix DDP

* fix edge dataloader exclusion problems

* stupid bug

* fix

* use_uvm option

* fix

* fixes

* fixes

* fixes

* fixes

* add evaluation for cluster gcn and ddp

* stupid bug again

* fixes

* move sanity checks to only support DGLGraphs

* pytorch lightning compatibility fixes

* remove

* poke

* more fixes

* fix

* fix

* disable test

* docstrings

* why is it getting a memory leak?

* fix

* update

* updates and temporarily disable forkingpickler

* update

* fix?

* fix?

* oops

* oops

* fix

* lint

* huh

* uh

* update

* fix

* made it memory efficient

* refine exclude interface

* fix tutorial

* fix tutorial

* fix graph duplication in CPU dataloader workers

* lint

* lint

* Revert "lint"

This reverts commit 805484d.

* Revert "lint"

This reverts commit 0bce411.

* Revert "fix graph duplication in CPU dataloader workers"

This reverts commit 9e3a8cf.

Co-authored-by: xiny <[email protected]>
Co-authored-by: Jinjing Zhou <[email protected]>
3 people authored Feb 21, 2022
1 parent 7b9afbf commit 3f138eb
Showing 46 changed files with 1,545 additions and 334 deletions.
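Across the updated examples, the key user-facing change is dropping the temporary DGLGraphStorage wrapper together with the pin_memory / persistent_workers / prefetch-thread knobs and switching to the dataloader's use_uva flag. A minimal sketch of the new call pattern, based on the diffs below and assuming a DGL graph g with 'feat' and 'label' node data and a train_idx tensor (variable names are illustrative, not part of the diff):

import torch
import dgl

sampler = dgl.dataloading.NeighborSampler(
    [15, 10, 5],
    prefetch_node_feats=['feat'],   # features are fetched alongside sampling
    prefetch_labels=['label'])
train_dataloader = dgl.dataloading.NodeDataLoader(
    g,                              # the graph stays on the CPU; UVA pins it for GPU access
    train_idx.to('cuda'),           # with use_uva=True the seed node IDs live on the GPU
    sampler,
    device='cuda',                  # sampled blocks are produced on the GPU
    batch_size=1000,
    shuffle=True,
    drop_last=False,
    num_workers=0,                  # UVA sampling runs in-process, so no worker processes
    use_uva=True)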
37 changes: 26 additions & 11 deletions examples/pytorch/__temporary__/cluster_gcn/cluster_gcn.py
@@ -8,8 +8,6 @@
import numpy as np
from ogb.nodeproppred import DglNodePropPredDataset

USE_WRAPPER = True

class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes):
super().__init__()
@@ -40,11 +38,6 @@ def forward(self, sg, x):
model = SAGE(graph.ndata['feat'].shape[1], 256, dataset.num_classes).cuda()
opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)

if USE_WRAPPER:
import dglnew
graph.create_formats_()
graph = dglnew.graph.wrapper.DGLGraphStorage(graph)

num_partitions = 1000
sampler = dgl.dataloading.ClusterGCNSampler(
graph, num_partitions,
@@ -61,14 +54,13 @@ def forward(self, sg, x):
batch_size=100,
shuffle=True,
drop_last=False,
pin_memory=True,
num_workers=8,
persistent_workers=True,
use_prefetch_thread=True) # TBD: could probably remove this argument
num_workers=0,
use_uva=True)

durations = []
for _ in range(10):
t0 = time.time()
model.train()
for it, sg in enumerate(dataloader):
x = sg.ndata['feat']
y = sg.ndata['label'][:, 0]
@@ -85,4 +77,27 @@ def forward(self, sg, x):
tt = time.time()
print(tt - t0)
durations.append(tt - t0)

model.eval()
with torch.no_grad():
val_preds, test_preds = [], []
val_labels, test_labels = [], []
for it, sg in enumerate(dataloader):
x = sg.ndata['feat']
y = sg.ndata['label'][:, 0]
m_val = sg.ndata['valid_mask']
m_test = sg.ndata['test_mask']
y_hat = model(sg, x)
val_preds.append(y_hat[m_val])
val_labels.append(y[m_val])
test_preds.append(y_hat[m_test])
test_labels.append(y[m_test])
val_preds = torch.cat(val_preds, 0)
val_labels = torch.cat(val_labels, 0)
test_preds = torch.cat(test_preds, 0)
test_labels = torch.cat(test_labels, 0)
val_acc = MF.accuracy(val_preds, val_labels)
test_acc = MF.accuracy(test_preds, test_labels)
print('Validation acc:', val_acc.item(), 'Test acc:', test_acc.item())

print(np.mean(durations[4:]), np.std(durations[4:]))
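Note that the evaluation loop added above indexes sg.ndata['valid_mask'] and sg.ndata['test_mask'], which are not created in the hunks shown here. A sketch of one way to provide them, assuming boolean node masks built from the OGB split and reusing the example's dataset, graph, and torch imports (an illustration, not necessarily what the rest of the file does):

split_idx = dataset.get_idx_split()
for name, idx in [('train_mask', split_idx['train']),
                  ('valid_mask', split_idx['valid']),
                  ('test_mask', split_idx['test'])]:
    mask = torch.zeros(graph.num_nodes(), dtype=torch.bool)
    mask[idx] = True
    graph.ndata[name] = mask   # partitions produced by ClusterGCNSampler then carry these masks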
105 changes: 72 additions & 33 deletions examples/pytorch/__temporary__/graphsage/ddp.py
@@ -9,8 +9,9 @@
import time
import numpy as np
from ogb.nodeproppred import DglNodePropPredDataset
import tqdm

USE_WRAPPER = False
USE_UVA = True

class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes):
@@ -20,6 +21,8 @@ def __init__(self, in_feats, n_hidden, n_classes):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
self.dropout = nn.Dropout(0.5)
self.n_hidden = n_hidden
self.n_classes = n_classes

def forward(self, blocks, x):
h = x
@@ -30,41 +33,66 @@ def forward(self, blocks, x):
h = self.dropout(h)
return h

def inference(self, g, device, batch_size, num_workers, buffer_device=None):
# The difference between this inference function and the one in the official
# example is that the intermediate results can also benefit from prefetching.
g.ndata['h'] = g.ndata['feat']
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1, prefetch_node_feats=['h'])
dataloader = dgl.dataloading.NodeDataLoader(
g, torch.arange(g.num_nodes()).to(g.device), sampler, device=device,
batch_size=1000, shuffle=False, drop_last=False, num_workers=num_workers,
persistent_workers=(num_workers > 0))
if buffer_device is None:
buffer_device = device

def train(rank, world_size, graph, num_classes, split_idx):
for l, layer in enumerate(self.layers):
y = torch.zeros(
g.num_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes,
device=buffer_device)
for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
x = blocks[0].srcdata['h']
h = layer(blocks[0], x)
if l != len(self.layers) - 1:
h = F.relu(h)
h = self.dropout(h)
y[output_nodes] = h.to(buffer_device)
g.ndata['h'] = y
return y


def train(rank, world_size, shared_memory_name, features, num_classes, split_idx):
torch.cuda.set_device(rank)
dist.init_process_group('nccl', 'tcp://127.0.0.1:12347', world_size=world_size, rank=rank)

graph = dgl.hetero_from_shared_memory(shared_memory_name)
feat, labels = features
graph.ndata['feat'] = feat
graph.ndata['label'] = labels

model = SAGE(graph.ndata['feat'].shape[1], 256, num_classes).cuda()
model = nn.parallel.DistributedDataParallel(model, device_ids=[rank], output_device=rank)
opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)

train_idx, valid_idx, test_idx = split_idx['train'], split_idx['valid'], split_idx['test']
if USE_WRAPPER:
import dglnew
graph = dglnew.graph.wrapper.DGLGraphStorage(graph)

if USE_UVA:
train_idx = train_idx.to('cuda')

sampler = dgl.dataloading.NeighborSampler(
[5, 5, 5], output_device='cpu', prefetch_node_feats=['feat'],
prefetch_labels=['label'])
dataloader = dgl.dataloading.NodeDataLoader(
graph,
train_idx,
sampler,
device='cuda',
batch_size=1000,
shuffle=True,
drop_last=False,
pin_memory=True,
num_workers=4,
persistent_workers=True,
use_ddp=True,
use_prefetch_thread=True) # TBD: could probably remove this argument
[15, 10, 5], prefetch_node_feats=['feat'], prefetch_labels=['label'])
train_dataloader = dgl.dataloading.NodeDataLoader(
graph, train_idx, sampler,
device='cuda', batch_size=1000, shuffle=True, drop_last=False,
num_workers=0, use_ddp=True, use_uva=USE_UVA)
valid_dataloader = dgl.dataloading.NodeDataLoader(
graph, valid_idx, sampler, device='cuda', batch_size=1024, shuffle=True,
drop_last=False, num_workers=0, use_uva=USE_UVA)

durations = []
for _ in range(10):
model.train()
t0 = time.time()
for it, (input_nodes, output_nodes, blocks) in enumerate(dataloader):
for it, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
x = blocks[0].srcdata['feat']
y = blocks[-1].dstdata['label'][:, 0]
y_hat = model(blocks, x)
@@ -80,27 +108,38 @@ def train(rank, world_size, graph, num_classes, split_idx):
if rank == 0:
print(tt - t0)
durations.append(tt - t0)

model.eval()
ys = []
y_hats = []
for it, (input_nodes, output_nodes, blocks) in enumerate(valid_dataloader):
with torch.no_grad():
x = blocks[0].srcdata['feat']
ys.append(blocks[-1].dstdata['label'])
y_hats.append(model.module(blocks, x))
acc = MF.accuracy(torch.cat(y_hats), torch.cat(ys))
print('Validation acc:', acc.item())
dist.barrier()

if rank == 0:
print(np.mean(durations[4:]), np.std(durations[4:]))
model.eval()
with torch.no_grad():
pred = model.module.inference(graph, 'cuda', 1000, 12, graph.device)
acc = MF.accuracy(pred.to(graph.device), graph.ndata['label'])
print('Test acc:', acc.item())

if __name__ == '__main__':
dataset = DglNodePropPredDataset('ogbn-products')
graph, labels = dataset[0]
graph.ndata['label'] = labels
graph.create_formats_()
shared_memory_name = 'shm' # can be any string
feat = graph.ndata['feat']
graph = graph.shared_memory(shared_memory_name)
split_idx = dataset.get_idx_split()
num_classes = dataset.num_classes
n_procs = 4

# Tested with mp.spawn and fork. Both worked and got 4s per epoch with 4 GPUs
# and 3.86s per epoch with 8 GPUs on p2.8x, compared to 5.2s from official examples.
#import torch.multiprocessing as mp
#mp.spawn(train, args=(n_procs, graph, num_classes, split_idx), nprocs=n_procs)
import dgl.multiprocessing as mp
procs = []
for i in range(n_procs):
p = mp.Process(target=train, args=(i, n_procs, graph, num_classes, split_idx))
p.start()
procs.append(p)
for p in procs:
p.join()
import torch.multiprocessing as mp
mp.spawn(train, args=(n_procs, shared_memory_name, (feat, labels), num_classes, split_idx), nprocs=n_procs)
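For context, the shared-memory handoff introduced in this file follows the pattern sketched below: the parent publishes the graph structure under a name, each spawned worker re-attaches to it instead of receiving a pickled copy, and node features and labels travel through the spawn arguments since shared_memory() only shares the structure (a sketch based on the code above, with illustrative names):

import dgl
import torch.multiprocessing as mp
from ogb.nodeproppred import DglNodePropPredDataset

def worker(rank, world_size, shm_name, feat, labels):
    # Re-attach to the structure the parent placed in shared memory; no per-process copy.
    g = dgl.hetero_from_shared_memory(shm_name)
    g.ndata['feat'] = feat      # dense tensors are shared via torch.multiprocessing
    g.ndata['label'] = labels
    print(rank, g.num_nodes())

if __name__ == '__main__':
    graph, labels = DglNodePropPredDataset('ogbn-products')[0]
    feat = graph.ndata['feat']
    graph = graph.shared_memory('shm')   # publishes the graph structure under the name 'shm'
    mp.spawn(worker, args=(4, 'shm', feat, labels), nprocs=4)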