Most modern natural-language-processing applications are built on top of pretrained language models, which encode the probabilities of word sequences for entire languages. Over time, these models have trended larger and larger, into the regime of billions or even trillions of parameters.
Training these models within a reasonable amount of time requires very large computing clusters, whose large communication volume can block computation, resulting in low or inefficient GPU utilization. Communication between the GPUs needs to be carefully managed to avoid becoming a performance bottleneck.
Microsoft’s DeepSpeed distributed-training library introduced one such management technique, called the Zero Redundancy Optimizer (ZeRO). ZeRO works by partitioning the state of a machine learning model across distributed workers and fetching the necessary model state from other workers as needed during training. ZeRO has several “stages,” each of which allows the training of larger models by reducing memory requirements, typically at the cost of additional communication volume.
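To make the partition-and-fetch idea concrete, here is a minimal single-process sketch. It is not DeepSpeed's implementation: the function names `partition` and `allgather` are hypothetical, plain Python lists stand in for GPU tensors, and the "workers" are just list entries.

```python
from typing import List


def partition(flat_params: List[float], world_size: int) -> List[List[float]]:
    """Split a flat parameter list into one equal-sized shard per worker.

    The list is zero-padded so its length divides evenly by world_size.
    """
    shard_len = -(-len(flat_params) // world_size)  # ceiling division
    padding = [0.0] * (shard_len * world_size - len(flat_params))
    padded = flat_params + padding
    return [padded[r * shard_len:(r + 1) * shard_len] for r in range(world_size)]


def allgather(shards: List[List[float]], numel: int) -> List[float]:
    """Rebuild the full parameter list from every worker's shard, dropping padding."""
    full = [x for shard in shards for x in shard]
    return full[:numel]


params = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]
shards = partition(params, world_size=4)        # each worker stores only its shard
restored = allgather(shards, numel=len(params))  # gathered on demand, then released
```

Each worker holds roughly 1/N of the parameters between uses, which is where the memory savings come from; the gather step is the extra communication that the rest of this post tries to hide or shrink.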
While Microsoft researchers were able to achieve ideal scaling performance with this technique, they reported experiments only on a specialized hypercluster that uses expensive high-speed InfiniBand networking (specifically, an Nvidia DGX system).
To reduce costs for customers in need of high-performance computing, Amazon Web Services (AWS) uses an Elastic Fabric Adapter (EFA) network instead of InfiniBand. The EFA available on instances of AWS’s p4d.24xlarge computational infrastructure has less communication bandwidth than InfiniBand on the Nvidia DGX hypercluster, so we would expect some performance dropoff for bandwidth-intensive tasks. When we tried to reproduce Microsoft’s results, however, we found that the relative dropoff in ZeRO’s third stage was twice that of the dropoff in the second stage.
We profiled the training process to look for bottlenecks and observed that in ZeRO Stage 3, communication dominated training time. We have made a series of optimizations to ZeRO Stage 3 in order to close the performance gap relative to results obtained on InfiniBand-equipped DGX clusters. Below is a table showing the overall performance improvement conferred by our optimizations, measured when training a RoBERTa language model on AWS p4d.24xlarge instances.
Model | Number of GPUs | Optimized TFLOPS/GPU | Unoptimized TFLOPS/GPU |
RoBERTa-10B | 64 | 123 | 73 |
RoBERTa-50B | 64 | 154 | 89 |
In January, we merged our optimizations into the DeepSpeed code repository for public use.
Optimizations
Our optimizations can roughly be categorized as (1) improving overlap between communication and computation, (2) improving bandwidth utilization, and (3) improving memory efficiency.
Synchronization/Parallelism
Finer-grained synchronization between communication and computation streams
In lower-bandwidth or large clusters where communication times dominate, it is critical to mask communication costs by overlapping computation with communication. Through profiling, we found that this overlapping was limited by ZeRO’s overly coarse synchronization.
This resulted in a suboptimal level of overlapping for two distributed-computing operations: allgather, which aggregates data (in this case, model parameters) from all workers across the network, and reduce-scatter, which reduces data (in this case, summing gradients) across workers and leaves each worker with one shard of the result. These two operations were causing poor GPU utilization because communication was constantly blocking computation operations. In response, we made significant changes to the parameter gathering and gradient reduce-scatter paths to reduce or remove synchronization while maintaining correctness.
After these changes, we were able to achieve much better overlapping and thus far fewer and smaller computation bubbles.
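The general pattern of fine-grained synchronization can be sketched in a few lines. This is an illustration only, not the DeepSpeed code path: a one-thread executor stands in for a communication stream, `fetch_params` and `compute` are hypothetical placeholders, and Python threads do not model real GPU stream behavior. The point is that each layer waits only on the one fetch it needs, while the next fetch is already in flight.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fetch_params(layer: int) -> List[float]:
    """Stand-in for an allgather that returns the full parameters of one layer."""
    return [float(layer + 1)] * 4


def compute(x: float, params: List[float]) -> float:
    """Stand-in for a layer's forward computation."""
    return x + sum(params)


def forward_with_prefetch(x: float, num_layers: int) -> float:
    with ThreadPoolExecutor(max_workers=1) as comm:
        pending = comm.submit(fetch_params, 0)
        for layer in range(num_layers):
            params = pending.result()  # wait only for the fetch this layer needs
            if layer + 1 < num_layers:
                # Start the next fetch before computing, so the two overlap.
                pending = comm.submit(fetch_params, layer + 1)
            x = compute(x, params)
    return x


result = forward_with_prefetch(0.0, num_layers=3)
```

A coarser scheme would wait for all outstanding communication before every compute step, which is the kind of blocking that produces computation bubbles.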
Precomputation/caching of Python fetching and partitioning decisions
During training, many complex decisions need to be made, relating to which parameters should be fetched, which parameters will be used next, which parameters may be reused soon and should be kept, and which can be released. These operations were slow enough to frequently prevent the Python process from keeping GPUs fed with work, creating large computation bubbles.
We optimized this by precomputing or caching as many decisions as possible, speeding up their computation to the point that it has become a nonfactor for training throughput.
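One way to picture this kind of caching, assuming the order of parameter use is the same in every iteration: record the order once, precompute the prefetch decision for every step, and serve later iterations from a dictionary lookup. The class and method names below are hypothetical and are not taken from DeepSpeed.

```python
from typing import Dict, List, Tuple


class FetchPlanner:
    """Records parameter access order once, then answers lookups from a cache."""

    def __init__(self, prefetch_depth: int) -> None:
        self.prefetch_depth = prefetch_depth
        self.trace: List[str] = []
        self.plan: Dict[int, Tuple[str, ...]] = {}
        self.frozen = False

    def record(self, param_name: str) -> None:
        """Called on each parameter use during the first (tracing) iteration."""
        if not self.frozen:
            self.trace.append(param_name)

    def freeze(self) -> None:
        """Precompute, for every step, which parameters to prefetch next."""
        for step in range(len(self.trace)):
            upcoming = self.trace[step + 1:step + 1 + self.prefetch_depth]
            self.plan[step] = tuple(upcoming)
        self.frozen = True

    def to_prefetch(self, step: int) -> Tuple[str, ...]:
        """Constant-time lookup used in every later iteration."""
        return self.plan[step]


planner = FetchPlanner(prefetch_depth=2)
for name in ["embed", "layer0", "layer1", "head"]:
    planner.record(name)
planner.freeze()
```

After `freeze()`, the per-step cost on the Python side is a single dictionary access, which is the property that keeps the host from starving the GPUs.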
Communication/bandwidth use
Batching allgather/reduce-scatter calls
We found that batching the collective operations — allgather and reduce-scatter — uses bandwidth more efficiently and amortizes the fixed costs of running the computational kernels that execute the operations. To implement collective batching, we flatten tensor data into a single, contiguous buffer to be sent in a single transaction. Each collective requires a special interleaving scheme to ensure that each worker receives the correct data.
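The interleaving issue for a batched allgather can be shown with a small single-process simulation. It is a sketch under simplifying assumptions, not the DeepSpeed implementation: lists stand in for tensors, every rank is assumed to hold equal-length shards of each tensor, and `batched_allgather` is a hypothetical name. After the single collective, the shards of any one tensor are spread across the gathered buffer at a fixed stride and have to be stitched back together.

```python
from typing import List


def batched_allgather(per_rank_shards: List[List[List[float]]]) -> List[List[float]]:
    """Simulate one batched allgather and return the reassembled full tensors.

    per_rank_shards[r][t] is rank r's shard of tensor t.
    """
    # Each rank flattens its shards of all tensors into one contiguous buffer.
    send_buffers = [[x for shard in shards for x in shard]
                    for shards in per_rank_shards]
    # One collective: every rank receives the rank-ordered concatenation.
    gathered = [x for buf in send_buffers for x in buf]

    shard_lens = [len(shard) for shard in per_rank_shards[0]]
    rank_stride = sum(shard_lens)  # elements contributed by each rank
    tensors = []
    offset = 0
    for shard_len in shard_lens:
        full: List[float] = []
        for rank in range(len(per_rank_shards)):
            start = rank * rank_stride + offset
            full.extend(gathered[start:start + shard_len])
        tensors.append(full)
        offset += shard_len
    return tensors


# Two ranks, two tensors: A = [1, 2, 3, 4] and B = [10, 20].
rank0 = [[1.0, 2.0], [10.0]]
rank1 = [[3.0, 4.0], [20.0]]
tensors = batched_allgather([rank0, rank1])
```

Here the gathered buffer is laid out as A0, B0, A1, B1, so rebuilding A means skipping over B's shard from rank 0; that bookkeeping is the interleaving scheme in miniature.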
Memory
Our implementation of ZeRO, like the Microsoft implementation, uses the Compute Unified Device Architecture (CUDA), Nvidia’s parallel-computing platform. CUDA memory allocations are both synchronous and slow (ignoring the stream-ordered alternatives cudaMallocAsync and cudaFreeAsync, which are not yet used in PyTorch), so PyTorch uses a caching allocator to avoid the large costs of constantly reallocating memory. If there are no cached or free blocks for an allocation request, the allocator will flush its cache. This is disastrous for a few reasons:
- Before the flush can begin, several cudaEventSynchronize calls are necessary to allow computation on held memory to complete. This and the subsequent cudaFree calls can take multiple seconds.
- Different workers are not guaranteed to flush their caches simultaneously. This means that for any collective, if even a single worker is currently flushing its cache, the other N-1 workers sit blocked waiting for that worker to join. As cluster size increases, so does the probability that at least one worker is flushing its cache for any given collective.
- After the cache flush, subsequent allocations require cudaMalloc calls, which as mentioned earlier are both synchronous and slow.
For these reasons, memory efficiency is critical for performance.
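The scaling argument in the second point can be put in numbers with a simplified model. Assume, purely for illustration, that each worker independently has the same probability `p_flush` of being mid-flush when a collective starts; both the independence assumption and the parameter are hypothetical, not measurements from our experiments. The chance that at least one of N workers stalls the collective is then 1 - (1 - p_flush)^N.

```python
def prob_any_flush(p_flush: float, num_workers: int) -> float:
    """Probability that at least one worker is flushing, assuming independence."""
    return 1.0 - (1.0 - p_flush) ** num_workers


# Same per-worker probability, growing cluster size.
small_cluster = prob_any_flush(0.01, 8)
large_cluster = prob_any_flush(0.01, 64)
```

Under this model the stall probability rises monotonically with cluster size, so even rare flushes on individual workers become a routine cost at scale.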
Memory-efficient batched PyTorch collectives
Although our use of batched collectives significantly reduced kernel launch overhead and improved bandwidth utilization, it also increased memory consumption because of its flattening of batched tensors into an additional buffer.
To avoid redundant flatten operations in PyTorch collectives, we used the *_base variants of the collective operations, which accept pre-flattened tensors, avoiding the need to internally allocate additional flattened buffers. In future work, we plan to use group-based batching operations from the Nvidia Collective Communications Library (NCCL) to eliminate all flattening operations.
More aggressive initialization-time defragmentation of parameter partitions
Even with more than 10 GB of free GPU memory, we continued to see evidence of allocator cache flushes, suggesting memory fragmentation. To reduce fragmentation, we made initialization-time defragmentation changes to move all persisted tensors into a single contiguous buffer.
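The idea of moving separately allocated tensors into one contiguous buffer can be sketched with the standard library, using `array` and `memoryview` as stand-ins for GPU storage and tensor views. This is an analogy rather than the DeepSpeed code, and `defragment` is a hypothetical name.

```python
from array import array
from typing import Dict, List, Tuple


def defragment(tensors: Dict[str, List[float]]) -> Tuple[array, Dict[str, memoryview]]:
    """Copy every tensor into one contiguous buffer and return views into it."""
    total = sum(len(values) for values in tensors.values())
    buffer = array("f", [0.0] * total)  # single allocation for all tensors
    whole = memoryview(buffer)
    views: Dict[str, memoryview] = {}
    offset = 0
    for name, values in tensors.items():
        view = whole[offset:offset + len(values)]
        view[:] = array("f", values)  # copy the data into its slot
        views[name] = view            # the view aliases the shared buffer
        offset += len(values)
    return buffer, views


buffer, views = defragment({"bias": [1.0, 2.0], "weight": [3.0, 4.0, 5.0]})
views["weight"][0] = 9.0  # writes through to the shared buffer
```

Because every view aliases the same allocation, long-lived tensors occupy one block instead of many scattered ones, leaving larger contiguous regions free for transient allocations.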
Miscellaneous
In addition to the optimizations described above, we also
- optimized gradient normalization by reducing host-device data movement and synchronization and pulling math operations out of a for-loop into a single kernel launch with parallelized computation; and
- removed tensor operations (.norm()) that were being added to debug messages via string formatting. (These were causing copies from device to host, which meant data movement and host-device synchronization.)
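The debug-message problem is easy to reintroduce, because arguments to a logging call are evaluated even when the message is never emitted. We removed the operations outright; a common alternative, sketched below with Python's standard `logging` module, is to guard the expensive value behind a level check. The function `expensive_norm` is a hypothetical stand-in for a tensor `.norm()` call, and the counter exists only to show when it runs.

```python
import logging
import math
from typing import List

logger = logging.getLogger("zero_sketch")
norm_calls = 0


def expensive_norm(values: List[float]) -> float:
    """Stand-in for a device-side norm that would force a host-device sync."""
    global norm_calls
    norm_calls += 1
    return math.sqrt(sum(v * v for v in values))


def log_gradient(values: List[float]) -> None:
    # Guard first: the norm is computed only when DEBUG output is enabled.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("grad norm = %f", expensive_norm(values))


logger.setLevel(logging.INFO)
log_gradient([3.0, 4.0])
calls_at_info = norm_calls   # norm was skipped

logger.setLevel(logging.DEBUG)
log_gradient([3.0, 4.0])
calls_at_debug = norm_calls  # norm ran once
```

With the guard in place, production runs at INFO level never pay for the computation or the synchronization it implies.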
By making DeepSpeed ZeRO Stage 3 performant on widely available public cloud offerings, we hope to further democratize the training of large language models.
Acknowledgments: Zhen Zhang, Stephen Rawls, Yida Wang