mirror of
https://github.com/hpcaitech/ColossalAI.git
synced 2025-11-03 15:48:06 +00:00
[doc] migrate the markdown files (#2652)
This commit is contained in:
124
docs/source/en/advanced_tutorials/add_your_parallel.md
Normal file
124
docs/source/en/advanced_tutorials/add_your_parallel.md
Normal file
@@ -0,0 +1,124 @@
|
||||
# Add Your Own Parallel Mode
|
||||
|
||||
Author: Shenggui Li, Yongbin Li
|
||||
|
||||
**Prerequisite:**
|
||||
- [Define Your Configuration](../basics/define_your_config.md)
|
||||
- [Configure Parallelization](../basics/configure_parallelization.md)
|
||||
|
||||
## Introduction
|
||||
|
||||
To enable researchers and engineers to extend our system to other novel large-scale distributed training algorithm
|
||||
with less effort, we have decoupled various components in the training lifecycle. You can implement your own
|
||||
parallelism by simply inheriting from the base class.
|
||||
|
||||
The main components are:
|
||||
|
||||
1. `ProcessGroupInitializer`
|
||||
2. `GradientHandler`
|
||||
3. `Schedule`
|
||||
|
||||
**This currently requires some code to the source code, thus we recommend that you install from source with the `-e` flag.
|
||||
`-e` flag makes the installation editable, thus, your code change will be reflected in your Python runtime.
|
||||
We will work on this to avoid change to source code in future releases.**
|
||||
|
||||
|
||||
## Process Group Initializer
|
||||
|
||||
Parallelism is often managed by process groups where processes involved in the same parallel algorithm are placed in the same
|
||||
process group. For different parallel algorithms, different process groups need to be created. Colossal-AI provides a
|
||||
global context for users to easily manage their process groups. If you wish to add new process group, you can easily
|
||||
define a new class and set it in your configuration file. To define your own way of creating process groups, you can
|
||||
follow the steps below to create a new distributed initialization.
|
||||
|
||||
1. Add your parallel mode in `colossalai.context.parallel_mode.ParallelMode`.
|
||||
```python
|
||||
class ParallelMode(Enum):
|
||||
GLOBAL = 'global'
|
||||
DATA = 'data'
|
||||
PIPELINE = 'pipe'
|
||||
...
|
||||
|
||||
NEW_MODE = 'new_mode' # define your mode here
|
||||
```
|
||||
|
||||
2. Create a `ProcessGroupInitializer`. You can refer to examples given in `colossalai.context.dist_group_initializer`. The
|
||||
first six arguments are fixed. `ParallelContext` will pass in these arguments for you. If you need to set other
|
||||
arguments, you can add it behind like the `arg1, arg2` in the example below. Lastly, register your initializer to the
|
||||
registry by adding the decorator `@DIST_GROUP_INITIALIZER.register_module`.
|
||||
```python
|
||||
# sample initializer class
|
||||
@DIST_GROUP_INITIALIZER.register_module
|
||||
class MyParallelInitializer(ProcessGroupInitializer):
|
||||
|
||||
def __init__(self,
|
||||
rank: int,
|
||||
world_size: int,
|
||||
config: Config,
|
||||
data_parallel_size: int,
|
||||
pipeline_parlalel_size: int,
|
||||
tensor_parallel_size: int,
|
||||
arg1,
|
||||
arg2):
|
||||
super().__init__(rank, world_size, config)
|
||||
self.arg1 = arg1
|
||||
self.arg2 = arg2
|
||||
# ... your variable init
|
||||
|
||||
def init_parallel_groups(self):
|
||||
# initialize your process groups
|
||||
pass
|
||||
|
||||
```
|
||||
|
||||
Then, you can insert your new initializer to the current mode-to-initialize mapping
|
||||
in `colossalai.constants.INITIALIZER_MAPPING`. You can modify the file or insert new key-value pair dynamically.
|
||||
|
||||
```python
|
||||
colossalai.constants.INITIALIZER_MAPPING['new_mode'] = 'MyParallelInitializer'
|
||||
```
|
||||
|
||||
3. Set your initializer in your config file. You can pass in your own arguments if there is any. This allows
|
||||
the `ParallelContext` to create your initializer and initialize your desired process groups.
|
||||
|
||||
```python
|
||||
parallel = dict(
|
||||
pipeline=dict(size=1),
|
||||
tensor=dict(size=x, mode='new_mode') # this is where you enable your new parallel mode
|
||||
)
|
||||
```
|
||||
|
||||
## Gradient Handler
|
||||
|
||||
Gradient handlers are objects which execute the all-reduce operations on parameters' gradients. As different all-reduce
|
||||
strategies may be executed for different kinds of parallelism, users can
|
||||
inherit `colossalai.engine.gradient_handler.BaseGradientHandler` to implement their strategies. Currently, the library
|
||||
uses the normal data parallel gradient handler which all-reduces the gradients across data parallel ranks. The data
|
||||
parallel gradient handler is added to the engine automatically if data parallel is detected. You can add your own
|
||||
gradient handler like below:
|
||||
|
||||
```python
|
||||
from colossalai.registry import GRADIENT_HANDLER
|
||||
from colossalai.engine import BaseGradientHandler
|
||||
|
||||
@GRADIENT_HANDLER.register_module
|
||||
class YourGradientHandler(BaseGradientHandler):
|
||||
|
||||
def handle_gradient(self):
|
||||
do_something()
|
||||
|
||||
```
|
||||
|
||||
Afterwards, you can specify the gradient handler you want to use in your configuration file.
|
||||
|
||||
```python
|
||||
gradient_handlers = [
|
||||
dict(type='YourGradientHandler'),
|
||||
]
|
||||
```
|
||||
|
||||
## Schedule
|
||||
|
||||
Schedule entails how to execute a forward and backward pass. Currently, Colossal-AI provides pipeline and non-pipeline
|
||||
schedules. If you want to modify how the forward and backward passes are executed, you can
|
||||
inherit `colossalai.engine.schedule.BaseSchedule` and implement the `forward_back_step` function.
|
||||
@@ -0,0 +1,36 @@
|
||||
# Define your own parallel model
|
||||
|
||||
Author: Zhengda Bian, Yongbin Li
|
||||
|
||||
> ⚠️ We are working on this documentation to make it more detailed. We will introduce the mechanism of different parallelism
|
||||
> and how to use them to write a model.
|
||||
|
||||
Let's say that you have a huge MLP model with billions of parameters and its extremely large hidden layer size makes it
|
||||
impossible to fit into a single GPU directly. Don't worry, Colossal-AI is here to help you sort things out. With the help of Colossal-AI,
|
||||
you can write your model in the familiar way in which you used to write models for a single GPU, while Colossal-AI automatically
|
||||
splits your model weights and fit them perfectly into a set of GPUs. We give a simple example showing how to write a simple
|
||||
2D parallel model in the Colossal-AI context.
|
||||
|
||||
## Write a simple 2D parallel model
|
||||
|
||||
```python
|
||||
from colossalai.nn import Linear2D
|
||||
import torch.nn as nn
|
||||
|
||||
class MLP_2D(nn.Module):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.linear_1 = Linear2D(in_features=1024, out_features=16384)
|
||||
self.linear_2 = Linear2D(in_features=16384, out_features=1024)
|
||||
|
||||
def forward(self, x):
|
||||
x = self.linear_1(x)
|
||||
x = self.linear_2(x)
|
||||
return x
|
||||
```
|
||||
|
||||
## Use pre-defined model
|
||||
|
||||
For the sake of your convenience, we kindly provide you in our Model Zoo with some prevalent models such as *BERT*, *ViT*, *MoE*,
|
||||
and *GPT*. Feel free to customize them into different sizes to fit into your special needs.
|
||||
@@ -0,0 +1,139 @@
|
||||
# Integrate Mixture-of-Experts Into Your Model
|
||||
|
||||
Author: Haichen Huang
|
||||
|
||||
**Example Code**
|
||||
- [ColossalAI-Examples WideNet](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/widenet)
|
||||
|
||||
**Related Paper**
|
||||
- [Switch Transformers: Scaling to Trillion Parameter Models with Simple and Efficient Sparsity](https://arxiv.org/abs/2101.03961)
|
||||
- [Go Wider Instead of Deeper](https://arxiv.org/abs/2107.11817)
|
||||
|
||||
|
||||
## Introduction
|
||||
|
||||
Since the advent of Switch Transformer, the AI community has found Mixture of Experts (MoE) a useful technique to enlarge the capacity of deep learning models.
|
||||
|
||||
Colossal-AI provides an early access version of parallelism specifically designed for MoE models.
|
||||
The most prominent advantage of MoE in Colossal-AI is convenience.
|
||||
We aim to help our users to easily combine MoE with model parallelism and data parallelism.
|
||||
|
||||
However, the current implementation has two main drawbacks now.
|
||||
The first drawback is its poor efficiency in large batch size and long sequence length training.
|
||||
The second drawback is incompatibility with tensor parallelism.
|
||||
We are working on system optimization to overcome the training efficiency problem.
|
||||
The compatibility problem with tensor parallelism requires more adaptation, and we will tackle this issue in the future.
|
||||
|
||||
Here, we will introduce how to use MoE with model parallelism and data parallelism.
|
||||
|
||||
## Table of Content
|
||||
In this tutorial we will cover:
|
||||
1. Set up MoE running environment
|
||||
2. Create MoE layer
|
||||
3. Train your model
|
||||
|
||||
We provided the [example code](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/widenet) for this tutorial in [ColossalAI-Examples](https://github.com/hpcaitech/ColossalAI-Examples).
|
||||
This example uses [WideNet](https://arxiv.org/abs/2107.11817) as an example of MoE-based model.
|
||||
|
||||
|
||||
## Set up MoE running environment
|
||||
In your project folder, create a `config.py`.
|
||||
|
||||
This file is to specify some features you may want to use to train your model.
|
||||
In order to enable MoE, you need to add a dict called parallel and specify the value of key moe.
|
||||
You can assign a value for the key size of moe, which represents the model parallel size of experts (i.e. the number of experts in one group to parallelize training).
|
||||
|
||||
For example, if the size is 4, 4 processes will be assigned to 4 consecutive GPUs and these 4 processes form a moe model parallel group.
|
||||
Each process on the 4 GPUs will only get a portion of experts. Increasing the model parallel size will reduce communication cost, but increase computation cost in each GPU and activation cost in memory.
|
||||
The total data parallel size is auto-detected and set as the number of GPUs by default.
|
||||
|
||||
```python
|
||||
MOE_MODEL_PARALLEL_SIZE = ...
|
||||
parallel = dict(
|
||||
moe=dict(size=MOE_MODEL_PARALLEL_SIZE)
|
||||
)
|
||||
```
|
||||
|
||||
If `MOE_MODEL_PARALLEL_SIZE = E` and set the number of experts as `E` where `E` is a constant number, the process flow of forward pass of a transformer encoder in a model parallel group is shown below.
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://s2.loli.net/2022/01/28/oI59QcxdteKUTks.png"/>
|
||||
<figcaption>MoE Transformer, image source: <a href="https://arxiv.org/abs/2006.16668">GShard</a></figcaption>
|
||||
</figure>
|
||||
|
||||
Since all experts are allocated to all GPUs in a model parallel group and a GPU only owns a portion of experts,
|
||||
original data parallel groups are no longer correct for the parameters of experts during gradient handling in backward pass anymore.
|
||||
So we create a new kind of parallel group called moe data parallel group.
|
||||
The difference among different kinds of parallel group, when the configuration is set as `WORLD_SIZE=4`,
|
||||
`MOE_MODEL_PARALLEL_SIZE=2`, is shown here.
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://s2.loli.net/2022/01/28/Sn8FpmQPKIiBEq2.png"/>
|
||||
<figcaption>MoE process group</figcaption>
|
||||
</figure>
|
||||
|
||||
|
||||
As for gradient handling, we provide MoeGradientHandler to all-reduce every parameter of the model.
|
||||
If you use `colossalai.initialize` function to create your training engine, the MoE gradient handler will be added to your engine automatically.
|
||||
Otherwise, you should take care of gradient by yourself.
|
||||
All parameters of MoE running environment are stored in colossalai.global_variables.moe_env.
|
||||
You can access your configuration parameters to check whether your setup is correct.
|
||||
```python
|
||||
from colossalai.global_variables import moe_env
|
||||
```
|
||||
|
||||
## Create MoE layer
|
||||
You can create a MoE layer from `colossalai.nn.moe`.
|
||||
But before doing that, you should set up random seeds for all processes like this.
|
||||
|
||||
```python
|
||||
from colossalai.context.random import moe_set_seed
|
||||
from model_zoo.moe.models import Widenet
|
||||
|
||||
moe_set_seed(42)
|
||||
model = Widenet(num_experts=4, capacity_factor=1.2)
|
||||
```
|
||||
|
||||
`moe_set_seed` will set different seed for different processes in a moe model parallel group.
|
||||
This helps initialize parameters in experts.
|
||||
Then create an instance of experts and an instance of router.
|
||||
Here is the example in model zoo.
|
||||
|
||||
```python
|
||||
from colossalai.nn.layer.moe import Experts, MoeLayer, Top2Router, NormalNoiseGenerator
|
||||
|
||||
|
||||
noisy_func = NormalNoiseGenerator(num_experts)
|
||||
shared_router = Top2Router(capacity_factor,
|
||||
noisy_func=noisy_func)
|
||||
shared_experts = Experts(expert=VanillaFFN,
|
||||
num_experts=num_experts,
|
||||
**moe_mlp_args(
|
||||
d_model=d_model,
|
||||
d_ff=d_ff,
|
||||
drop_rate=drop_rate
|
||||
))
|
||||
ffn=MoeLayer(dim_model=d_model, num_experts=num_experts,
|
||||
router=shared_router, experts=shared_experts)
|
||||
```
|
||||
|
||||
Inside the initialization of Experts, the local expert number of each GPU will be calculated automatically. You just need to specify the class of each expert and its parameters used in its initialization. As for routers, we have provided top1 router and top2 router. You can find them in colossalai.nn.layer.moe. After creating the instance of experts and router, the only thing initialized in Moelayer is gate module. More definitions of each class can be found in our API document and code.
|
||||
|
||||
|
||||
## Train Your Model
|
||||
Do not to forget to use `colossalai.initialize` function in `colosalai` to add gradient handler for the engine.
|
||||
We handle the back-propagation of MoE models for you.
|
||||
In `colossalai.initialize`, we will automatically create a `MoeGradientHandler` object to process gradients.
|
||||
You can find more information about the handler `MoeGradientHandler` in colossal directory.
|
||||
|
||||
The loss criterion should be wrapped by `Moeloss` to add auxiliary loss of MoE. Example is like this.
|
||||
```python
|
||||
criterion = MoeLoss(
|
||||
aux_weight=0.01,
|
||||
loss_fn=nn.CrossEntropyLoss,
|
||||
label_smoothing=0.1
|
||||
)
|
||||
```
|
||||
|
||||
Finally, just use trainer or engine in `colossalai` to do your training.
|
||||
Otherwise, you should take care of gradient by yourself.
|
||||
88
docs/source/en/advanced_tutorials/meet_gemini.md
Normal file
88
docs/source/en/advanced_tutorials/meet_gemini.md
Normal file
@@ -0,0 +1,88 @@
|
||||
|
||||
# Meet Gemini:The Heterogeneous Memory Manager of Colossal-AI
|
||||
|
||||
Author: [Jiarui Fang](https://github.com/feifeibear), Yang You
|
||||
|
||||
## Brief
|
||||
|
||||
When you only have a few GPUs for large model training tasks, **heterogeneous training** is the most effective approach. By accommodating model data in CPU and GPU and moving the data to the computing device when necessary, it can breakthrough the GPU memory wall by using GPU and CPU memory (composed of CPU DRAM or nvme SSD memory) together at the same time. Moreover, the model scale can be further improved by combining heterogeneous training with the other parallel approaches, such as data parallel, tensor parallel and pipeline parallel . We now describe the design details of **Gemini**, the heterogeneous memory space manager of Colossal-AI. Its idea comes from [PatrickStar](https://arxiv.org/abs/2108.05818), which has been adapted to Colossal-AI.
|
||||
|
||||
## Usage
|
||||
|
||||
At present, Gemini supports compatibility with ZeRO parallel mode, and it is really simple to use Gemini. Set attribute of zero model_config, i.e., tensor_placement_policy='auto'.
|
||||
|
||||
```
|
||||
zero = dict(
|
||||
model_config=dict(
|
||||
tensor_placement_policy='auto',
|
||||
shard_strategy=BucketTensorShardStrategy()
|
||||
),
|
||||
optimizer_config=dict(
|
||||
...)
|
||||
)
|
||||
```
|
||||
|
||||
Note that Gemini and parallel strategies such as tensor parallelism, data parallelism, pipeline parallelism and zero should be decoupled. However, Colossal-AI requires users to use Gemini with ZeRO. Although they are not necessarily coupled, we will improve it in the near future.
|
||||
|
||||
## Concepts
|
||||
|
||||
**OP**(**OP**erator):operation of a neural network layer, such as linear, LayerNorm, etc. The operator can be a forward propagation calculation or a back-propagation calculation.
|
||||
|
||||
Neural networks must manage two types of training data during training.
|
||||
**model data**: consists of parameters, gradients and optimizer states, and its scale is related to the definition of model structure.
|
||||
|
||||
**Non-model data**: mainly composed of the intermediate tensor generated by the operator and the temporary variables of the operator. Non-model data changes dynamically according to the configuration of training tasks, such as batch size. Model data and non-model data compete with each other for GPU memory.
|
||||
|
||||
## Design Details
|
||||
|
||||
|
||||
In some solutions, the [Zero-offload](https://arxiv.org/abs/2101.06840) adopted by DeepSpeed statically divides model data between CPU and GPU memory, and their memory layout is constant for different training configurations. As shown on the left of the figure below, when the GPU memory is insufficient to meet its corresponding model data requirements, the system will crash even if there is still available memory on the CPU at that time. While Colossal-AI can complete the training by moving part of the model data to the CPU.
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/tutorial/gemini/deepspeed_compare.png"/>
|
||||
<figcaption>Comparison of the memory management of Zero-Offload and Gemini</figcaption>
|
||||
</figure>
|
||||
|
||||
|
||||
Colossal-AI designed Gemini, just like two-stars, which manages the memory space of CPU and GPU efficiently. It can make the tensor dynamically distributed in the storage space of CPU-GPU during training, so that the model training can break through the memory wall of GPU. The memory manager consists of two parts: **MemStatsCollector (MSC)** and **StatefuleTensorMgr (STM)**.
|
||||
|
||||
We take advantage of the iterative characteristics of the deep learning network training process. We divide iterations into two stages: warmup and non-warmup. One or several iterative steps at the beginning belong to the warmup stage, and the other iterative steps belong to the non-warmup stage. In the warmup stage, we collect information for the MSC, while in the non-warmup stage, STM gets the information collected by the MSC to move the tensor, so as to minimize the CPU-GPU data movement volume.
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/tutorial/gemini/gemini_workflow.png"/>
|
||||
<figcaption>The workflow of Gemini during warmup and non-warmup phase</figcaption>
|
||||
</figure>
|
||||
|
||||
|
||||
### StatefulTensorMgr
|
||||
|
||||
STM manages the information of all model data tensors. In the process of model construction, Colossal-AI registers all model data tensors with STM. The memory manager marks each tensor with state information. The state set includes three types: HOLD, COMPUTE and FREE. The functions of STM are as follows:
|
||||
|
||||
**Query memory usage:**by traversing the locations of all tensors in heterogeneous space, obtain the memory occupation of CPU and GPU by model data.
|
||||
|
||||
**Transition tensor state:** it marks the tensor as COMPUTE state before each model data tensor participates in the operator calculation, and as HOLD state after calculation. The FREE state marked if the tensor is no longer in use.
|
||||
|
||||
**Adjust tensor position:**tensor manager ensures that the tensor in COMPUTE state is placed on the computing device. If the storage space of the computing device is insufficient, it is necessary to move some tensors in HOLD state to other devices for storage. Tensor eviction strategy requires information from MSC, which will be introduced later.
|
||||
|
||||
|
||||
### MemStatsCollector
|
||||
In the warmup stage, the memory information statistician monitors the memory usage of model data and non-model data in CPU and GPU for reference in the non-warmup stage. We can obtain the memory usage of model data at a certain time by querying STM. However, the memory usage of non-model data is difficult to obtain. Owing to the life cycle of non-model data not being managed by users, the existing deep learning framework does not expose the tracking interface of non-model data to users. MSC obtains the usage of CPU and GPU memory by non-model in the warmup stage through sampling. The specific methods are as follows:
|
||||
|
||||
We trigger the memory sampling operation at the beginning and end of the operator. We call this time point **sampling moment**, and the time between the two sampling moments is called **period**. The calculation process is a black box. Due to the possible allocation of temporary buffer, the memory usage is very complex. However, we can accurately obtain the maximum memory usage of the system during the period. The use of non-model data can be obtained by the maximum memory use of the system between two statistical moments-model memory use.
|
||||
|
||||
How do we design the sampling time. Before we choose model data layout adjust of preOp. As shown in the figure below. We sample the system memory used of the previous period and the model data memory used of the next period. The parallel strategy will cause obstacles to the work of MSC. As shown in the figure, for example, for ZeRO or Tensor Parallel, because gathering model data is required before OP calculation, it will bring additional memory requirements. Therefore, we require to sample the system memory before the model data changes, so that the MSC will capture the model change memory of preOp within a period. For example, in period 2-3, we consider the memory changes brought by tensor gather and shard.
|
||||
|
||||
Although the sampling time can be placed in other locations, such as excluding the new information of the change of the gather buffer, it will cause trouble. There are differences in the implementation of Op in different parallel modes. For example, for Linear Op, gather buffer in Tensor Parallel is allocated in Op. For ZeRO, the allocation of gather buffer is in PreOp. Sampling at the beginning of PreOp helps to unify the two situations.
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/tutorial/gemini/gemini_mem_curve.png"/>
|
||||
<figcaption>workflow</figcaption>
|
||||
</figure>
|
||||
|
||||
### Tensor Eviction Strategy
|
||||
|
||||
The important duty of MSC is to adjust the tensor layout position. For example, at S2 in the figure above, we reduce the model data on the device, and meet the peak memory requirement calculated in period 2-3.
|
||||
|
||||
In the warmup stage, since we haven't finished a complete iteration yet, we don't know actual memory occupation. At this time, we limit the upper bound of memory usage of the model data. For example, only 30% of the GPU memory can be used. This ensures that we can successfully complete the warmup state.
|
||||
|
||||
In the non-warmup stage, we need to use the memory information of non-model data collected in the warm-up stage to reserve the peak memory required by the computing device for the next Period, which requires us to move some model tensors. In order to avoid frequent replacement of the same tensor in and out of the CPU-GPU, causing a phenomenon similar to [cache thrashing](https://en.wikipedia.org/wiki/Thrashing_(computer_science)). Using the iterative characteristics of DNN training, we design the OPT cache swap out strategy. Specifically, in the warmup stage, we record the sampling time required by each tensor computing device. If we need to expel some HOLD tensors, we will choose the latest tensor needed on this device as the victim.
|
||||
81
docs/source/en/advanced_tutorials/opt_service.md
Normal file
81
docs/source/en/advanced_tutorials/opt_service.md
Normal file
@@ -0,0 +1,81 @@
|
||||
# Build an online OPT service using Colossal-AI in 5 minutes
|
||||
|
||||
## Introduction
|
||||
|
||||
This tutorial shows how to build your own service with OPT with the help of [Colossal-AI](https://github.com/hpcaitech/ColossalAI).
|
||||
|
||||
## Colossal-AI Inference Overview
|
||||
Colossal-AI provides an inference subsystem [Energon-AI](https://github.com/hpcaitech/EnergonAI), a serving system built upon Colossal-AI, which has the following characteristics:
|
||||
|
||||
- **Parallelism for Large-scale Models:** With the help of tensor parallel operations, pipeline parallel strategies from Colossal-AI, Colossal-AI inference enables efficient parallel inference for large-scale models.
|
||||
- **Pre-built large models:** There are pre-built implementations for popular models, such as OPT. It supports a caching technique for the generation task and checkpoints loading.
|
||||
- **Engine encapsulation:** There has an abstraction layer called an engine. It encapsulates the single instance multiple devices (SIMD) execution with the remote procedure call, making it act as the single instance single device (SISD) execution.
|
||||
- **An online service system:** Based on FastAPI, users can launch a web service of a distributed inference quickly. The online service makes special optimizations for the generation task. It adopts both left padding and bucket batching techniques to improve efficiency.
|
||||
|
||||
## Basic Usage:
|
||||
|
||||
1. Download OPT model
|
||||
|
||||
To launch the distributed inference service quickly, you can download the OPT-125M from [here](https://huggingface.co/patrickvonplaten/opt_metaseq_125m/blob/main/model/restored.pt). You can get details for loading other sizes of models [here](https://github.com/hpcaitech/EnergonAI/tree/main/examples/opt/script).
|
||||
|
||||
2. Prepare a prebuilt service image
|
||||
|
||||
Pull a docker image from dockerhub installed with Colossal-AI inference.
|
||||
|
||||
```bash
|
||||
docker pull hpcaitech/energon-ai:latest
|
||||
```
|
||||
|
||||
3. Launch an HTTP service
|
||||
|
||||
To launch a service, we need to provide python scripts to describe the model type and related configurations, and settings for the HTTP service.
|
||||
We have provided a set of [examples](https://github.com/hpcaitech/EnergonAI/tree/main/examples]). We will use the [OPT example](https://github.com/hpcaitech/EnergonAI/tree/main/examples/opt) in this tutorial.
|
||||
The entrance of the service is a bash script server.sh.
|
||||
The config of the service is at opt_config.py, which defines the model type, the checkpoint file path, the parallel strategy, and http settings. You can adapt it for your own case.
|
||||
For example, set the model class as opt_125M and set the correct checkpoint path as follows.
|
||||
|
||||
```bash
|
||||
model_class = opt_125M
|
||||
checkpoint = 'your_file_path'
|
||||
```
|
||||
|
||||
Set the tensor parallelism degree the same as your gpu number.
|
||||
|
||||
```bash
|
||||
tp_init_size = #gpu
|
||||
```
|
||||
|
||||
Now, we can launch a service using docker. You can map the path of the checkpoint and directory containing configs to local disk path `/model_checkpoint` and `/config`.
|
||||
|
||||
|
||||
```bash
|
||||
export CHECKPOINT_DIR="your_opt_checkpoint_path"
|
||||
# the ${CONFIG_DIR} must contain a server.sh file as the entry of service
|
||||
export CONFIG_DIR="config_file_path"
|
||||
|
||||
docker run --gpus all --rm -it -p 8020:8020 -v ${CHECKPOINT_DIR}:/model_checkpoint -v ${CONFIG_DIR}:/config --ipc=host energonai:lastest
|
||||
```
|
||||
|
||||
Then open `https://[IP-ADDRESS]:8020/docs#` in your browser to try out!
|
||||
|
||||
|
||||
## Advance Features Usage:
|
||||
|
||||
1. Batching Optimization
|
||||
|
||||
To use our advanced batching technique to collect multiple queries in batches to serve, you can set the executor_max_batch_size as the max batch size. Note, that only the decoder task with the same top_k, top_p and temperature can be batched together.
|
||||
|
||||
```
|
||||
executor_max_batch_size = 16
|
||||
```
|
||||
|
||||
All queries are submitted to a FIFO queue. All consecutive queries whose number of decoding steps is less than or equal to that of the head of the queue can be batched together. Left padding is applied to ensure correctness. executor_max_batch_size should not be too large. This ensures batching won't increase latency. For opt-30b, `executor_max_batch_size=16` may be a good choice, while for opt-175b, `executor_max_batch_size=4` may be better.
|
||||
|
||||
2. Cache Optimization.
|
||||
|
||||
You can cache several recently served query results for each independent serving process. Set the cache_size and cache_list_size in config.py. The cache size is the number of queries cached. The cache_list_size is the number of results stored for each query. And a random cached result will be returned. When the cache is full, LRU is applied to evict cached queries. cache_size=0means no cache is applied.
|
||||
|
||||
```
|
||||
cache_size = 50
|
||||
cache_list_size = 2
|
||||
```
|
||||
@@ -0,0 +1,192 @@
|
||||
# Parallelize Your Training like Megatron-LM via ColoTensor
|
||||
|
||||
Author: [Haichen Huang](https://github.com/1SAA) and [Jiarui Fang](https://github.com/feifeibear)
|
||||
|
||||
**Prerequisite:**
|
||||
- [ColoTensor Concepts](../basics/colotensor_concept.md)
|
||||
|
||||
## Introduction
|
||||
|
||||
Thanks to the convenience given by ColoTensor, users can apply parallelism with the least edition to their serial code.
|
||||
In this tutorial, we will illustrate how to modify the training model to automatically adapt the code to parallel training like Megatron-LM.
|
||||
We take the GPT-2 model offered by HuggingFace as an example and provide a way for you to pre-train the GPT-2 model on a single GPU.
|
||||
|
||||
Megatron-LM provided a profound paradigm to parallelize large transformer language models.
|
||||
However, in order to train large transformer language models at scale, users have to build their models with those modules provided by Megatron.
|
||||
It imposes several difficult jobs on users, such as loading the weights from the pre-trained models and constructing the parallelized models.
|
||||
To mitigate users' trouble, we offer ColoTensor to enable the tensor model parallelism automatically.
|
||||
|
||||
## Definitions of the model and the loss function
|
||||
|
||||
First we use the GPTModel and GPTLoss directly from the HuggingFace library.
|
||||
|
||||
```python
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
from transformers import GPT2Config, GPT2LMHeadModel
|
||||
|
||||
class GPTLMModel(nn.Module):
|
||||
def __init__(self, hidden_size=768, num_layers=12, num_attention_heads=12, max_seq_len=1024, vocab_size=50257, checkpoint=False):
|
||||
super().__init__()
|
||||
self.checkpoint = checkpoint
|
||||
self.model = GPT2LMHeadModel(GPT2Config(n_embd=hidden_size, n_layer=num_layers,
|
||||
n_head=num_attention_heads, n_positions=max_seq_len, n_ctx=max_seq_len, vocab_size=vocab_size))
|
||||
if checkpoint:
|
||||
self.model.gradient_checkpointing_enable()
|
||||
|
||||
def forward(self, input_ids, attention_mask):
|
||||
# Only return lm_logits
|
||||
return self.model(input_ids=input_ids, attention_mask=attention_mask, use_cache=not self.checkpoint)[0]
|
||||
|
||||
|
||||
class GPTLMLoss(nn.Module):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.loss_fn = nn.CrossEntropyLoss()
|
||||
|
||||
def forward(self, logits, labels):
|
||||
shift_logits = logits[..., :-1, :].contiguous()
|
||||
shift_labels = labels[..., 1:].contiguous()
|
||||
# Flatten the tokens
|
||||
return self.loss_fn(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
|
||||
```
|
||||
|
||||
## Brief Review of GPT-2
|
||||
|
||||
Now, we recall the structure of each GPT-2 model.
|
||||
Every GPT-2 model can be represented as a DAG.
|
||||
As shown in the below pictures, each circle represents an operator and each square represents a weight.
|
||||
An arrow indicates the flow of the input data, and the notation alongside the arrow demonstrates the shape of the input data.
|
||||
|
||||
Then, let's take an insight into this GPT-2 model. It consists of three parts.
|
||||
They are the **embedding module**, **transformer layers**, and the **classification head**.
|
||||
|
||||
The embedding module contains two weights, token embedding weight and position embedding weight.
|
||||
After the forward operation of the embedding module, each word in all sequences of the raw input data will be embedded into a hidden state.
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://s2.loli.net/2022/08/17/omfkIEN6ui5jcL3.png"/>
|
||||
<figcaption>The embedding module</figcaption>
|
||||
</figure>
|
||||
|
||||
Each transformer layer contains two blocks. The self-attention operation is called in the first block and a two-layer percepton is located in the second block.
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://s2.loli.net/2022/08/17/LAVzDlpRcj4dYeb.png"/>
|
||||
<figcaption>The transformer layer</figcaption>
|
||||
</figure>
|
||||
|
||||
In the end, the classification head is just a linear module without bias, which only has a weight inside.
|
||||
|
||||
## Applied with ColoTensor
|
||||
|
||||
Two steps make your serial code adapted to Megatron-LM tensor parallel style.
|
||||
1. Initialize the model in the context of ColoInitContext.
|
||||
2. Setting ColoTensorSpec for each parameter.
|
||||
|
||||
### Initialize with ColoInitContext
|
||||
|
||||
We should build the model in the ColoInitContext.
|
||||
In this context, any parameter initialized would be transformed to ColoParameter and moved to the corresponded device automatically.
|
||||
|
||||
```python
|
||||
from colossalai.utils.model.colo_init_context import ColoInitContext
|
||||
|
||||
with ColoInitContext(device=torch.device('cpu')):
|
||||
model = GPTLMModel()
|
||||
```
|
||||
|
||||
### Setting ColoTensorSpec for each parameter
|
||||
|
||||
After the creation of the model, we establish the distributed environment through ProcessGroup.
|
||||
Here, we specify the degree of the tensor parallelism as the same as the number of all GPUs, which means the degree of data parallelism is 1.
|
||||
|
||||
```python
|
||||
import torch.distributed as dist
|
||||
from colossalai.tensor import ProcessGroup
|
||||
|
||||
pg = ProcessGroup(tp_degree=dist.get_world_size())
|
||||
```
|
||||
|
||||
Now, some auxiliary functions are necessary for the next step. We define two functions to split a parameter.
|
||||
Megatron-LM-like tensor parallelism requires splitting a parameter tensor along its first dimension or its last dimension.
|
||||
|
||||
```python
|
||||
from colossalai.tensor import ShardSpec, ComputeSpec, ComputePattern, ColoParameter, ProcessGroup
|
||||
|
||||
def split_param_single_dim_tp1d(dim: int, param: ColoParameter, pg: ProcessGroup):
|
||||
spec = (ShardSpec([dim], [pg.tp_world_size()]), ComputeSpec(ComputePattern.TP1D))
|
||||
if param.process_group.tp_world_size() == 1:
|
||||
param.set_process_group(pg)
|
||||
param.set_tensor_spec(*spec)
|
||||
|
||||
|
||||
def split_param_row_tp1d(param: ColoParameter, pg: ProcessGroup):
|
||||
split_param_single_dim_tp1d(0, param, pg)
|
||||
|
||||
|
||||
def split_param_col_tp1d(param: ColoParameter, pg: ProcessGroup):
|
||||
split_param_single_dim_tp1d(-1, param, pg)
|
||||
```
|
||||
|
||||
Then we adapt the model to the tensor parallelism.
|
||||
According to the tensor parallelism applied in Megatron, it is supposed to shard along the last dimension of tensors, including the weights of token embedding, position embedding, all linear weights and biases in self-attention blocks, the first weight linear and bias in each MLP.
|
||||
And it shards the second linear weight along its first dimension.
|
||||
|
||||
```python
|
||||
for mn, module in model.named_modules():
|
||||
for pn, param in module.named_parameters(recurse=False):
|
||||
# set process group for all parameters
|
||||
param.set_process_group(pg)
|
||||
|
||||
if 'mlp.c_fc' in mn:
|
||||
if 'weight' in pn or 'bias' in pn:
|
||||
split_param_col_tp1d(param, pg) # colmn slice
|
||||
# keep the shape of the output from c_fc
|
||||
param.compute_spec.set_output_replicate(False)
|
||||
elif 'mlp.c_proj' in mn:
|
||||
if 'weight' in pn:
|
||||
split_param_row_tp1d(param, pg) # row slice
|
||||
elif 'wte' in mn or 'wpe' in mn:
|
||||
split_param_col_tp1d(param, pg) # colmn slice
|
||||
elif 'c_attn' in mn or 'c_proj' in mn:
|
||||
split_param_col_tp1d(param, pg) # colmn slice
|
||||
```
|
||||
|
||||
The modified model is illustrated below.
|
||||
|
||||
The embedding module:
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://s2.loli.net/2022/08/17/Yu2xzXEabHV7pwe.png"/>
|
||||
<figcaption>The modified embedding module</figcaption>
|
||||
</figure>
|
||||
|
||||
The transformer layers:
|
||||
|
||||
<figure style={{textAlign: "center"}}>
|
||||
<img src="https://s2.loli.net/2022/08/17/4HWsA2xz51IhPFO.png"/>
|
||||
<figcaption>The modified transformer layer</figcaption>
|
||||
</figure>
|
||||
|
||||
Once users have specified the distributed pattern of each parameter, ColoTensor is capable of inferring the computation patterns of all operators, including matrix multiplication, the linear function, other elementwise functions in torch.nn.functional, etc.
|
||||
In this way, users can train their models as usual.
|
||||
|
||||
In our latest example, a Gemini + ZeRO DDP model is also defined to reduce overhead and improve efficiency.For the details of this part, please refer to [ZeRO](../features/zero_with_chunk.md). You can combine these two parts to understand our entire training process:
|
||||
|
||||
```python
|
||||
def gemini_zero_dpp(model: torch.nn.Module, pg: ProcessGroup, placememt_policy: str = "auto"):
|
||||
from colossalai.nn.parallel import GeminiDDP
|
||||
model = GeminiDDP(model,
|
||||
device=get_current_device(),
|
||||
placement_policy=placememt_policy,
|
||||
pin_memory=True,
|
||||
search_range_mb=32)
|
||||
return model
|
||||
```
|
||||
|
||||
## Pretrain GPT-2 On Single GPU
|
||||
|
||||
The above optimization we made allows us to pretrain the GPT-2 model on a single GPU. We only need to set the parameter `GPUNUM`=1 in `run.sh`, and then we can complete the model training on a single GPU when running the file.
|
||||
|
||||
The GPT-2 example is accessible at [Train GPT with Colossal-AI](https://github.com/hpcaitech/ColossalAI/tree/main/examples/language/gpt).
|
||||
@@ -0,0 +1,270 @@
|
||||
# Train GPT Using Hybrid Parallelism
|
||||
|
||||
Author: Hongxin Liu, Yongbin Li
|
||||
|
||||
**Example Code**
|
||||
- [ColossalAI-Examples GPT2](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/language/gpt_2)
|
||||
- [ColossalAI-Examples GPT3](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/language/gpt_3)
|
||||
|
||||
**Related Paper**
|
||||
- [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Training](https://arxiv.org/abs/2110.14883)
|
||||
- [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473)
|
||||
|
||||
## Introduction
|
||||
|
||||
In the previous tutorial, we introduce how to train ViT with pipeline. In this tutorial, you will learn a more complex scenario -- train GPT with hybrid parallelism. In this case, GPT-3 is so large that CPU memory cannot fit it as well. Therefore, you must split the model by yourself.
|
||||
|
||||
## Table of content
|
||||
|
||||
In this tutorial we will cover:
|
||||
|
||||
1. The definition of GPT model, based on colossalai/model_zoo
|
||||
2. Processing the dataset
|
||||
3. Training GPT using hybrid parallelism
|
||||
|
||||
## Import libraries
|
||||
|
||||
```python
|
||||
import json
|
||||
import os
|
||||
from typing import Callable
|
||||
|
||||
import colossalai
|
||||
import colossalai.utils as utils
|
||||
import model_zoo.gpt.gpt as col_gpt
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
from colossalai import nn as col_nn
|
||||
from colossalai.amp import AMP_TYPE
|
||||
from colossalai.builder.pipeline import partition_uniform
|
||||
from colossalai.context.parallel_mode import ParallelMode
|
||||
from colossalai.core import global_context as gpc
|
||||
from colossalai.engine.schedule import (InterleavedPipelineSchedule,
|
||||
PipelineSchedule)
|
||||
from colossalai.logging import disable_existing_loggers, get_dist_logger
|
||||
from colossalai.nn.layer.wrapper import PipelineSharedModuleWrapper
|
||||
from colossalai.trainer import Trainer, hooks
|
||||
from colossalai.utils.timer import MultiTimer
|
||||
from model_zoo.gpt import GPTLMLoss
|
||||
from torch.nn import functional as F
|
||||
from torch.utils.data import Dataset
|
||||
from transformers import GPT2Tokenizer
|
||||
```
|
||||
|
||||
|
||||
|
||||
## Define GPT model
|
||||
|
||||
In the previous tutorial, we introduced 3 ways to build a pipelined model. But for huge models like GPT-3, you can't even build the model in CPU. In this case, you must split the model by yourself.
|
||||
|
||||
GPT dataloader returns `input_ids` and `attention_mask`, so we use two keyword arguments in `forward()` to get them. Note that for stages except the first stage, the first positional argument of `forward()` is the output tensor from the previous stage. So the `hidden_states` is from the previous stage, and for the first stage it's `None`.
|
||||
|
||||
For GPT, the *word embedding layer* shares the weights with the *output head*. We provide `PipelineSharedModuleWrapper` to share parameters among pipeline stages. It takes a `list` of `int` as argument, which means those ranks share the parameters. You can use `register_module()` or `register_parameter()` to register a module or a parameter as the shared module or parameter. If you have multiple sets of shared modules / parameters, you should have multiple `PipelineSharedModuleWrapper` instance. If the parameter is shared within **one** stage, you should not use `PipelineSharedModuleWrapper`, and just use the same module / parameter instance. In this example, the *word embedding layer* is at the first stage, and the *output head* is at the last stage. Thus, they are shared among ranks `[0, pipeline_size - 1]`.
|
||||
|
||||
For the first stage, it maintains the embedding layer and some transformer blocks. For the last stage, it maintains some transformer blocks and the output head layer. For other stages, they just maintain some transformer blocks. `partition_uniform(num_layers, pipeline_size, num_chunks)` returns the parts of all ranks, and the part is a `tuple` of `(start, end)` (exclude end). `start == 0` means that it's the first stage, and `end == num_layers` means it's the last stage.
|
||||
|
||||
```python
|
||||
class PipelineGPTHybrid(nn.Module):
|
||||
def __init__(self,
|
||||
num_layers: int = 12,
|
||||
hidden_size: int = 768,
|
||||
num_attention_heads: int = 12,
|
||||
vocab_size: int = 50304,
|
||||
embed_drop_rate: float = 0.,
|
||||
act_func: Callable = F.gelu,
|
||||
mlp_ratio: int = 4,
|
||||
attn_drop_rate: float = 0.,
|
||||
drop_rate: float = 0.,
|
||||
dtype: torch.dtype = torch.float,
|
||||
checkpoint: bool = False,
|
||||
max_position_embeddings: int = 1024,
|
||||
layer_norm_epsilon: float = 1e-5,
|
||||
first: bool = False,
|
||||
last: bool = False):
|
||||
super().__init__()
|
||||
self.embedding = None
|
||||
self.norm = None
|
||||
self.head = None
|
||||
if first:
|
||||
self.embedding = col_gpt.GPTEmbedding(
|
||||
hidden_size, vocab_size, max_position_embeddings, dropout=embed_drop_rate, dtype=dtype)
|
||||
self.blocks = nn.ModuleList([
|
||||
col_gpt.GPTBlock(hidden_size, num_attention_heads, mlp_ratio=mlp_ratio, attention_dropout=attn_drop_rate,
|
||||
dropout=drop_rate, dtype=dtype, checkpoint=checkpoint, activation=act_func)
|
||||
for _ in range(num_layers)
|
||||
])
|
||||
if last:
|
||||
self.norm = col_nn.LayerNorm(hidden_size, eps=layer_norm_epsilon)
|
||||
self.head = col_gpt.GPTLMHead(vocab_size=vocab_size,
|
||||
dim=hidden_size,
|
||||
dtype=dtype,
|
||||
bias=False)
|
||||
|
||||
def forward(self, hidden_states=None, input_ids=None, attention_mask=None):
|
||||
if self.embedding is not None:
|
||||
hidden_states = self.embedding(input_ids=input_ids)
|
||||
batch_size = hidden_states.shape[0]
|
||||
attention_mask = attention_mask.view(batch_size, -1)
|
||||
attention_mask = attention_mask[:, None, None, :]
|
||||
attention_mask = attention_mask.to(dtype=hidden_states.dtype) # fp16 compatibility
|
||||
attention_mask = (1.0 - attention_mask) * -10000.0
|
||||
for block in self.blocks:
|
||||
hidden_states, attention_mask = block(hidden_states, attention_mask)
|
||||
if self.norm is not None:
|
||||
hidden_states = self.head(self.norm(hidden_states))
|
||||
return hidden_states
|
||||
|
||||
|
||||
def build_gpt_pipeline(num_layers, num_chunks, device=torch.device('cuda'), **kwargs):
|
||||
logger = get_dist_logger()
|
||||
pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE)
|
||||
pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE)
|
||||
rank = gpc.get_global_rank()
|
||||
wrapper = PipelineSharedModuleWrapper([0, pipeline_size - 1])
|
||||
parts = partition_uniform(num_layers, pipeline_size, num_chunks)[pipeline_rank]
|
||||
models = []
|
||||
for start, end in parts:
|
||||
kwargs['num_layers'] = end - start
|
||||
kwargs['first'] = start == 0
|
||||
kwargs['last'] = end == num_layers
|
||||
logger.info(f'Rank{rank} build layer {start}-{end}, {end-start}/{num_layers} layers')
|
||||
chunk = PipelineGPTHybrid(**kwargs).to(device)
|
||||
if start == 0:
|
||||
wrapper.register_module(chunk.embedding.word_embeddings)
|
||||
elif end == num_layers:
|
||||
wrapper.register_module(chunk.head)
|
||||
models.append(chunk)
|
||||
if len(models) == 1:
|
||||
model = models[0]
|
||||
else:
|
||||
model = nn.ModuleList(models)
|
||||
return model
|
||||
|
||||
|
||||
def GPT2_exlarge_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float):
|
||||
cfg = dict(hidden_size=1600, num_attention_heads=32, checkpoint=checkpoint, dtype=dtype)
|
||||
return build_gpt_pipeline(48, num_chunks, **cfg)
|
||||
|
||||
|
||||
def GPT3_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float):
|
||||
cfg = dict(hidden_size=12288, num_attention_heads=96,
|
||||
checkpoint=checkpoint, max_position_embeddings=2048, dtype=dtype)
|
||||
return build_gpt_pipeline(96, num_chunks, **cfg)
|
||||
```
|
||||
|
||||
## Process the dataset
|
||||
|
||||
We provide a small GPT web-text dataset here. The original format is loose JSON, and we will save the processed dataset.
|
||||
|
||||
```python
|
||||
class WebtextDataset(Dataset):
|
||||
def __init__(self, path, seq_len=1024) -> None:
|
||||
super().__init__()
|
||||
root = os.path.dirname(path)
|
||||
encoded_data_cache_path = os.path.join(root, f'gpt_webtext_{seq_len}.pt')
|
||||
if os.path.isfile(encoded_data_cache_path):
|
||||
seq_len_, data, attention_mask = torch.load(
|
||||
encoded_data_cache_path)
|
||||
if seq_len_ == seq_len:
|
||||
self.data = data
|
||||
self.attention_mask = attention_mask
|
||||
return
|
||||
raw_data = []
|
||||
with open(path) as f:
|
||||
for line in f.readlines():
|
||||
raw_data.append(json.loads(line)['text'])
|
||||
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
|
||||
tokenizer.pad_token = tokenizer.unk_token
|
||||
encoded_data = tokenizer(
|
||||
raw_data, padding=True, truncation=True, max_length=seq_len, return_tensors='pt')
|
||||
self.data = encoded_data['input_ids']
|
||||
self.attention_mask = encoded_data['attention_mask']
|
||||
torch.save((seq_len, self.data, self.attention_mask),
|
||||
encoded_data_cache_path)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.data)
|
||||
|
||||
def __getitem__(self, index):
|
||||
return {
|
||||
'input_ids': self.data[index],
|
||||
'attention_mask': self.attention_mask[index]
|
||||
}, self.data[index]
|
||||
```
|
||||
|
||||
## Training GPT using hybrid parallelism
|
||||
|
||||
In the previous tutorial, we explained the meanings of some pipeline arguments. In this case, we can determine the shape of each output tensor which is exchanged among pipeline stages. For GPT, the shape is `(MICRO BATCH SIZE, SEQUENCE LEN, HIDDEN SIZE)`. By setting this, we can avoid exchanging the tensor shape of each stage. When you are not sure of the tensor shape, you can just leave it `None`, and the shape is inferred automatically. Make sure that the `dtype` of your model is correct. When you use `fp16`, the `dtype` of your model must be `torch.half`. Otherwise, the `dtype` must be `torch.float`. For pipeline parallelism, only `AMP_TYPE.NAIVE` is supported.
|
||||
|
||||
You can easily use tensor parallel by setting `parallel` in `CONFIG`. The data parallelism size is automatically set based on the number of GPUs.
|
||||
|
||||
```python
|
||||
NUM_EPOCHS = 60
|
||||
SEQ_LEN = 1024
|
||||
BATCH_SIZE = 192
|
||||
NUM_CHUNKS = None
|
||||
TENSOR_SHAPE = (1, 1024, 1600)
|
||||
# only pipeline parallel
|
||||
# CONFIG = dict(parallel=dict(pipeline=2), fp16=dict(mode=AMP_TYPE.NAIVE))
|
||||
# pipeline + 1D model parallel
|
||||
CONFIG = dict(NUM_MICRO_BATCHES = 192, parallel=dict(pipeline=2, tensor=dict(mode='1d', size=2)), fp16=dict(mode=AMP_TYPE.NAIVE))
|
||||
|
||||
|
||||
def train():
|
||||
disable_existing_loggers()
|
||||
parser = colossalai.get_default_parser()
|
||||
args = parser.parse_args()
|
||||
colossalai.launch_from_torch(config=CONFIG, backend=args.backend)
|
||||
logger = get_dist_logger()
|
||||
|
||||
train_ds = WebtextDataset(os.environ['DATA'], seq_len=SEQ_LEN)
|
||||
train_dataloader = utils.get_dataloader(train_ds,
|
||||
seed=42,
|
||||
batch_size=BATCH_SIZE,
|
||||
pin_memory=True,
|
||||
shuffle=True,
|
||||
drop_last=True)
|
||||
|
||||
use_interleaved = NUM_CHUNKS is not None
|
||||
num_chunks = 1 if not use_interleaved else NUM_CHUNKS
|
||||
model = GPT2_exlarge_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half)
|
||||
# model = GPT3_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half)
|
||||
if use_interleaved and not isinstance(model, nn.ModuleList):
|
||||
model = nn.ModuleList([model])
|
||||
|
||||
criterion = GPTLMLoss()
|
||||
|
||||
optimizer = torch.optim.Adam(model.parameters(), lr=0.00015, weight_decay=1e-2,)
|
||||
|
||||
engine, train_dataloader, _, _ = colossalai.initialize(model,
|
||||
optimizer,
|
||||
criterion,
|
||||
train_dataloader=train_dataloader)
|
||||
global_batch_size = BATCH_SIZE * \
|
||||
gpc.get_world_size(ParallelMode.DATA) * getattr(gpc.config, "gradient_accumulation", 1)
|
||||
logger.info(f'Init done, global batch size = {global_batch_size}', ranks=[0])
|
||||
|
||||
timer = MultiTimer()
|
||||
|
||||
trainer = Trainer(
|
||||
engine=engine,
|
||||
logger=logger,
|
||||
timer=timer
|
||||
)
|
||||
|
||||
hook_list = [
|
||||
hooks.LossHook(),
|
||||
hooks.LogMetricByEpochHook(logger),
|
||||
hooks.ThroughputHook(),
|
||||
hooks.LogMetricByStepHook(),
|
||||
]
|
||||
|
||||
trainer.fit(
|
||||
train_dataloader=train_dataloader,
|
||||
epochs=NUM_EPOCHS,
|
||||
test_interval=1,
|
||||
hooks=hook_list,
|
||||
display_progress=True,
|
||||
return_output_label=False,
|
||||
)
|
||||
```
|
||||
@@ -0,0 +1,247 @@
|
||||
# Train ViT Using Pipeline Parallelism
|
||||
|
||||
Author: Hongxin Liu, Yongbin Li
|
||||
|
||||
**Example Code**
|
||||
- [ColossalAI-Examples Pipeline Parallel ViT](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/vision_transformer/pipeline_parallel)
|
||||
|
||||
**Related Paper**
|
||||
- [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473)
|
||||
|
||||
## Introduction
|
||||
|
||||
In this tutorial, you will learn how to train Vision Transformer for image classification from scratch, using pipeline.
|
||||
Pipeline parallelism is a kind of model parallelism, which is useful when your GPU memory cannot fit your model.
|
||||
By using it, we split the original model into multi stages, and each stage maintains a part of the original model.
|
||||
We assume that your GPU memory cannot fit ViT/L-16, and your memory can fit this model.
|
||||
|
||||
## Table of contents
|
||||
|
||||
In this tutorial we will cover:
|
||||
|
||||
1. The definition of ViT model, based on [TIMM](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py)
|
||||
2. Processing the dataset
|
||||
3. Training ViT using pipeline
|
||||
|
||||
## Import libraries
|
||||
|
||||
```python
|
||||
import os
|
||||
from collections import OrderedDict
|
||||
from functools import partial
|
||||
|
||||
import colossalai
|
||||
import colossalai.nn as col_nn
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
from colossalai.builder import build_pipeline_model
|
||||
from colossalai.engine.schedule import (InterleavedPipelineSchedule,
|
||||
PipelineSchedule)
|
||||
from colossalai.logging import disable_existing_loggers, get_dist_logger
|
||||
from colossalai.trainer import Trainer, hooks
|
||||
from colossalai.utils import MultiTimer, get_dataloader
|
||||
from timm.models import vision_transformer as vit
|
||||
from torchvision import transforms
|
||||
from torchvision.datasets import CIFAR10
|
||||
```
|
||||
|
||||
|
||||
|
||||
## Define Vision Transformer model
|
||||
|
||||
Generally, we provide 3 ways to build a pipelined model:
|
||||
|
||||
1. `colossalai.builder.build_pipeline_model_from_cfg`
|
||||
2. `colossalai.builder.build_pipeline_model`
|
||||
3. Split the model by stages by yourself
|
||||
|
||||
When your memory can fit the model, you can use the first two methods to build your model, otherwise you must split the model by yourself. The first two methods first build the whole model on CPU, then split the model, and finally you can just move the corresponding part of model to GPU.
|
||||
|
||||
`colossalai.builder.build_pipeline_model_from_cfg()` receives a config file of model, and it can split the model uniformly (by layer) or balanced (by parameter size).
|
||||
|
||||
If you are familiar with `PyTorch`, you can use `colossalai.builder.build_pipeline_model()` which receives a `torch.nn.Sequential` model and split it by layer uniformly.
|
||||
|
||||
In this tutorial, we will modify [TIMM/ViT](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py) to `torch.nn.Sequential` and then use `colossalai.builder.build_pipeline_model()` to build the pipelined model.
|
||||
|
||||
When the data is **one** `Tensor`, you can use the positional argument in `forward()` of your model to get the data tensor. For the first stage of pipeline, the first positional argument of `forward()` is the data tensor loaded from data loader. For other stages, the first positional argument of `forward()` is the output tensor from the previous stage. Note that if the stage is not the last stage, the return of `forward()` must be a `Tensor`.
|
||||
|
||||
When the data is a `dict` of `Tensor`, you can use named keyword arguments in `forward()` of your model to get the data `dict`.
|
||||
|
||||
```python
|
||||
class ViTEmbedding(nn.Module):
|
||||
def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768, embed_layer=vit.PatchEmbed, drop_rate=0., distilled=False):
|
||||
super().__init__()
|
||||
self.embed_dim = embed_dim # num_features for consistency with other models
|
||||
self.num_tokens = 2 if distilled else 1
|
||||
self.patch_embed = embed_layer(
|
||||
img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim)
|
||||
num_patches = self.patch_embed.num_patches
|
||||
|
||||
self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
|
||||
self.dist_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None
|
||||
self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + self.num_tokens, embed_dim))
|
||||
self.pos_drop = nn.Dropout(p=drop_rate)
|
||||
self.init_weights()
|
||||
|
||||
def forward(self, x):
|
||||
x = self.patch_embed(x)
|
||||
cls_token = self.cls_token.expand(x.shape[0], -1, -1) # stole cls_tokens impl from Phil Wang, thanks
|
||||
if self.dist_token is None:
|
||||
x = torch.cat((cls_token, x), dim=1)
|
||||
else:
|
||||
x = torch.cat((cls_token, self.dist_token.expand(x.shape[0], -1, -1), x), dim=1)
|
||||
x = self.pos_drop(x + self.pos_embed)
|
||||
return x
|
||||
|
||||
def init_weights(self):
|
||||
vit.trunc_normal_(self.pos_embed, std=.02)
|
||||
if self.dist_token is not None:
|
||||
vit.trunc_normal_(self.dist_token, std=.02)
|
||||
vit.trunc_normal_(self.cls_token, std=.02)
|
||||
self.apply(vit._init_vit_weights)
|
||||
|
||||
|
||||
class ViTHead(nn.Module):
|
||||
def __init__(self, embed_dim=768, num_classes=1000, norm_layer=None, distilled=False, representation_size=None):
|
||||
super().__init__()
|
||||
norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)
|
||||
self.norm = norm_layer(embed_dim)
|
||||
self.num_classes = num_classes
|
||||
self.distilled = distilled
|
||||
self.num_features = embed_dim
|
||||
# Representation layer
|
||||
if representation_size and not distilled:
|
||||
self.num_features = representation_size
|
||||
self.pre_logits = nn.Sequential(OrderedDict([
|
||||
('fc', nn.Linear(embed_dim, representation_size)),
|
||||
('act', nn.Tanh())
|
||||
]))
|
||||
else:
|
||||
self.pre_logits = nn.Identity()
|
||||
# Classifier head(s)
|
||||
self.head = nn.Linear(self.num_features, num_classes) if num_classes > 0 else nn.Identity()
|
||||
self.head_dist = None
|
||||
if distilled:
|
||||
self.head_dist = nn.Linear(embed_dim, num_classes) if num_classes > 0 else nn.Identity()
|
||||
self.init_weights()
|
||||
|
||||
def forward(self, x):
|
||||
x = self.norm(x)
|
||||
if self.distilled:
|
||||
x, x_dist = self.head(x[:, 0]), self.head_dist(x[:, 1])
|
||||
if self.training and not torch.jit.is_scripting():
|
||||
# during inference, return the average of both classifier predictions
|
||||
return x, x_dist
|
||||
else:
|
||||
return (x + x_dist) / 2
|
||||
else:
|
||||
x = self.pre_logits(x[:, 0])
|
||||
x = self.head(x)
|
||||
return x
|
||||
|
||||
def init_weights(self):
|
||||
self.apply(vit._init_vit_weights)
|
||||
|
||||
|
||||
def sequential_vit(img_size=224, patch_size=16, in_chans=3, num_classes=1000, embed_dim=768, depth=12,
|
||||
num_heads=12, mlp_ratio=4., qkv_bias=True, representation_size=None, distilled=False,
|
||||
drop_rate=0., attn_drop_rate=0., drop_path_rate=0., embed_layer=vit.PatchEmbed, norm_layer=None,
|
||||
act_layer=None):
|
||||
norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)
|
||||
act_layer = act_layer or nn.GELU
|
||||
embedding = ViTEmbedding(img_size=img_size, patch_size=patch_size, in_chans=in_chans,
|
||||
embed_dim=embed_dim, embed_layer=embed_layer, drop_rate=drop_rate, distilled=distilled)
|
||||
dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)] # stochastic depth decay rule
|
||||
blocks = [vit.Block(
|
||||
dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, drop=drop_rate,
|
||||
attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer, act_layer=act_layer)
|
||||
for i in range(depth)]
|
||||
for block in blocks:
|
||||
block.apply(vit._init_vit_weights)
|
||||
head = ViTHead(embed_dim=embed_dim, num_classes=num_classes, norm_layer=norm_layer,
|
||||
distilled=distilled, representation_size=representation_size)
|
||||
return nn.Sequential(embedding, *blocks, head)
|
||||
|
||||
|
||||
def vit_large_patch16_224(**kwargs):
|
||||
model_kwargs = dict(embed_dim=1024, depth=24, num_heads=16, **kwargs)
|
||||
return sequential_vit(**model_kwargs)
|
||||
```
|
||||
|
||||
## Process the dataset
|
||||
|
||||
Generally, we train ViT on large dataset like Imagenet. For simplicity, we just use CIFAR-10 here, since this tutorial is just for pipeline training.
|
||||
|
||||
```python
|
||||
def build_cifar(batch_size):
|
||||
transform_train = transforms.Compose([
|
||||
transforms.RandomCrop(224, pad_if_needed=True),
|
||||
transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
|
||||
transforms.ToTensor(),
|
||||
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
|
||||
])
|
||||
transform_test = transforms.Compose([
|
||||
transforms.Resize(224),
|
||||
transforms.ToTensor(),
|
||||
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
|
||||
])
|
||||
|
||||
train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train)
|
||||
test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test)
|
||||
train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True)
|
||||
test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True)
|
||||
return train_dataloader, test_dataloader
|
||||
```
|
||||
|
||||
## Training ViT using pipeline
|
||||
|
||||
You can set the size of pipeline parallel and number of microbatches in config. `NUM_CHUNKS` is useful when using interleved-pipeline (for more details see [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473) ). The original batch will be split into `num_microbatches`, and each stage will load a micro batch each time. Then we will generate an approriate schedule for you to execute the pipeline training. If you don't need the output and label of model, you can set `return_output_label` to `False` when calling `trainer.fit()` which can further reduce GPU memory usage.
|
||||
|
||||
You should `export DATA=/path/to/cifar`.
|
||||
|
||||
```python
|
||||
BATCH_SIZE = 16
|
||||
NUM_EPOCHS = 60
|
||||
NUM_CHUNKS = 1
|
||||
CONFIG = dict(NUM_MICRO_BATCHES=4, parallel=dict(pipeline=2))
|
||||
|
||||
|
||||
def train():
|
||||
disable_existing_loggers()
|
||||
parser = colossalai.get_default_parser()
|
||||
args = parser.parse_args()
|
||||
colossalai.launch_from_torch(backend=args.backend, config=CONFIG)
|
||||
logger = get_dist_logger()
|
||||
|
||||
# build model
|
||||
model = vit_large_patch16_224()
|
||||
model = build_pipeline_model(model, num_chunks=NUM_CHUNKS, verbose=True)
|
||||
|
||||
# build criterion
|
||||
criterion = nn.CrossEntropyLoss()
|
||||
|
||||
# optimizer
|
||||
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0)
|
||||
|
||||
# build dataloader
|
||||
train_dataloader, test_dataloader = build_cifar(BATCH_SIZE)
|
||||
|
||||
engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model, optimizer, criterion,
|
||||
train_dataloader, test_dataloader)
|
||||
timer = MultiTimer()
|
||||
|
||||
trainer = Trainer(engine=engine, timer=timer, logger=logger)
|
||||
|
||||
hook_list = [
|
||||
hooks.LossHook(),
|
||||
hooks.AccuracyHook(col_nn.metric.Accuracy()),
|
||||
hooks.LogMetricByEpochHook(logger),
|
||||
]
|
||||
|
||||
trainer.fit(train_dataloader=train_dataloader,
|
||||
epochs=NUM_EPOCHS,
|
||||
test_dataloader=test_dataloader,
|
||||
test_interval=1,
|
||||
hooks=hook_list,
|
||||
display_progress=True)
|
||||
```
|
||||
@@ -0,0 +1,646 @@
|
||||
# Step By Step: Accelerate ViT Training With Colossal-AI (From Data Parallel to Hybrid Parallel)
|
||||
|
||||
Author: Yuxuan Lou
|
||||
|
||||
**Example Code**
|
||||
|
||||
- [Colossal-AI Examples ViT on Cifar10](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/vision_transformer)
|
||||
|
||||
**Related Paper**
|
||||
- [An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale](https://arxiv.org/pdf/2010.11929.pdf)
|
||||
|
||||
|
||||
## Introduction
|
||||
|
||||
In this example for ViT model, Colossal-AI provides three different parallelism techniques which acclerate model training: data parallelism, pipeline parallelism and tensor parallelism.
|
||||
We will show you how to train ViT on CIFAR-10 dataset with these parallelism techniques. To run this example, you will need 2-4 GPUs.
|
||||
|
||||
|
||||
## Tabel of Contents
|
||||
1. Colossal-AI installation
|
||||
2. Steps to train ViT with data parallelism
|
||||
3. Steps to train ViT with pipeline parallelism
|
||||
4. Steps to train ViT with tensor parallelism or hybrid parallelism
|
||||
|
||||
## Colossal-AI Installation
|
||||
You can install Colossal-AI pacakage and its dependencies with PyPI.
|
||||
```bash
|
||||
pip install colossalai
|
||||
```
|
||||
|
||||
|
||||
|
||||
## Data Parallelism
|
||||
Data parallism is one basic way to accelerate model training process. You can apply data parallism to training by only two steps:
|
||||
1. Define a configuration file
|
||||
2. Change a few lines of code in train script
|
||||
|
||||
### Define your configuration file (`data_parallel/config.py`)
|
||||
To use Colossal-AI, the first step is to define a configuration file. And there are two kinds of variables here:
|
||||
|
||||
1. **Colossal-AI feature specification**
|
||||
|
||||
There is an array of features Colossal-AI provides to speed up training (parallel mode, mixed precision, ZeRO, etc.). Each feature is defined by a corresponding field in the config file. If we apply data parallel only, we do not need to specify the parallel mode. In this example, we use mixed precision training natively provided by PyTorch by define the mixed precision configuration `fp16 = dict(mode=AMP_TYPE.TORCH)`.
|
||||
|
||||
2. **Global hyper-parameters**
|
||||
|
||||
Global hyper-parameters include model-specific hyper-parameters, training settings, dataset information, etc.
|
||||
|
||||
```python
|
||||
from colossalai.amp import AMP_TYPE
|
||||
|
||||
# ViT Base
|
||||
BATCH_SIZE = 256
|
||||
DROP_RATE = 0.1
|
||||
NUM_EPOCHS = 300
|
||||
|
||||
# mix precision
|
||||
fp16 = dict(
|
||||
mode=AMP_TYPE.TORCH,
|
||||
)
|
||||
|
||||
gradient_accumulation = 16
|
||||
clip_grad_norm = 1.0
|
||||
|
||||
dali = dict(
|
||||
gpu_aug=True,
|
||||
mixup_alpha=0.2
|
||||
)
|
||||
```
|
||||
|
||||
### Modify train script (`/data_parallel/train_with_cifar10.py`)
|
||||
|
||||
#### Import modules
|
||||
- Colossal-AI related modules
|
||||
```python
|
||||
import colossalai
|
||||
from colossalai.context import ParallelMode
|
||||
from colossalai.core import global_context as gpc
|
||||
from colossalai.logging import disable_existing_loggers, get_dist_logger
|
||||
from colossalai.nn.lr_scheduler import LinearWarmupLR
|
||||
from colossalai.nn.metric import Accuracy
|
||||
from colossalai.trainer import Trainer, hooks
|
||||
```
|
||||
|
||||
- Other modules
|
||||
```python
|
||||
import os
|
||||
|
||||
import torch
|
||||
from timm.models import vit_base_patch16_224
|
||||
|
||||
|
||||
from torchvision import transforms
|
||||
from torchvision.datasets import CIFAR10
|
||||
```
|
||||
|
||||
#### Lauch Colossal-AI
|
||||
|
||||
In train script, you need to initialize the distributed environment for Colossal-AI after your config file is prepared. We call this process `launch`. In Colossal-AI, we provided several launch methods to initialize the distributed backend. In most cases, you can use `colossalai.launch` and `colossalai.get_default_parser` to pass the parameters via command line. Besides, Colossal-AI can utilize the existing launch tool provided by PyTorch as many users are familiar with by using `colossalai.launch_from_torch`. For more details, you can view the related [documents](https://www.colossalai.org/docs/basics/launch_colossalai).
|
||||
|
||||
```python
|
||||
# initialize distributed setting
|
||||
parser = colossalai.get_default_parser()
|
||||
args = parser.parse_args()
|
||||
colossalai.launch_from_torch(config=args.config)
|
||||
|
||||
disable_existing_loggers()
|
||||
logger = get_dist_logger()
|
||||
```
|
||||
|
||||
After initialization, you can acess the variables in the config file by using `colossalai.core.global_context`.
|
||||
|
||||
```python
|
||||
#access parameters
|
||||
print(gpc.config.BATCH_SIZE)
|
||||
```
|
||||
|
||||
#### Build Model
|
||||
|
||||
If only data parallelism is required, you do not need to make any changes to your model. Here, we use `vit_base_patch16_224` from `timm`.
|
||||
```python
|
||||
# build model
|
||||
model = vit_base_patch16_224(drop_rate=0.1, num_classes=gpc.config.NUM_CLASSES)
|
||||
```
|
||||
|
||||
#### Build CIFAR-10 Dataloader
|
||||
`colossalai.utils.get_dataloader` can help you build dataloader easily.
|
||||
|
||||
```python
|
||||
def build_cifar(batch_size):
|
||||
transform_train = transforms.Compose([
|
||||
transforms.RandomCrop(224, pad_if_needed=True),
|
||||
transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
|
||||
transforms.ToTensor(),
|
||||
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
|
||||
])
|
||||
transform_test = transforms.Compose([
|
||||
transforms.Resize(224),
|
||||
transforms.ToTensor(),
|
||||
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
|
||||
])
|
||||
|
||||
train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train)
|
||||
test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test)
|
||||
train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True)
|
||||
test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True)
|
||||
return train_dataloader, test_dataloader
|
||||
|
||||
|
||||
# build dataloader
|
||||
train_dataloader, test_dataloader = build_cifar(gpc.config.BATCH_SIZE)
|
||||
```
|
||||
|
||||
#### Define optimizer, loss function and LR scheduler
|
||||
|
||||
Colossal-AI provides its own optimizer, loss function and LR scheduler. Those from PyTorch are also compatible.
|
||||
|
||||
```python
|
||||
# build optimizer
|
||||
optimizer = colossalai.nn.Lamb(model.parameters(), lr=1.8e-2, weight_decay=0.1)
|
||||
|
||||
# build loss
|
||||
criterion = torch.nn.CrossEntropyLoss()
|
||||
|
||||
# lr_scheduelr
|
||||
lr_scheduler = LinearWarmupLR(optimizer, warmup_steps=50, total_steps=gpc.config.NUM_EPOCHS)
|
||||
```
|
||||
|
||||
#### Start Colossal-AI engine
|
||||
|
||||
Engine is essentially a wrapper class for model, optimizer and loss function. When we call `colossalai.initialize`, an engine object will be returned, and it has already been equipped with functionalities such as gradient clipping, gradient accumulation and zero optimizer as specified in your configuration file. Further model training is based on Colossal-AI engine.
|
||||
|
||||
```python
|
||||
engine, train_dataloader, test_dataloader, _ = colossalai.initialize(
|
||||
model, optimizer, criterion, train_dataloader, test_dataloader
|
||||
)
|
||||
```
|
||||
|
||||
#### Train: Trainer API
|
||||
Trainer is a more high-level wrapper for the user to execute training with fewer lines of code. It is easy to create a trainer object by passing the engine object.
|
||||
|
||||
Besides, In trainer, the user can customize some hooks and attach these hooks to the trainer object. A hook object will execute life-cycle methods periodically based on the training scheme. For example, The `LRSchedulerHook` will execute `lr_scheduler.step()` to update the learning rate of the model during either `after_train_iter` or `after_train_epoch` stages.
|
||||
|
||||
```python
|
||||
# build trainer
|
||||
trainer = Trainer(engine=engine, logger=logger)
|
||||
|
||||
# build hooks
|
||||
hook_list = [
|
||||
hooks.LossHook(),
|
||||
hooks.AccuracyHook(accuracy_func=MixupAccuracy()),
|
||||
hooks.LogMetricByEpochHook(logger),
|
||||
hooks.LRSchedulerHook(lr_scheduler, by_epoch=True),
|
||||
|
||||
# comment if you do not need to use the hooks below
|
||||
hooks.SaveCheckpointHook(interval=1, checkpoint_dir='./ckpt'),
|
||||
hooks.TensorboardHook(log_dir='./tb_logs', ranks=[0]),
|
||||
]
|
||||
```
|
||||
|
||||
Use `trainer.fit` for training:
|
||||
|
||||
```python
|
||||
# start training
|
||||
trainer.fit(
|
||||
train_dataloader=train_dataloader,
|
||||
test_dataloader=test_dataloader,
|
||||
epochs=gpc.config.NUM_EPOCHS,
|
||||
hooks=hook_list,
|
||||
display_progress=True,
|
||||
test_interval=1
|
||||
)
|
||||
```
|
||||
|
||||
### Start training
|
||||
`DATA` is the filepath where CIFAR-10 dataset will be automatically downloaded and stored.
|
||||
|
||||
`<NUM_GPUs>` is the number of GPUs you want to use to train ViT on CIFAR-10 with data parallelism.
|
||||
|
||||
```bash
|
||||
export DATA=<path_to_data>
|
||||
# If your torch >= 1.10.0
|
||||
torchrun --standalone --nproc_per_node <NUM_GPUs> train_dp.py --config ./configs/config_data_parallel.py
|
||||
# If your torch >= 1.9.0
|
||||
# python -m torch.distributed.run --standalone --nproc_per_node= <NUM_GPUs> train_dp.py --config ./configs/config_data_parallel.py
|
||||
# Otherwise
|
||||
# python -m torch.distributed.launch --nproc_per_node <NUM_GPUs> --master_addr <node_name> --master_port 29500 train_dp.py --config ./configs/config.py
|
||||
```
|
||||
|
||||
|
||||
|
||||
## Pipeline Parallelism
|
||||
Aside from data parallelism, Colossal-AI also support pipleline parallelism. In specific, Colossal-AI uses 1F1B pipeline introduced by NVIDIA. For more details, you can view the related [documents](https://www.colossalai.org/tutorials/features/pipeline_parallel).
|
||||
|
||||
### Define your configuration file(`hybrid_parallel/configs/vit_pipeline.py`)
|
||||
To apply pipleline parallel on the data parallel basis, you only need to add a **parallel dict**
|
||||
```python
|
||||
from colossalai.amp import AMP_TYPE
|
||||
|
||||
parallel = dict(
|
||||
pipeline=2
|
||||
)
|
||||
# pipeline config
|
||||
NUM_MICRO_BATCHES = parallel['pipeline']
|
||||
TENSOR_SHAPE = (BATCH_SIZE // NUM_MICRO_BATCHES, SEQ_LENGTH, HIDDEN_SIZE)
|
||||
|
||||
fp16 = dict(mode=AMP_TYPE.NAIVE)
|
||||
clip_grad_norm = 1.0
|
||||
```
|
||||
|
||||
Other configs:
|
||||
```python
|
||||
# hyperparameters
|
||||
# BATCH_SIZE is as per GPU
|
||||
# global batch size = BATCH_SIZE x data parallel size
|
||||
BATCH_SIZE = 256
|
||||
LEARNING_RATE = 3e-3
|
||||
WEIGHT_DECAY = 0.3
|
||||
NUM_EPOCHS = 300
|
||||
WARMUP_EPOCHS = 32
|
||||
|
||||
# model config
|
||||
IMG_SIZE = 224
|
||||
PATCH_SIZE = 16
|
||||
HIDDEN_SIZE = 768
|
||||
DEPTH = 12
|
||||
NUM_HEADS = 12
|
||||
MLP_RATIO = 4
|
||||
NUM_CLASSES = 10
|
||||
CHECKPOINT = True
|
||||
SEQ_LENGTH = (IMG_SIZE // PATCH_SIZE) ** 2 + 1 # add 1 for cls token
|
||||
```
|
||||
|
||||
### Build pipeline model (`/hybrid_parallel/model/vit.py`)
|
||||
Colossal-AI provides two methods to build a pipeline model from the existing model.
|
||||
- `colossalai.builder.build_pipeline_model_from_cfg`
|
||||
- `colossalai.builder.build_pipeline_model`
|
||||
|
||||
Besides, you can also build a pipeline model from scrath with Colossal-AI.
|
||||
```python
|
||||
import math
|
||||
from typing import Callable
|
||||
|
||||
import inspect
|
||||
import torch
|
||||
from colossalai import nn as col_nn
|
||||
from colossalai.registry import LAYERS, MODELS
|
||||
from colossalai.logging import get_dist_logger
|
||||
from colossalai.core import global_context as gpc
|
||||
from colossalai.context import ParallelMode
|
||||
from colossalai.builder.pipeline import partition_uniform
|
||||
from torch import dtype, nn
|
||||
from model_zoo.vit.vit import ViTBlock, ViTEmbedding, ViTHead
|
||||
|
||||
|
||||
@MODELS.register_module
|
||||
class PipelineVisionTransformer(nn.Module):
|
||||
def __init__(self,
|
||||
img_size: int = 224,
|
||||
patch_size: int = 16,
|
||||
in_chans: int = 3,
|
||||
num_classes: int = 1000,
|
||||
depth: int = 12,
|
||||
num_heads: int = 12,
|
||||
dim: int = 768,
|
||||
mlp_ratio: int = 4,
|
||||
attention_dropout: float = 0.,
|
||||
dropout: float = 0.1,
|
||||
drop_path: float = 0.,
|
||||
layernorm_epsilon: float = 1e-6,
|
||||
activation: Callable = nn.functional.gelu,
|
||||
representation_size: int = None,
|
||||
dtype: dtype = None,
|
||||
bias: bool = True,
|
||||
checkpoint: bool = False,
|
||||
init_method: str = 'torch',
|
||||
first_stage=True,
|
||||
last_stage=True,
|
||||
start_idx=None,
|
||||
end_idx=None,):
|
||||
super().__init__()
|
||||
|
||||
layers = []
|
||||
|
||||
if first_stage:
|
||||
embed = ViTEmbedding(img_size=img_size,
|
||||
patch_size=patch_size,
|
||||
in_chans=in_chans,
|
||||
embedding_dim=dim,
|
||||
dropout=dropout,
|
||||
dtype=dtype,
|
||||
init_method=init_method)
|
||||
layers.append(embed)
|
||||
|
||||
# stochastic depth decay rule
|
||||
dpr = [x.item() for x in torch.linspace(0, drop_path, depth)]
|
||||
|
||||
if start_idx is None and end_idx is None:
|
||||
start_idx = 0
|
||||
end_idx = depth
|
||||
|
||||
blocks = [
|
||||
ViTBlock(
|
||||
dim=dim,
|
||||
num_heads=num_heads,
|
||||
mlp_ratio=mlp_ratio,
|
||||
attention_dropout=attention_dropout,
|
||||
dropout=dropout,
|
||||
drop_path=dpr[i],
|
||||
activation=activation,
|
||||
dtype=dtype,
|
||||
bias=bias,
|
||||
checkpoint=checkpoint,
|
||||
init_method=init_method,
|
||||
) for i in range(start_idx, end_idx)
|
||||
]
|
||||
layers.extend(blocks)
|
||||
|
||||
if last_stage:
|
||||
norm = col_nn.LayerNorm(normalized_shape=dim, eps=layernorm_epsilon, dtype=dtype)
|
||||
head = ViTHead(dim=dim,
|
||||
num_classes=num_classes,
|
||||
representation_size=representation_size,
|
||||
dtype=dtype,
|
||||
bias=bias,
|
||||
init_method=init_method)
|
||||
layers.extend([norm, head])
|
||||
|
||||
self.layers = nn.Sequential(
|
||||
*layers
|
||||
)
|
||||
|
||||
def forward(self, x):
|
||||
x = self.layers(x)
|
||||
return x
|
||||
|
||||
|
||||
def _filter_kwargs(func, kwargs):
|
||||
sig = inspect.signature(func)
|
||||
return {k: v for k, v in kwargs.items() if k in sig.parameters}
|
||||
|
||||
|
||||
def _build_pipeline_vit(module_cls, num_layers, num_chunks, device=torch.device('cuda'), **kwargs):
|
||||
logger = get_dist_logger()
|
||||
if gpc.is_initialized(ParallelMode.PIPELINE):
|
||||
pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE)
|
||||
pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE)
|
||||
else:
|
||||
pipeline_size = 1
|
||||
pipeline_rank = 0
|
||||
rank = gpc.get_global_rank()
|
||||
parts = partition_uniform(num_layers, pipeline_size, num_chunks)[pipeline_rank]
|
||||
models = []
|
||||
|
||||
for start, end in parts:
|
||||
kwargs['first_stage'] = start == 0
|
||||
kwargs['last_stage'] = end == num_layers
|
||||
kwargs['start_idx'] = start
|
||||
kwargs['end_idx'] = end
|
||||
logger.info(f'Rank{rank} build layer {start}-{end}, {end-start}/{num_layers} layers')
|
||||
chunk = module_cls(**_filter_kwargs(module_cls.__init__, kwargs)).to(device)
|
||||
models.append(chunk)
|
||||
if len(models) == 1:
|
||||
model = models[0]
|
||||
else:
|
||||
model = nn.ModuleList(models)
|
||||
return model
|
||||
|
||||
|
||||
def build_pipeline_vit(num_layers, num_chunks, device=torch.device('cuda'), **kwargs):
|
||||
return _build_pipeline_vit(PipelineVisionTransformer, num_layers, num_chunks, device, **kwargs)
|
||||
```
|
||||
|
||||
### Modify train script (`/hybrid_parallel/train_with_cifar10.py`)
|
||||
|
||||
#### Import modules
|
||||
```python
|
||||
from colossalai.engine.schedule import (InterleavedPipelineSchedule,
|
||||
PipelineSchedule)
|
||||
from colossalai.utils import MultiTimer
|
||||
import os
|
||||
|
||||
import colossalai
|
||||
|
||||
import torch
|
||||
from colossalai.context import ParallelMode
|
||||
from colossalai.core import global_context as gpc
|
||||
from colossalai.logging import get_dist_logger
|
||||
from colossalai.nn import CrossEntropyLoss
|
||||
from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
|
||||
from colossalai.utils import is_using_pp, get_dataloader
|
||||
from model.vit import build_pipeline_vit
|
||||
from model_zoo.vit.vit import _create_vit_model
|
||||
from tqdm import tqdm
|
||||
|
||||
from torchvision import transforms
|
||||
from torchvision.datasets import CIFAR10
|
||||
```
|
||||
|
||||
#### Launch Colossal-AI
|
||||
`colossalai.utils.is_using_pp` can help check whether pipeline parallelism is required in config file.
|
||||
|
||||
```python
|
||||
# initialize distributed setting
|
||||
parser = colossalai.get_default_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
# launch from torch
|
||||
colossalai.launch_from_torch(config=args.config)
|
||||
|
||||
# get logger
|
||||
logger = get_dist_logger()
|
||||
logger.info("initialized distributed environment", ranks=[0])
|
||||
|
||||
if hasattr(gpc.config, 'LOG_PATH'):
|
||||
if gpc.get_global_rank() == 0:
|
||||
log_path = gpc.config.LOG_PATH
|
||||
if not os.path.exists(log_path):
|
||||
os.mkdir(log_path)
|
||||
logger.log_to_file(log_path)
|
||||
|
||||
use_pipeline = is_using_pp()
|
||||
```
|
||||
|
||||
#### Define model
|
||||
|
||||
```python
|
||||
# create model
|
||||
model_kwargs = dict(img_size=gpc.config.IMG_SIZE,
|
||||
patch_size=gpc.config.PATCH_SIZE,
|
||||
dim=gpc.config.HIDDEN_SIZE,
|
||||
depth=gpc.config.DEPTH,
|
||||
num_heads=gpc.config.NUM_HEADS,
|
||||
mlp_ratio=gpc.config.MLP_RATIO,
|
||||
num_classes=gpc.config.NUM_CLASSES,
|
||||
init_method='jax',
|
||||
checkpoint=gpc.config.CHECKPOINT)
|
||||
|
||||
if use_pipeline:
|
||||
model = build_pipeline_vit(num_layers=model_kwargs['depth'], num_chunks=1, **model_kwargs)
|
||||
else:
|
||||
model = _create_vit_model(**model_kwargs)
|
||||
```
|
||||
|
||||
#### Count number of parameters
|
||||
|
||||
You can count model parameters on different pipeline stages easily.
|
||||
|
||||
```
|
||||
# count number of parameters
|
||||
total_numel = 0
|
||||
for p in model.parameters():
|
||||
total_numel += p.numel()
|
||||
if not gpc.is_initialized(ParallelMode.PIPELINE):
|
||||
pipeline_stage = 0
|
||||
else:
|
||||
pipeline_stage = gpc.get_local_rank(ParallelMode.PIPELINE)
|
||||
logger.info(f"number of parameters: {total_numel} on pipeline stage {pipeline_stage}")
|
||||
```
|
||||
|
||||
#### Build dataloader, optimizer, etc.
|
||||
|
||||
```python
|
||||
def build_cifar(batch_size):
|
||||
transform_train = transforms.Compose([
|
||||
transforms.RandomCrop(224, pad_if_needed=True),
|
||||
transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
|
||||
transforms.ToTensor(),
|
||||
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
|
||||
])
|
||||
transform_test = transforms.Compose([
|
||||
transforms.Resize(224),
|
||||
transforms.ToTensor(),
|
||||
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
|
||||
])
|
||||
|
||||
train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train)
|
||||
test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test)
|
||||
train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True)
|
||||
test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True)
|
||||
return train_dataloader, test_dataloader
|
||||
|
||||
|
||||
# craete dataloaders
|
||||
train_dataloader , test_dataloader = build_cifar()
|
||||
|
||||
# create loss function
|
||||
criterion = CrossEntropyLoss(label_smoothing=0.1)
|
||||
|
||||
# create optimizer
|
||||
optimizer = torch.optim.AdamW(model.parameters(), lr=gpc.config.LEARNING_RATE, weight_decay=gpc.config.WEIGHT_DECAY)
|
||||
|
||||
# create lr scheduler
|
||||
lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer,
|
||||
total_steps=gpc.config.NUM_EPOCHS,
|
||||
warmup_steps=gpc.config.WARMUP_EPOCHS)
|
||||
```
|
||||
|
||||
#### Start Colossal-AI engine
|
||||
|
||||
```python
|
||||
# intiailize
|
||||
engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model=model,
|
||||
optimizer=optimizer,
|
||||
criterion=criterion,
|
||||
train_dataloader=train_dataloader,
|
||||
test_dataloader=test_dataloader)
|
||||
|
||||
logger.info("Engine is built", ranks=[0])
|
||||
```
|
||||
|
||||
#### Train: based on engine
|
||||
|
||||
In the data parallelism example, we show how to train a model with Trainer API. We can also directly train a model based on engine. In this way, you can customize your training with more features.
|
||||
|
||||
```python
|
||||
data_iter = iter(train_dataloader)
|
||||
|
||||
for epoch in range(gpc.config.NUM_EPOCHS):
|
||||
# training
|
||||
engine.train()
|
||||
|
||||
if gpc.get_global_rank() == 0:
|
||||
description = 'Epoch {} / {}'.format(
|
||||
epoch,
|
||||
gpc.config.NUM_EPOCHS
|
||||
)
|
||||
progress = tqdm(range(len(train_dataloader)), desc=description)
|
||||
else:
|
||||
progress = range(len(train_dataloader))
|
||||
for _ in progress:
|
||||
engine.zero_grad()
|
||||
engine.execute_schedule(data_iter, return_output_label=False)
|
||||
engine.step()
|
||||
lr_scheduler.step()
|
||||
```
|
||||
|
||||
### Start training
|
||||
```bash
|
||||
export DATA=<path_to_dataset>
|
||||
# If your torch >= 1.10.0
|
||||
torchrun --standalone --nproc_per_node <NUM_GPUs> train_hybrid.py --config ./configs/config_pipeline_parallel.py
|
||||
# If your torch >= 1.9.0
|
||||
# python -m torch.distributed.run --standalone --nproc_per_node= <NUM_GPUs> train_hybrid.py --config ./configs/config_pipeline_parallel.py
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
## Tensor Parallelism and Hybrid Parallelism
|
||||
Tensor parallelism partitions each weight parameter across multiple devices in order to reduce memory load. Colossal-AI support 1D, 2D, 2.5D and 3D tensor parallelism. Besides, you can combine tensor parallelism with pipeline parallelism and data parallelism to reach hybrid parallelism. Colossal-AI also provides an easy way to apply tensor parallelism and hybrid parallelism. On the basis of pipeline parallelism, a few lines of code changing in config file is all you need.
|
||||
|
||||
### Define your configuration file(`/hybrid_parallel/configs/vit_1d_tp2_pp2.py`)
|
||||
To use tensor parallelism, you only need to add related information to the **parallel dict**. To be specific, `TENSOR_PARALLEL_MODE` can be '1d', '2d', '2.5d', '3d'. And the size of different parallelism should satisfy: `#GPUs = pipeline parallel size x tensor parallel size x data parallel size`. `data parallel size` will automatically computed after you specify the number of GPUs, pipeline parallel size and tensor parallel size.
|
||||
|
||||
```python
|
||||
from colossalai.amp import AMP_TYPE
|
||||
# parallel setting
|
||||
TENSOR_PARALLEL_SIZE = 2
|
||||
TENSOR_PARALLEL_MODE = '1d'
|
||||
|
||||
parallel = dict(
|
||||
pipeline=2,
|
||||
tensor=dict(mode=TENSOR_PARALLEL_MODE, size=TENSOR_PARALLEL_SIZE)
|
||||
)
|
||||
|
||||
fp16 = dict(mode=AMP_TYPE.NAIVE)
|
||||
clip_grad_norm = 1.0
|
||||
|
||||
|
||||
# pipeline config
|
||||
NUM_MICRO_BATCHES = parallel['pipeline']
|
||||
TENSOR_SHAPE = (BATCH_SIZE // NUM_MICRO_BATCHES, SEQ_LENGTH, HIDDEN_SIZE)
|
||||
```
|
||||
|
||||
Ohter configs:
|
||||
```python
|
||||
# hyperparameters
|
||||
# BATCH_SIZE is as per GPU
|
||||
# global batch size = BATCH_SIZE x data parallel size
|
||||
BATCH_SIZE = 256
|
||||
LEARNING_RATE = 3e-3
|
||||
WEIGHT_DECAY = 0.3
|
||||
NUM_EPOCHS = 300
|
||||
WARMUP_EPOCHS = 32
|
||||
|
||||
# model config
|
||||
IMG_SIZE = 224
|
||||
PATCH_SIZE = 16
|
||||
HIDDEN_SIZE = 768
|
||||
DEPTH = 12
|
||||
NUM_HEADS = 12
|
||||
MLP_RATIO = 4
|
||||
NUM_CLASSES = 10
|
||||
CHECKPOINT = True
|
||||
SEQ_LENGTH = (IMG_SIZE // PATCH_SIZE) ** 2 + 1 # add 1 for cls token
|
||||
```
|
||||
|
||||
### Start training
|
||||
```bash
|
||||
export DATA=<path_to_dataset>
|
||||
# If your torch >= 1.10.0
|
||||
torchrun --standalone --nproc_per_node <NUM_GPUs> train_hybrid.py --config ./configs/config_hybrid_parallel.py
|
||||
# If your torch >= 1.9.0
|
||||
# python -m torch.distributed.run --standalone --nproc_per_node= <NUM_GPUs> train_hybrid.py --config ./configs/config_hybrid_parallel.py
|
||||
```
|
||||
Reference in New Issue
Block a user