[GraphBolt] remove SingleProcessDataLoader (dmlc#6663)
Rhett-Ying authored Dec 4, 2023
1 parent 018df05 commit 2968c9b
Showing 26 changed files with 60 additions and 113 deletions.
12 changes: 1 addition & 11 deletions docs/source/api/python/dgl.graphbolt.rst
@@ -15,6 +15,7 @@ APIs
:nosignatures:
:template: graphbolt_classtemplate.rst

+    DataLoader
Dataset
Task
ItemSet
@@ -35,17 +36,6 @@ APIs
CopyTo


- DataLoaders
- -----------
-
- .. autosummary::
-    :toctree: ../../generated/
-    :nosignatures:
-    :template: graphbolt_classtemplate.rst
-
-    SingleProcessDataLoader
-    MultiProcessDataLoader
-
Standard Implementations
-------------------------

6 changes: 3 additions & 3 deletions docs/source/guide/minibatch-custom-sampler.rst
@@ -40,7 +40,7 @@ The code below implements a classical neighbor sampler:
seed_nodes = input_nodes
return input_nodes, subgs
- To use this sampler with :class:`~dgl.graphbolt.MultiProcessDataLoader`:
+ To use this sampler with :class:`~dgl.graphbolt.DataLoader`:

.. code:: python
@@ -49,7 +49,7 @@ To use this sampler with :class:`~dgl.graphbolt.MultiProcessDataLoader`:
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
for data in dataloader:
input_features = data.node_features["feat"]
@@ -95,7 +95,7 @@ can be used on heterogeneous graphs:
)
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
for data in dataloader:
input_features = {
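For reference, the full pattern these guides converge on after the rename is a single datapipe chain terminated by the unified loader. A minimal sketch, assuming a `train_set` item set, a `graph`, a `feature` store, and a `device` already in scope (names and fanouts are illustrative, not part of this commit):

```python
import dgl.graphbolt as gb

# Build the datapipe: minibatch sampling, neighbor sampling, feature
# fetching, conversion to DGL blocks, and device transfer.
datapipe = gb.ItemSampler(train_set, batch_size=1024, shuffle=True)
datapipe = datapipe.sample_neighbor(graph, fanouts=[10, 10])
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)

# The unified DataLoader replaces both SingleProcessDataLoader and
# MultiProcessDataLoader; num_workers=0 reproduces the old
# single-process behavior.
dataloader = gb.DataLoader(datapipe, num_workers=0)

for data in dataloader:
    input_features = data.node_features["feat"]
```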
6 changes: 3 additions & 3 deletions docs/source/guide/minibatch-edge.rst
@@ -40,7 +40,7 @@ edges (namely, node pairs) in the training set instead of the nodes.
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
Iterating over the DataLoader will yield :class:`~dgl.graphbolt.DGLMiniBatch`
which contains a list of specially created graphs representing the computation
@@ -93,7 +93,7 @@ You can use :func:`~dgl.graphbolt.exclude_seed_edges` alongside
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
Adapt your model for minibatch training
@@ -275,7 +275,7 @@ only difference is that the train_set is now an instance of
)
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
Things become a little different if you wish to exclude the reverse
edges on heterogeneous graphs. On heterogeneous graphs, reverse edges
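A custom stage can slot into this chain between sampling and feature fetching; the seed-edge exclusion referenced above attaches as a transform. A hedged sketch — the `transform` stage and the `include_reverse_edges` flag are assumptions about the helper's signature, not shown in this diff:

```python
from functools import partial

import dgl.graphbolt as gb

datapipe = gb.ItemSampler(train_set, batch_size=1024, shuffle=True)
datapipe = datapipe.sample_neighbor(graph, fanouts=[10, 10])
# Drop each minibatch's own seed edges (and, under the assumed flag,
# their reverses) from the sampled subgraphs so the model cannot see
# the edges it is asked to predict.
datapipe = datapipe.transform(
    partial(gb.exclude_seed_edges, include_reverse_edges=True)
)
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
dataloader = gb.DataLoader(datapipe, num_workers=0)
```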
2 changes: 1 addition & 1 deletion docs/source/guide/minibatch-inference.rst
@@ -49,7 +49,7 @@ only one layer at a time.
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
Note that offline inference is implemented as a method of the GNN module
4 changes: 2 additions & 2 deletions docs/source/guide/minibatch-link.rst
@@ -29,7 +29,7 @@ The whole data loader pipeline is as follows:
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
For the details about the builtin uniform negative sampler please see
@@ -215,7 +215,7 @@ only difference is that you need to give edge types for feature fetching.
)
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
If you want to give your own negative sampling function, just inherit from the
:class:`~dgl.graphbolt.NegativeSampler` class and override the
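The builtin uniform negative sampler mentioned above adds one stage between item sampling and neighbor sampling. A sketch under the assumption that the functional form is `sample_uniform_negative(graph, negative_ratio)` — this diff names the sampler but not its signature:

```python
import dgl.graphbolt as gb

datapipe = gb.ItemSampler(train_edge_set, batch_size=1024, shuffle=True)
# For every positive (src, dst) pair, draw corrupted pairs uniformly
# from the graph at the assumed negative_ratio.
datapipe = datapipe.sample_uniform_negative(graph, negative_ratio=5)
datapipe = datapipe.sample_neighbor(graph, fanouts=[10, 10])
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
dataloader = gb.DataLoader(datapipe, num_workers=0)
```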
8 changes: 4 additions & 4 deletions docs/source/guide/minibatch-node.rst
@@ -26,7 +26,7 @@ or the equivalent function-like interface :func:`~dgl.graphbolt.sample_neighbor`
which makes the node gather messages from its neighbors.

To use a sampler provided by DGL, one also needs to combine it with
- :class:`~dgl.graphbolt.MultiProcessDataLoader`, which iterates
+ :class:`~dgl.graphbolt.DataLoader`, which iterates
over a set of indices (nodes in this case) in minibatches.

For example, the following code creates a DataLoader that
@@ -52,7 +52,7 @@ putting the list of generated MFGs onto GPU.
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
Iterating over the DataLoader will yield :class:`~dgl.graphbolt.DGLMiniBatch`
@@ -196,7 +196,7 @@ removed for simplicity):
The samplers provided by DGL also support heterogeneous graphs.
For example, one can still use the provided
:class:`~dgl.graphbolt.NeighborSampler` class and
- :class:`~dgl.graphbolt.MultiProcessDataLoader` class for
+ :class:`~dgl.graphbolt.DataLoader` class for
stochastic training. The only difference is that the itemset is now an
instance of :class:`~dgl.graphbolt.ItemSetDict` which is a dictionary
of node types to node IDs.
@@ -217,7 +217,7 @@ of node types to node IDs.
)
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
The training loop is almost the same as that of homogeneous graphs,
except for the implementation of ``compute_loss`` that will take in two
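For the heterogeneous case described above, only the item set changes shape. A short sketch of what an `ItemSetDict` train set might look like — the tensors and type names are invented for illustration, and the `names=(...)` keyword is an assumption about the `ItemSet` constructor:

```python
import torch
import dgl.graphbolt as gb

# A hypothetical heterogeneous train set: each node type maps to an
# ItemSet pairing seed node IDs with labels.
train_set = gb.ItemSetDict({
    "user": gb.ItemSet(
        (torch.arange(1000), torch.randint(0, 10, (1000,))),
        names=("seed_nodes", "labels"),
    ),
    "item": gb.ItemSet(
        (torch.arange(2000), torch.randint(0, 10, (2000,))),
        names=("seed_nodes", "labels"),
    ),
})
# The rest of the pipeline is unchanged.
datapipe = gb.ItemSampler(train_set, batch_size=1024, shuffle=True)
```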
4 changes: 2 additions & 2 deletions docs/source/guide/minibatch-parallelism.rst
@@ -23,7 +23,7 @@ generate a minibatch, including:
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
+ dataloader = gb.DataLoader(datapipe, num_workers=0)
All these stages are implemented in separate
`IterableDataPipe <https://pytorch.org/data/main/torchdata.datapipes.iter.html>`__
@@ -52,5 +52,5 @@ which prefetches elements from previous data pipes and puts them into a buffer.
Such prefetching is totally transparent to users and requires no extra code. It
brings a significant performance boost to minibatch training of GNNs.

- Please refer to the source code of :class:`~dgl.graphbolt.MultiProcessDataLoader`
+ Please refer to the source code of :class:`~dgl.graphbolt.DataLoader`
for more details.
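Concretely, each line of the pipeline above is one datapipe stage, and the loader decides where each stage runs. A sketch of that split, per the `DataLoader` docstring and the `CopyTo`/`Prefetcher` wiring visible in the `dataloader.py` hunk later in this diff (the worker count is illustrative):

```python
import dgl.graphbolt as gb

datapipe = gb.ItemSampler(train_set, batch_size=1024)         # minibatch sampling
datapipe = datapipe.sample_neighbor(graph, fanouts=[10, 10])  # subgraph sampling
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])  # feature fetching
datapipe = datapipe.to_dgl()                                  # block creation
datapipe = datapipe.copy_to(device)                           # device transfer

# With num_workers > 0, the stages before feature fetching run in worker
# processes; the rest runs in the main process, with a prefetch buffer
# inserted ahead of the CopyTo stage.
dataloader = gb.DataLoader(datapipe, num_workers=4)
```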
4 changes: 1 addition & 3 deletions examples/multigpu/graphbolt/node_classification.py
@@ -139,9 +139,7 @@ def create_dataloader(
# A CopyTo object copying data in the datapipe to a specified device.\
############################################################################
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(
-     datapipe, num_workers=args.num_workers
- )
+ dataloader = gb.DataLoader(datapipe, num_workers=args.num_workers)

# Return the fully-initialized DataLoader object.
return dataloader
4 changes: 1 addition & 3 deletions examples/sampling/graphbolt/lightning/node_classification.py
@@ -159,9 +159,7 @@ def create_dataloader(self, node_set, is_train):
datapipe = sampler(self.graph, self.fanouts)
datapipe = datapipe.fetch_feature(self.feature_store, ["feat"])
datapipe = datapipe.to_dgl()
- dataloader = gb.MultiProcessDataLoader(
-     datapipe, num_workers=self.num_workers
- )
+ dataloader = gb.DataLoader(datapipe, num_workers=self.num_workers)
return dataloader

########################################################################
4 changes: 2 additions & 2 deletions examples/sampling/graphbolt/link_prediction.py
@@ -232,11 +232,11 @@ def create_dataloader(args, graph, features, itemset, is_train=True):
# 'datapipe': The datapipe object to be used for data loading.
# 'args.num_workers': The number of processes to be used for data loading.
# [Output]:
- # A MultiProcessDataLoader object to handle data loading.
+ # A DataLoader object to handle data loading.
# [Role]:
# Initialize a multi-process dataloader to load the data in parallel.
############################################################################
- dataloader = gb.MultiProcessDataLoader(
+ dataloader = gb.DataLoader(
datapipe,
num_workers=args.num_workers,
)
6 changes: 3 additions & 3 deletions examples/sampling/graphbolt/node_classification.py
@@ -148,16 +148,16 @@ def create_dataloader(

############################################################################
# [Step-6]:
- # gb.MultiProcessDataLoader()
+ # gb.DataLoader()
# [Input]:
# 'datapipe': The datapipe object to be used for data loading.
# 'num_workers': The number of processes to be used for data loading.
# [Output]:
- # A MultiProcessDataLoader object to handle data loading.
+ # A DataLoader object to handle data loading.
# [Role]:
# Initialize a multi-process dataloader to load the data in parallel.
############################################################################
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=num_workers)
+ dataloader = gb.DataLoader(datapipe, num_workers=num_workers)

# Return the fully-initialized DataLoader object.
return dataloader
2 changes: 1 addition & 1 deletion examples/sampling/graphbolt/quickstart/link_prediction.py
@@ -54,7 +54,7 @@ def create_dataloader(dateset, device, is_train=True):
datapipe = datapipe.copy_to(device)

# Initiate the dataloader for the datapipe.
- return gb.SingleProcessDataLoader(datapipe)
+ return gb.DataLoader(datapipe)


class GraphSAGE(nn.Module):
@@ -32,7 +32,7 @@ def create_dataloader(dateset, itemset, device):
datapipe = datapipe.copy_to(device)

# Initiate the dataloader for the datapipe.
- return gb.SingleProcessDataLoader(datapipe)
+ return gb.DataLoader(datapipe)


class GCN(nn.Module):
2 changes: 1 addition & 1 deletion examples/sampling/graphbolt/rgcn/hetero_rgcn.py
@@ -137,7 +137,7 @@ def create_dataloader(
# Create a DataLoader from the datapipe.
# `num_workers`:
# The number of worker processes to use for data loading.
- return gb.MultiProcessDataLoader(datapipe, num_workers=num_workers)
+ return gb.DataLoader(datapipe, num_workers=num_workers)


def extract_embed(node_embed, input_nodes):
2 changes: 1 addition & 1 deletion examples/sampling/graphbolt/sparse/graphsage.py
@@ -136,7 +136,7 @@ def create_dataloader(A, fanouts, ids, features, device):
# Use graphbolt to fetch features.
datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])
datapipe = datapipe.copy_to(device)
- dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=4)
+ dataloader = gb.DataLoader(datapipe, num_workers=4)
return dataloader


4 changes: 2 additions & 2 deletions notebooks/stochastic_training/link_prediction.ipynb
@@ -145,7 +145,7 @@
"datapipe = datapipe.fetch_feature(feature, node_feature_keys=[\"feat\"])\n",
"datapipe = datapipe.to_dgl()\n",
"datapipe = datapipe.copy_to(device)\n",
"train_dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)"
"train_dataloader = gb.DataLoader(datapipe, num_workers=0)"
],
"metadata": {
"id": "LZgXGfBvYijJ"
@@ -344,7 +344,7 @@
"datapipe = datapipe.fetch_feature(feature, node_feature_keys=[\"feat\"])\n",
"datapipe = datapipe.to_dgl()\n",
"datapipe = datapipe.copy_to(device)\n",
"eval_dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)\n",
"eval_dataloader = gb.DataLoader(datapipe, num_workers=0)\n",
"\n",
"logits = []\n",
"labels = []\n",
6 changes: 3 additions & 3 deletions notebooks/stochastic_training/node_classification.ipynb
@@ -138,7 +138,7 @@
"source": [
"## Defining Neighbor Sampler and Data Loader in DGL\n",
"\n",
"DGL provides tools to iterate over the dataset in minibatches while generating the computation dependencies to compute their outputs with the MFGs above. For node classification, you can use `dgl.graphbolt.MultiProcessDataLoader` for iterating over the dataset. It accepts a data pipe that generates minibatches of nodes and their labels, sample neighbors for each node, and generate the computation dependencies in the form of MFGs. Feature fetching, block creation and copying to target device are also supported. All these operations are split into separate stages in the data pipe, so that you can customize the data pipeline by inserting your own operations.\n",
"DGL provides tools to iterate over the dataset in minibatches while generating the computation dependencies to compute their outputs with the MFGs above. For node classification, you can use `dgl.graphbolt.DataLoader` for iterating over the dataset. It accepts a data pipe that generates minibatches of nodes and their labels, sample neighbors for each node, and generate the computation dependencies in the form of MFGs. Feature fetching, block creation and copying to target device are also supported. All these operations are split into separate stages in the data pipe, so that you can customize the data pipeline by inserting your own operations.\n",
"\n",
"Let’s say that each node will gather messages from 4 neighbors on each layer. The code defining the data loader and neighbor sampler will look like the following.\n"
],
@@ -154,7 +154,7 @@
"datapipe = datapipe.fetch_feature(feature, node_feature_keys=[\"feat\"])\n",
"datapipe = datapipe.to_dgl()\n",
"datapipe = datapipe.copy_to(device)\n",
"train_dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)"
"train_dataloader = gb.DataLoader(datapipe, num_workers=0)"
],
"metadata": {
"id": "yQVYDO0ZbBvi"
@@ -287,7 +287,7 @@
"datapipe = datapipe.fetch_feature(feature, node_feature_keys=[\"feat\"])\n",
"datapipe = datapipe.to_dgl()\n",
"datapipe = datapipe.copy_to(device)\n",
"valid_dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)\n",
"valid_dataloader = gb.DataLoader(datapipe, num_workers=0)\n",
"\n",
"\n",
"import sklearn.metrics"
40 changes: 3 additions & 37 deletions python/dgl/graphbolt/dataloader.py
@@ -12,8 +12,7 @@


__all__ = [
"SingleProcessDataLoader",
"MultiProcessDataLoader",
"DataLoader",
]


@@ -36,38 +35,6 @@ def _find_and_wrap_parent(
)


- class SingleProcessDataLoader(torch.utils.data.DataLoader):
-     """Single process DataLoader.
-
-     Iterates over the data pipeline in the main process.
-
-     Parameters
-     ----------
-     datapipe : DataPipe
-         The data pipeline.
-     """
-
-     # In the single process dataloader case, we don't need to do any
-     # modifications to the datapipe, and we just use PyTorch's native
-     # dataloader as-is.
-     #
-     # The exception is that batch_size should be None, since we already
-     # have minibatch sampling and collating in ItemSampler.
-     def __init__(self, datapipe):
-         datapipe_graph = dp_utils.traverse_dps(datapipe)
-         datapipe_adjlist = datapipe_graph_to_adjlist(datapipe_graph)
-         # Cut datapipe at CopyTo and wrap with prefetcher. This enables the
-         # data pipeline up to the CopyTo operation to run in a separate thread.
-         _find_and_wrap_parent(
-             datapipe_graph,
-             datapipe_adjlist,
-             CopyTo,
-             dp.iter.Prefetcher,
-             buffer_size=2,
-         )
-         super().__init__(datapipe, batch_size=None, num_workers=0)


class MultiprocessingWrapper(dp.iter.IterDataPipe):
"""Wraps a datapipe with multiprocessing.
@@ -97,7 +64,7 @@ def __iter__(self):
yield from self.dataloader


- class MultiProcessDataLoader(torch.utils.data.DataLoader):
+ class DataLoader(torch.utils.data.DataLoader):
"""Multiprocessing DataLoader.
Iterates over the data pipeline with everything before feature fetching
@@ -112,8 +79,7 @@ class MultiProcessDataLoader(torch.utils.data.DataLoader):
datapipe : DataPipe
The data pipeline.
num_workers : int, optional
- Number of worker processes. Default is 0, which is identical to
- :class:`SingleProcessDataLoader`.
+ Number of worker processes. Default is 0.
persistent_workers : bool, optional
If True, the data loader will not shut down the worker processes after a
dataset has been consumed once. This allows to maintain the workers
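For callers, the net effect of this commit is a one-line change; a sketch of the implied migration:

```python
import dgl.graphbolt as gb

# Before this commit:
#   dataloader = gb.SingleProcessDataLoader(datapipe)
#   dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=4)

# After this commit, one class covers both cases:
dataloader = gb.DataLoader(datapipe)                 # num_workers defaults to 0
dataloader = gb.DataLoader(datapipe, num_workers=4)  # multi-process loading
```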