> [!info]
> Source Code to follow along: https://github.com/jhuboo/ddp-pytorch

# Distributed Data Parallel (DDP) in PyTorch

## Introduction

This series explores Distributed Data Parallel in PyTorch. Distributed training lets us use multiple GPUs or machines to cut training time significantly. Note that DDP keeps a full copy of the model on every GPU, so it speeds up training and raises the effective batch size, but it does not by itself make a model fit that is too large for a single GPU.

We’ll start with simple examples on local machines and scale up to multi-GPU and multi-machine setups. By the end, you’ll be able to use DDP to train large language models like GPT efficiently.

## What is Distributed Data Parallel (DDP)?

### High-Level Overview

In a typical non-distributed training setup, a model runs on a single GPU. The model processes input data and computes the loss in the forward pass, computes gradients in the backward pass, and then the optimizer uses those gradients to update the parameters. This process is straightforward but limited to the capacity of one GPU.

DDP scales this process by replicating the model on every GPU and splitting the data across them. Each GPU runs a separate process with its own model replica. These replicas are synchronized so that they remain identical throughout training.

### Key Concepts

- **Model Replicas**: Each GPU has a local copy of the model. All replicas start with identical parameters (DDP broadcasts the parameters from rank 0 when the model is wrapped), and each process builds its own optimizer with the same hyperparameters.
- **Data Parallelism**: Input data is divided among GPUs using a `DistributedSampler`. Each GPU processes a different batch concurrently, which increases the amount of training data processed per step.
- **Synchronization**: Gradients are averaged across processes with a bucketed all-reduce (typically a ring all-reduce under NCCL). Buckets are reduced while the backward pass is still running, so communication overlaps with gradient computation and idle time is minimized.
- **Optimizer Step**: After synchronization, every process applies the same averaged gradients, so all replicas stay in sync.

### DDP Workflow

1. **Initialization**: One process is launched per GPU (with `mp.spawn` or `torchrun`), and each process wraps its model in DDP.
2. **Forward Pass**: Each process performs a forward pass with its input batch.
3. **Backward Pass**: Each process computes gradients.
4. **Gradient Synchronization**: Gradients are averaged across all processes.
5. **Optimizer Step**: Parameters are updated identically on all replicas.

## Single Node Multi-GPU Training with DDP

### Single GPU Training Setup

Here’s a basic `Trainer` class for single GPU training:

```python
import torch
import torch.nn.functional as F


class Trainer:
    def __init__(self, model, train_data, optimizer, device):
        self.model = model.to(device)
        self.train_data = train_data
        self.optimizer = optimizer
        self.device = device

    def compute_loss(self, outputs, targets):
        # Assumption: a classification task; swap in the loss your task needs.
        return F.cross_entropy(outputs, targets)

    def _run_epoch(self, epoch):
        for batch in self.train_data:
            inputs, targets = batch
            inputs, targets = inputs.to(self.device), targets.to(self.device)
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.compute_loss(outputs, targets)
            loss.backward()
            self.optimizer.step()

    def train(self, epochs):
        for epoch in range(epochs):
            self._run_epoch(epoch)
```

### Migrating to Multi-GPU with DDP

To migrate to DDP, we need to make several modifications:

1. **Import Necessary Modules**:

   ```python
   import os

   import torch
   import torch.distributed as dist
   import torch.multiprocessing as mp
   from torch.nn.parallel import DistributedDataParallel as DDP
   from torch.utils.data import DataLoader
   from torch.utils.data.distributed import DistributedSampler
   ```

2. **Initialize the Process Group**:

   ```python
   def setup(rank, world_size):
       # Address and port of the rank 0 process, used for rendezvous.
       os.environ['MASTER_ADDR'] = 'localhost'
       os.environ['MASTER_PORT'] = '12355'
       dist.init_process_group("nccl", rank=rank, world_size=world_size)
       # Bind this process to its own GPU so NCCL does not default to GPU 0.
       torch.cuda.set_device(rank)
   ```

3. **Modify the Trainer Class**:

   ```python
   class Trainer:
       def __init__(self, model, train_data, optimizer, device_ids):
           # Wrapping in DDP broadcasts rank 0's parameters and hooks gradient sync.
           self.model = DDP(model.to(device_ids[0]), device_ids=device_ids)
           self.train_data = train_data
           self.optimizer = optimizer
           self.device_ids = device_ids

       def _run_epoch(self, epoch):
           # Reseed the DistributedSampler so each epoch uses a different shuffle.
           self.train_data.sampler.set_epoch(epoch)
           for batch in self.train_data:
               inputs, targets = batch
               inputs, targets = inputs.to(self.device_ids[0]), targets.to(self.device_ids[0])
               self.optimizer.zero_grad()
               outputs = self.model(inputs)
               loss = self.compute_loss(outputs, targets)  # same compute_loss as before
               loss.backward()  # gradients are all-reduced during this call
               self.optimizer.step()

       def train(self, epochs):
           for epoch in range(epochs):
               self._run_epoch(epoch)
   ```

4. **Adjust DataLoader with DistributedSampler**:

   ```python
   train_sampler = DistributedSampler(train_dataset)
   # Leave shuffle off in the DataLoader; the sampler handles shuffling and sharding.
   train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, sampler=train_sampler)
   ```

5. **Main Function for DDP Setup**:

   ```python
   def cleanup():
       dist.destroy_process_group()

   def main(rank, world_size, epochs):
       setup(rank, world_size)
       model = Net()                 # Net: your model class (not shown)
       train_data = DataLoader(...)  # built with a DistributedSampler, as in step 4
       optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
       trainer = Trainer(model, train_data, optimizer, [rank])
       trainer.train(epochs)
       cleanup()

   if __name__ == "__main__":
       world_size = torch.cuda.device_count()
       epochs = 10  # example value (assumption)
       # mp.spawn passes the process index (the rank) as the first argument to main.
       mp.spawn(main, args=(world_size, epochs), nprocs=world_size, join=True)
   ```

## Fault-Tolerant Distributed Training with `torchrun`

Fault tolerance is crucial for large-scale training. `torchrun` supervises the worker processes and restarts them when one fails, but it does not save any training state itself. The training script is responsible for saving snapshots at regular intervals and for loading the latest snapshot on startup, so that a restarted job resumes where it left off instead of starting over.

### Modifying Code for `torchrun`

1. **Remove Explicit Environment Variable Setup**:

   ```python
   def setup():
       # torchrun sets RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT for each process.
       dist.init_process_group("nccl")
   ```

2. **Use `torchrun` Environment Variables**:

   ```python
   local_rank = int(os.environ["LOCAL_RANK"])  # index of this process on its node
   device = torch.device(f'cuda:{local_rank}')
   torch.cuda.set_device(device)
   ```

3. **Snapshot Saving and Loading**: Call `save_snapshot()` from a single process (typically global rank 0) every few epochs, and start the training loop from `self.epoch` after loading.

   ```python
   class Trainer:
       def __init__(self, model, train_data, optimizer, device_ids):
           ...  # same as before
           self.epoch = 0  # last completed epoch; overwritten by load_snapshot()

       def save_snapshot(self):
           # self.model is DDP-wrapped; .module is the underlying model.
           snapshot = {'model_state': self.model.module.state_dict(), 'epoch': self.epoch}
           torch.save(snapshot, 'snapshot.pt')

       def load_snapshot(self):
           # Load onto this process's GPU rather than the device the snapshot was saved from.
           snapshot = torch.load('snapshot.pt', map_location=f'cuda:{self.device_ids[0]}')
           self.model.module.load_state_dict(snapshot['model_state'])
           self.epoch = snapshot['epoch']
   ```

4. **Update Main Function**:

   ```python
   def main(epochs):
       setup()
       local_rank = int(os.environ["LOCAL_RANK"])
       model = Net()
       train_data = DataLoader(...)
       optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
       trainer = Trainer(model, train_data, optimizer, [local_rank])
       # Resume from the latest snapshot if a previous run left one behind.
       if os.path.exists('snapshot.pt'):
           trainer.load_snapshot()
       trainer.train(epochs)
       cleanup()
   ```

Run the training script with `torchrun`:

```sh
torchrun --standalone --nproc_per_node=4 train.py
```

## Multi-Node Training

Multi-node training distributes the training job across multiple machines. The setup is similar to multi-GPU training but requires additional configuration for inter-node communication.

### Multi-Node Setup with `torchrun`

1. **Global Rank and Node Rank**:

   ```python
   global_rank = int(os.environ["RANK"])      # rank of this process across all nodes
   node_rank = int(os.environ["GROUP_RANK"])  # rank of this node; torchrun exports it as GROUP_RANK
   world_size = int(os.environ["WORLD_SIZE"])  # total number of processes
   ```

2. **Initialize Process Group**:

   ```python
   dist.init_process_group(backend="nccl", init_method="env://")
   ```

3. **Launching Multi-Node Training**:

   ```sh
   torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=node0:29500 train.py
   ```

   Repeat the command on each node, adjusting `--node_rank` accordingly.

### Using Slurm for Multi-Node Training

Slurm is a workload manager that simplifies multi-node training jobs.

1. **Slurm Script**: Request one task per node, because each `torchrun` launcher already starts `--nproc_per_node` workers itself.

   ```sh
   #!/bin/bash
   #SBATCH --nodes=2
   #SBATCH --ntasks-per-node=1
   #SBATCH --cpus-per-task=4
   #SBATCH --gpus-per-node=4

   # The node list may be a compressed range, so resolve the first hostname for rendezvous.
   head_node=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)

   srun torchrun --nnodes=2 --nproc_per_node=4 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="$head_node:29500" train.py
   ```

2. **Submit Job**:

   ```sh
   sbatch slurm_script.sh
   ```

## Training a GPT Model with DDP

### Model and Training Setup

We use Andrej Karpathy’s minGPT repository as the base. The project is organized with separate files for the model and trainer. We modify the code to integrate DDP.

1. **Project Structure**:

   ```
   - model.py
   - trainer.py
   - char_dataset.py
   - main.py
   - config.yaml
   ```

2. **Model Implementation (`model.py`)**:

   ```python
   import torch.nn as nn


   class GPT(nn.Module):
       def __init__(self, config):
           super().__init__()
           # Placeholder for the transformer stack; its arguments are elided here.
           self.transformer = nn.Transformer(...)
           self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

       def forward(self, x):
           return self.lm_head(self.transformer(x))
   ```

3. **Trainer Implementation (`trainer.py`)**:

   ```python
   import os
   import torch
   import torch.nn.functional as F
   from torch.nn.parallel import DistributedDataParallel as DDP


   class Trainer:
       def __init__(self, model, optimizer, train_data, test_data, config):
           self.local_rank = int(os.environ["LOCAL_RANK"])  # set by torchrun
           self.model = DDP(model.to(self.local_rank), device_ids=[self.local_rank])
           self.optimizer = optimizer
           self.train_data = train_data
           self.test_data = test_data
           self.config = config

       def train(self):
           for epoch in range(self.config.epochs):
               self._train_epoch(epoch)
               self._evaluate()

       def compute_loss(self, outputs, targets):
           # Assumption: next-token cross-entropy over the vocabulary.
           return F.cross_entropy(outputs.view(-1, outputs.size(-1)), targets.view(-1))

       def _train_epoch(self, epoch):
           for inputs, targets in self.train_data:
               inputs, targets = inputs.to(self.local_rank), targets.to(self.local_rank)
               self.optimizer.zero_grad()  # clear gradients left over from the previous step
               outputs = self.model(inputs)
               loss = self.compute_loss(outputs, targets)
               loss.backward()
               self.optimizer.step()

       @torch.no_grad()
       def _evaluate(self):
           # Assumption: report the summed loss over this rank's share of the test data.
           total = sum(self.compute_loss(self.model(x.to(self.local_rank)), y.to(self.local_rank)).item()
                       for x, y in self.test_data)
           print(f"[GPU {self.local_rank}] test loss (sum over batches): {total:.4f}")
   ```

4. **Main File (`main.py`)**:

   ```python
   def main():
       setup()  # setup/cleanup as defined in the torchrun section
       config = load_config()  # assumed helper that parses config.yaml (not shown)
       model = GPT(config.model)
       optimizer = torch.optim.Adam(model.parameters(), lr=config.optimizer.lr)
       train_data = DataLoader(...)
       test_data = DataLoader(...)
       trainer = Trainer(model, optimizer, train_data, test_data, config)
       trainer.train()
       cleanup()
   ```

### Running the Training Job

For single node:

```sh
torchrun --standalone --nproc_per_node=4 main.py
```

For multi-node with Slurm:

1. **Slurm Script**:

   ```sh
   #!/bin/bash
   #SBATCH --nodes=2
   #SBATCH --ntasks-per-node=1
   #SBATCH --gpus-per-node=4

   # The node list may be a compressed range, so resolve the first hostname for rendezvous.
   head_node=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)

   srun torchrun --nnodes=2 --nproc_per_node=4 --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint="$head_node:29500" main.py
   ```

2. **Submit Job**:

   ```sh
   sbatch slurm_script.sh
   ```

This concludes the series on Distributed Data Parallel in PyTorch. You should now understand how to set up and run distributed training jobs on a single GPU, on multiple GPUs within one node, and across multiple nodes. You should also be equipped to add snapshot-based fault tolerance with `torchrun` and to train models like GPT using DDP.
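
As a quick sanity check on the mechanics described under Key Concepts, the following pure-Python sketch (no PyTorch or GPUs needed) simulates one data-parallel step for a scalar model `y = w * x`. It is an illustration under stated assumptions, not how DDP is implemented: the helper names `shard_indices`, `local_gradient`, `all_reduce_mean`, and `ddp_step` are hypothetical, the round-robin split only mirrors the strided assignment a `DistributedSampler` makes when shuffling is off, and the all-reduce is replaced by a plain average.

```python
def shard_indices(num_samples, world_size, rank):
    """Round-robin split: rank r gets samples r, r + world_size, r + 2 * world_size, ..."""
    return list(range(rank, num_samples, world_size))


def local_gradient(w, xs, ts):
    """Gradient w.r.t. w of the mean squared error for the scalar model y = w * x."""
    return sum(2.0 * (w * x - t) * x for x, t in zip(xs, ts)) / len(xs)


def all_reduce_mean(values):
    """Stand-in for all-reduce: every rank receives the same averaged value."""
    avg = sum(values) / len(values)
    return [avg] * len(values)


def ddp_step(w, xs, ts, world_size, lr):
    """One simulated step: shard the data, compute local gradients, average, update."""
    local_grads = []
    for rank in range(world_size):
        idx = shard_indices(len(xs), world_size, rank)
        local_grads.append(local_gradient(w, [xs[i] for i in idx], [ts[i] for i in idx]))
    synced = all_reduce_mean(local_grads)
    # Every replica starts from the same w and applies the same gradient.
    return [w - lr * g for g in synced]


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ts = [2.0 * x for x in xs]  # targets generated by the "true" weight 2.0

replicas = ddp_step(0.5, xs, ts, world_size=4, lr=0.01)
single_gpu = 0.5 - 0.01 * local_gradient(0.5, xs, ts)

print(replicas)
print(single_gpu)
```

All four simulated replicas end the step with the same weight, and that weight matches a single-process update over the full batch. The match relies on the shards being the same size; with unequal shards, averaging the per-rank means would weight samples differently from a single large batch.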