diff --git a/_posts/2023-08-18-GPT-lite-data-parallelism.md b/_posts/2023-08-18-GPT-lite-data-parallelism.md
index dd7e53b..9ee6936 100644
--- a/_posts/2023-08-18-GPT-lite-data-parallelism.md
+++ b/_posts/2023-08-18-GPT-lite-data-parallelism.md
@@ -12,13 +12,13 @@ Distributed data parallelism (DDP) refers to the parallel execution of different
{: style="text-align:center; font-size: small;"}
-An illustration of the DDP data layout split across 4 processes, here color-coded as blue, yellow, read and gree processes.
+An illustration of the DDP data layout, split across 4 processes, color-coded as blue, yellow, red and green.
-In this post, we will perform distributed data parallelism on the training process on the [GPT-lite model we built in the previous post]({{ site.baseurl }}{% post_url 2023-02-28-GPT-lite %}), on a network of 8 GPUs, using PyTorch's [DistributedDataParallel module](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) and [DeepSpeed ZeRO](https://arxiv.org/abs/1910.02054) (Zero Redundancy Optimizer, a lightweight wrapper on PyTorch).
+In this post, we will perform distributed data parallelism on the training process of the [GPT-lite model we built in the previous post]({{ site.baseurl }}{% post_url 2023-02-28-GPT-lite %}), on a network of 8 GPUs, using PyTorch's [DistributedDataParallel module](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) and [DeepSpeed ZeRO](https://arxiv.org/abs/1910.02054) (Zero Redundancy Optimizer, a lightweight wrapper on PyTorch).
There are two main data parallelism approaches:
-**Distributed Data Parallel** keeps a full copy of the model (weights, optimizer parameters and gradients) in all processors. All models are initialized equally. Each processor takes as input to its model a different minibatch and performs a forward pass to compute the loss. On the backward pass, at every layer of the model, all processes compute the gradients that relate to the data batch in the forward pass, and perform an all-reduce to get the mean gradients across all processors. This is then used to update the optimizer states. Because parameters are initialized equally, the gradients are reduced and all parameters perform the same updates, all keeping the models in an identical state throughout the execution.
+**Distributed Data Parallelism** keeps a full copy of the model (weights, optimizer parameters and gradients) on each processor. All models are initialized identically. Each processor takes a different minibatch as input and performs a forward pass to compute the loss for that batch. On the backward pass, at every layer of the model, all processes compute the gradients for their batch and perform an all-reduce to obtain the mean gradients across all processors. These mean gradients are then used to update the optimizer states. Because parameters are initialized identically and gradients are mean-reduced, every processor performs the same parameter updates, keeping the models in an identical state throughout the execution.
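+
+To make the reduction step concrete, the sketch below shows what the gradient all-reduce conceptually amounts to after a backward pass. This is for illustration only, assuming a `model` and an initialized `torch.distributed` process group; the `DistributedDataParallel` wrapper used later does this for you, asynchronously and overlapped with the backward pass:
+
+```python
+import torch.distributed as dist
+
+# conceptual sketch: mean-reduce the gradient of every parameter across all processes
+for param in model.parameters():
+    if param.grad is not None:
+        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)  # sum gradients across ranks
+        param.grad /= dist.get_world_size()                # divide by the world size to get the mean
+```
+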
{: style="text-align:center; font-size: small;"}
@@ -28,21 +28,21 @@ Workflow for distributed data parallelism (DDP). All processors have a copy of t
-Looking at the previous, we can see that each processor holds a copy of the model and this leads to a superfluos memory usage. That's where sharding comes into play.
+Looking at the workflow above, we can see that each processor holds a full copy of the model, which leads to superfluous memory usage. That's where sharding comes into play.
-**(Fully-)Sharded Data Parallelism (FSDP)** a.k.a **sharding** is a distributed setup where processors dont hold a full copy of the model, but only the parameters, optimizer states and gradients of disjoint non-overlapping subsets of layers. Different processors input different mini-batches, and activations are not sharded i.e. they are kept in *full shape* on each processor. In DeepSpeed lingo, sharding is called **ZeRO (Zero Redundancy Optimizer)**. ZeRO has several alternative execution modes (*stages*). Each stage represents a different level of memory redundancy, corresponding to different variable types being communicated across processes or stored locally:
+**(Fully-)Sharded Data Parallelism (FSDP)**, a.k.a. **sharding**, is a distributed setup where processors don't hold a full copy of the model, but only the parameters, optimizer states and gradients of a disjoint subset of layers. As before, different processors input different mini-batches. In DeepSpeed lingo, sharding goes by the name of **ZeRO (Zero Redundancy Optimizer)**. ZeRO has several alternative execution modes (**stages**). Each stage represents a different level of memory redundancy, corresponding to different variable types being distributed or replicated across nodes:
- **ZeRO stage 0**: no sharding of any variables, being equivalent to Distributed Data Parallelism;
- **ZeRO stage 1 (ZeRO-1)**: the optimizer states (e.g., for Adam optimizer, the weights, and the first and second moment estimates) are partitioned across the processes. Affects only the backward pass.
- **ZeRO stage 2 (ZeRO-2)**: the gradients for updating the model weights are also partitioned such that each process retains only the gradients corresponding to its portion of the optimizer states. Also relevant only for the backward pass.
- **ZeRO stage 3 (ZeRO-3)**: the model parameters are partitioned across the processes. Includes a communication overhead on both the forward and backward passes.
-An important remark is that in modern models, **a huge chunk of memory is allocated to residual memory (activations, normalization layers, etc) which is not sharded by FSDP**. The following diagram illustrates the workflow of the stage 3 sharding (parameters, gradients and optimizer states):
+An important remark is that activations are not sharded, i.e. they are kept in *full shape* on each processor. In modern models, **a huge chunk of memory is allocated to residual memory (activations, normalization layers, etc.), which is not sharded by FSDP**. With that in mind, the following diagram illustrates the workflow of stage 3 sharding (of parameters, gradients and optimizer states):
{: style="text-align:center; font-size: small;"}
{: style="text-align:center; font-size: small;"}
-Workflow for stage 3 sharding. Each processors contains a non-overlap subset (layer-wise split) of parameters, gradients and optimiser data. Each processor loads a different data batch. During forward or backward passes, when computing is needed for a given layer, the process who is responsible for those layers will broadcasts its values to the remaining processes. Example for processor 1 (yellow): **Data loading:** yellow process loads data sample 1. $$\,\,\,$$ **Forward pass:** yellow process receives the parameters from rank 0 (blue) and computes its activations for layer 1. Afterwards, yellow process broadcasts its parameters to ranks 0, 2 and 3, so that they compute their activations for layer 2. Activations for layer 2 and 3 are computed similarly to layer 1. $$\,\,\,$$ **Backward pass:** green process (3) broadcast parameters to all other processes. Each process can use its activations and the received parameters to compute the gradients for the top layer. All processes gather their local grandients in process 3 that will use it to update the parameters of the last layer. For the remaining layer, the same happens, where the new process that be the one doing the broadcast of parameters, gather of gradients and update of local parameters.
+Workflow for stage 3 sharding. Each processor holds a non-overlapping subset (a layer-wise split) of parameters, gradients and optimizer data, and loads a different data batch. During the forward and backward passes, when computation is needed for a given layer, the process responsible for that layer broadcasts its values to the remaining processes. Example for processor 1 (yellow): **Data loading:** the yellow process loads data sample 1. **Forward pass:** the yellow process receives the parameters from rank 0 (blue) and computes its activations for layer 1. Afterwards, the yellow process broadcasts its parameters to ranks 0, 2 and 3, so that all processes can compute their activations for layer 2. Activations for layers 3 and 4 are computed analogously, led by the red and green processes, respectively. **Backward pass:** the green process (3) broadcasts its parameters to all other processes. Each process then uses its activations and the received parameters to compute the gradients for the top layer. All processes gather their local gradients in process 3, which uses them to update the parameters of the last layer. The same happens for the remaining layers, with the red, yellow and blue processes in turn doing the broadcast of parameters, the gather of gradients and the update of their local parameters.
-The higher the stage, the more communication we incur, and the less memory we require locally. These memory improvements are summarized in the [Microsoft Research blog](https://www.microsoft.com/en-us/research/blog/zero-deepspeed-new-system-optimizations-enable-training-models-with-over-100-billion-parameters/) as:
+The higher the stage, the more communication we require, but the less memory we consume. These memory improvements are summarized in the [Microsoft Research blog](https://www.microsoft.com/en-us/research/blog/zero-deepspeed-new-system-optimizations-enable-training-models-with-over-100-billion-parameters/) as:
{: style="text-align:center; font-size: small;"}
@@ -50,29 +50,18 @@ The higher the stage, the more communication we incur, and the less memory we re
{: style="text-align:center; font-size: small;"}
Memory consumption of the three different stages of ZeRO FSDP. Source: [Microsoft Research blog](https://www.microsoft.com/en-us/research/blog/zero-deepspeed-new-system-optimizations-enable-training-models-with-over-100-billion-parameters/)
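+
+To make these savings concrete, here is a small back-of-the-envelope sketch of the per-GPU memory of each stage, following the formulas in the figure above and assuming mixed-precision training with Adam (2 bytes per fp16 parameter, 2 per fp16 gradient, and 12 per parameter of fp32 optimizer states). The 7.5B-parameter, 64-GPU example is the one used in the ZeRO paper:
+
+```python
+def zero_memory_per_gpu_GB(num_params, num_gpus, stage):
+    params, grads, optim = 2 * num_params, 2 * num_params, 12 * num_params
+    if stage >= 1: optim /= num_gpus    # ZeRO-1: shard the optimizer states
+    if stage >= 2: grads /= num_gpus    # ZeRO-2: also shard the gradients
+    if stage >= 3: params /= num_gpus   # ZeRO-3: also shard the parameters
+    return (params + grads + optim) / 1e9
+
+for stage in range(4):  # prints 120.0, 31.4, 16.6 and 1.9 GB
+    print(stage, round(zero_memory_per_gpu_GB(7.5e9, 64, stage), 1), "GB")
+```
+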
-Additionaly, on top of stages 1 and 2, we can enable [ZeRO-Offload](https://www.deepspeed.ai/tutorials/zero-offload/), a system for offloading optimizer and gradient states to CPU memory. On top of stage 3, we can enable [**ZeRO-Infinity**](https://arxiv.org/abs/2104.07857), also an offloading engine that extends ZeRO-offload with support to NVMe memory. According to the [ZeRO-3 documentation](https://deepspeed.readthedocs.io/en/stable/zero3.html#zero), "ZeRO-Infinity has all of the savings of ZeRO-Offload, plus is able to offload more the model weights and has more effective bandwidth utilization and overlapping of computation and communication".
+Additionally, on top of stages 1 and 2, we can enable [**ZeRO-Offload**](https://www.deepspeed.ai/tutorials/zero-offload/), a system for offloading optimizer and gradient states to CPU memory. On top of stage 3, we can enable [**ZeRO-Infinity**](https://arxiv.org/abs/2104.07857), an offloading engine that extends ZeRO-Offload with support for NVMe memory. According to the [ZeRO-3 documentation](https://deepspeed.readthedocs.io/en/stable/zero3.html#zero), "ZeRO-Infinity has all of the savings of ZeRO-Offload, plus is able to offload more of the model weights and has more effective bandwidth utilization and overlapping of computation and communication".
## Model and dataset setup
-We start by taking our previous *GPT-lite* implementation and matching the architecture of the model to the *GPT-2 Small* model description in [Language Models are Few-Shot Learners](https://arxiv.org/abs/2005.14165) (Fig 2.1):
+We start our implementation by taking our previous *GPT-lite* model and matching its specs to the *GPT-2 Small* model description in [Language Models are Few-Shot Learners](https://arxiv.org/abs/2005.14165) (Fig 2.1):
```python
-## gptlite.py
-
-# depth of the network as number of decoder blocks.
-n_layer = 12
-
-# size of the embeddings (d_model)
-n_embd = 768
-
-# number of attention heads in the Multi-Attention mechanism
-n_head = 12
-
-# block size ie max number of training sequence, the $n_{ctx}$ in the paper .
-block_size = 2048
-
-# dropout rate (variable p) for dropout units
-dropout = 0.1
+n_layer = 12      # depth of the network, as the number of decoder blocks
+n_embd = 768      # size of the embeddings (d_model)
+n_head = 12       # number of attention heads in the multi-head attention mechanism
+block_size = 2048 # context length, i.e. the max length of a training sequence ($n_{ctx}$ in the paper)
+dropout = 0.1     # dropout rate (variable p) for the dropout units
```
We then define the methods `get_model()` and `get_dataset()` that return our model and the [tiny shakespeare](https://github.com/karpathy/char-rnn/blob/master/data/tinyshakespeare/input.txt) dataset:
@@ -112,7 +101,7 @@ def get_model(vocab_size):
### Detour: using a pre-existing model {#torchvision-model}
-Note that code this code is applicable to any model of type `torch.nn.Module` and any dataset of type `torch.utils.data.Dataset`. As an example. jf you'd want to perform a multi-class classification using the `ResNet` network on the `CIFAR10` dataset available in `torchvision`, you'd define the previous 2 methods as:
+Note that this code is applicable to any model of type `torch.nn.Module` and any dataset of type `torch.utils.data.Dataset`. As an example, if you wanted to perform multi-class classification using the `ResNet` network on the `CIFAR10` dataset available in `torchvision`, you could define the previous 2 methods as:
```python
import torchvision
@@ -136,7 +125,7 @@ As a relevant remark, pre-existing models do not define activation checkpointing
## PyTorch implementation
-We start by colleting the global variables that are set by the [`torchrun`](https://pytorch.org/docs/stable/elastic/run.html) launcher (detailed below):
+We will now implement data parallelism in PyTorch. First, we collect the global variables that are set by the [`torchrun`](https://pytorch.org/docs/stable/elastic/run.html) launcher (detailed below), as they're needed to uniquely identify processes in the network and GPU devices within a node:
```python
import os
@@ -152,41 +141,45 @@ dataloader = torch.utils.data.DataLoader(
dataset, batch_size=4, sampler=sampler)
```
-Note the argument `sampler`, that is a [`DistributedSampler`](DistributedSampler) that will delegate different samples from the dataloader to different processes.
+Note the argument `sampler`, a [`DistributedSampler`](https://pytorch.org/docs/stable/data.html#torch.utils.data.distributed.DistributedSampler) that will delegate different samples from the dataloader to different processes. Without this, all processes would load exactly the same datapoints in every iteration.
```python
sampler = torch.utils.data.distributed.DistributedSampler(
dataset, num_replicas=world_size, rank=rank)
```
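+
+As a side note, if shuffling is enabled in the `DistributedSampler`, the sampler must be told the current epoch so that every epoch uses a different (yet consistent across processes) random ordering. A minimal sketch, assuming a hypothetical `n_epochs` and the `sampler` and `dataloader` defined above:
+
+```python
+for epoch in range(n_epochs):
+    sampler.set_epoch(epoch)  # re-seed the sampler's shuffling for this epoch
+    for step, data in enumerate(dataloader):
+        ...  # training step as usual
+```
+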
-We then place each model in a different GPU and wrap it with the [`DistributedDataParallel`](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) or [`FullyShardedDataParallel`](https://pytorch.org/docs/stable/fsdp.html) wrapper:
+We then place each model replica on a different GPU and cast it to the desired data type:
```python
device = f"cuda:{local_rank}"
dtype = torch.bfloat16
model = model.to(device=device, dtype=dtype)
+```
-if parallelism == "DDP":
- model = torch.nn.parallel.DistributedDataParallel(
- model, device_ids=[device],)
-elif parallelism == "FSDP":
- model = torch.distributed.fsdp.FullyShardedDataParallel(
- model,
- device_id=self.device,
- sharding_strategy=torch.distributed.fsdp.api.ShardingStrategy.SHARD_GRAD_OP, # define the stage here
- )
+and we then wrap it with the [`DistributedDataParallel`](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) wrapper for the DDP implementation:
+
+```python
+model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])
```
-And that's it. Now you can write your training loop normally and torch will do all under-the-hood communication and synchronization for you:
+or with the [`FullyShardedDataParallel`](https://pytorch.org/docs/stable/fsdp.html) wrapper for the sharded implementation, as:
```python
-criterion = torch.nn.CrossEntropyLoss()
+model = torch.distributed.fsdp.FullyShardedDataParallel(
+ model,
+    device_id=device,  # the GPU assigned to this process, defined above
+    sharding_strategy=torch.distributed.fsdp.ShardingStrategy.SHARD_GRAD_OP,  # pick the sharding stage here
+)
+```
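+
+For reference, the `ShardingStrategy` values map roughly onto the ZeRO stages discussed above. A short sketch of the correspondence:
+
+```python
+from torch.distributed.fsdp import ShardingStrategy
+
+ShardingStrategy.NO_SHARD       # no sharding: equivalent to DDP (ZeRO stage 0)
+ShardingStrategy.SHARD_GRAD_OP  # shards gradients and optimizer states (ZeRO stage 2)
+ShardingStrategy.FULL_SHARD     # also shards the parameters (ZeRO stage 3)
+```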
+
+And that's it. Now you can write your training loop normally and torch will do all the communication and synchronization under the hood:
+```python
for step, data in enumerate(dataloader):
-    inputs = data[0].to(engine.device)
-    labels = data[1].to(engine.device)
-    outputs = engine(inputs) # fwd pass
+    inputs = data[0].to(device)
+    labels = data[1].to(device)
+    outputs = model(inputs) # fwd pass
- loss = criterion(outputs, labels) # loss
+    loss = torch.nn.functional.cross_entropy(outputs, labels) # loss
loss.backward() # compute gradients
optimizer.step() # update parameters
optimizer.zero_grad(set_to_none=True)
@@ -196,24 +189,24 @@ Because gradients are computed and mean-reduced from the top to the bottom layer
> The backward pass iteratively computes gradients (from last to first layer) and collects blocks of gradients to be communicated. These blocks will be mean-reduced asynchronously during the backward pass, while the computation for the backward pass proceeds. Therefore it overlaps backward pass computation with gradients communication. At the end of the backward pass, all GPUs wait for all gradient all-reduces to finish, and then triggers the parameter updates.
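+
+The size of these gradient blocks (*buckets*) can be tuned when constructing the wrapper, via DDP's `bucket_cap_mb` argument. A sketch, using the default value of 25 MB:
+
+```python
+model = torch.nn.parallel.DistributedDataParallel(
+    model, device_ids=[device],
+    bucket_cap_mb=25,  # size (in MB) of the gradient buckets that are all-reduced asynchronously
+)
+```
+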
-As a side note, offloading of tensors can also be achieved via PyTorch by using custom [hooks for autograd saved tensors](https://pytorch.org/tutorials/intermediate/autograd_saved_tensors_hooks_tutorial.html).
+For extra memory savings, offloading of tensors can also be achieved in plain PyTorch by using custom [hooks for autograd saved tensors](https://pytorch.org/tutorials/intermediate/autograd_saved_tensors_hooks_tutorial.html).
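+
+A minimal sketch of what such offloading could look like, using the `torch.autograd.graph.save_on_cpu` context manager, which keeps the tensors saved for the backward pass in (pinned) CPU memory and copies them back to the GPU when the backward pass needs them; this is illustrative and orthogonal to the DDP/FSDP wrappers above:
+
+```python
+with torch.autograd.graph.save_on_cpu(pin_memory=True):
+    outputs = model(inputs)  # activations saved for backward are kept on the CPU
+loss = torch.nn.functional.cross_entropy(outputs, labels)
+loss.backward()              # saved tensors are copied back to the GPU on demand
+```
+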
-Finally, you can launch the application by calling `torchrun`. Torchrun is a network bootstrapper that spaws a python script across compute all compute nodes in a network and set a the environmental variables that allows processes to be uniquely identifiable in the network. The usage is simple:
+Finally, you can launch the application by calling [`torchrun`](https://pytorch.org/docs/stable/elastic/run.html). Torchrun is a network bootstrapper that spawns a python script across all compute nodes in a network and sets the environment variables mentioned previously, which allow processes to be uniquely identified across the network. The usage is simple:
```shell
-$ torch --standalone, --nproc_per_node=4, ./train.py
+$ torchrun --standalone --nproc_per_node=4 ./train.py
```
+
where `nproc_per_node` is the number of GPUs on each node.
## DeepSpeed implementation
-Implementing an existing code in DeepSpeed is pretty simple.
-DeepSpeed features can be activated via the deepspeed API or its [Configuration JSON](https://www.deepspeed.ai/docs/config-json/). The number of possible optimizations is large, as it defines parallelism, floating point precision, logger, communication parameters, etc. Here we start with a simple config, where we configure the DeepSpeed logger to output memory and throughput info at every 10 epochs (`steps_per_print`), we set the batch size to `4` and (optionally) define the settings of the optimizer (`optimizer`) and learning rate scheduler (`scheduler`):
+Porting an existing codebase to DeepSpeed is pretty simple. To start, DeepSpeed features can be activated via the deepspeed API or its [Configuration JSON](https://www.deepspeed.ai/docs/config-json/). The number of possible optimizations is large, as the config defines parallelism, floating point precision, logging, communication parameters, etc. In our implementation, we will start with a simple config file, where we configure the DeepSpeed logger to output memory and throughput info every 10 steps (`steps_per_print`), set the batch size to `256` and (optionally) define the settings of the optimizer (`optimizer`) and learning rate scheduler (`scheduler`):
```json
{
- "train_batch_size": 4,
+ "train_batch_size": 256,
"steps_per_print": 10,
"optimizer": {
"type": "AdamW",
@@ -240,7 +233,7 @@ train_batch_size = micro_batch_size_per_gpu * num_gpus * num_nodes * gradient_ac
}
```
-We can enable [**ZeRO-Infinity**](https://arxiv.org/abs/2104.07857) for offloading of several variables in memory to CPU and VNMe for huge memory savings. It is only compatible with ZeRO-3 and can be enabled with:
+[**ZeRO-Infinity**](https://arxiv.org/abs/2104.07857) offloads several variables from GPU memory to CPU and NVMe memory, providing huge memory savings. It is only compatible with ZeRO-3 and can be enabled with:
```json
{
@@ -252,9 +245,9 @@ We can enable [**ZeRO-Infinity**](https://arxiv.org/abs/2104.07857) for offloadi
}
```
-There's also similar field for [ZeRO-Offload](https://www.deepspeed.ai/tutorials/zero-offload/) for stage 2.
+There's also a similar field for [ZeRO-Offload](https://www.deepspeed.ai/tutorials/zero-offload/), which applies to stages 1 and 2.
-Once we have our config file, it's pretty simple. All boilerplate that PyTorch requires for parallelism and data loaders is managed internally by DeepSpeed. So the setup is pretty straightforward:
+We're almost done. Once we have our config file, the implementation is straightforward. All the boilerplate that PyTorch requires for parallelism and data loaders is managed internally by DeepSpeed, so we only need to set up DeepSpeed as:
```python
def main_deepspeed(n_epochs=100, random_seed=42):
@@ -271,7 +264,7 @@ def main_deepspeed(n_epochs=100, random_seed=42):
config=config, model=model, training_data=train_dataset,) # initialize deepspeed
```
-We then write the training loop, with a structure very similar to the PyTorch implementation. The only exception is that we don't perform zeroing of gradients, as this is managed internally by DeepSpeed. Also, `initialize()` already returns a `train_dataloader` that that assigns disjoint subsets of data to each process, so we dont need to fiddle with samplers.
+We then write the training loop, with a structure very similar to the PyTorch implementation. The only exception is that we don't perform zeroing of the gradients, as this is managed internally by DeepSpeed. Also, `initialize()` already returns a `train_dataloader` that assigns disjoint subsets of data to each process, so we don't need to fiddle with distributed dataloaders and samplers.
```python
for epoch in range(n_epochs):
@@ -285,7 +278,7 @@ We then write the training loop, with a structure very similar to the PyTorch im
engine.step() # update weights, no need for zero-ing
```
-Finally, we can launch our run with the `torchrun` launcher as shown previously, or with the launcher included in DeepSpeed as:
+Finally, we can launch our run with the `torchrun` launcher as before, or with the launcher included in DeepSpeed as:
```shell
$ deepspeed --num_gpus=8 train.py --deepspeed --deepspeed_config ds_config.json