From 3a15b204219e8edb3fb7b8b80957ca9741636cee Mon Sep 17 00:00:00 2001
From: Ziyue Jiang
Date: Fri, 6 Jan 2023 14:48:58 +0800
Subject: [PATCH 1/3] Move GPT PP Example

---
 .../experiments/pipeline_parallel/README.md   |  37 ++++
 .../pipeline_parallel/model_zoo.py            |  73 ++++++++
 .../gpt/experiments/pipeline_parallel/run.sh  |   7 +
 .../pipeline_parallel/train_gpt_pp.py         | 161 ++++++++++++++++++
 .../experiments/pipeline_parallel/utils.py    |  12 ++
 5 files changed, 290 insertions(+)
 create mode 100644 examples/language/gpt/experiments/pipeline_parallel/README.md
 create mode 100644 examples/language/gpt/experiments/pipeline_parallel/model_zoo.py
 create mode 100644 examples/language/gpt/experiments/pipeline_parallel/run.sh
 create mode 100644 examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py
 create mode 100644 examples/language/gpt/experiments/pipeline_parallel/utils.py

diff --git a/examples/language/gpt/experiments/pipeline_parallel/README.md b/examples/language/gpt/experiments/pipeline_parallel/README.md
new file mode 100644
index 000000000..d158b088d
--- /dev/null
+++ b/examples/language/gpt/experiments/pipeline_parallel/README.md
@@ -0,0 +1,37 @@
+# Auto-Parallelism with GPT2
+
+## Requirements
+
+Before you can launch training, you need to install the following requirements.
+
+### Install PyTorch
+
+```bash
+#conda
+conda install pytorch==1.12.0 torchvision==0.13.0 torchaudio==0.12.0 cudatoolkit=11.3 -c pytorch
+#pip
+pip install torch==1.12.0+cu113 torchvision==0.13.0+cu113 torchaudio==0.12.0 --extra-index-url https://download.pytorch.org/whl/cu113
+```
+
+### Install [Colossal-AI v0.2.0](https://colossalai.org/download/) from the Official Website
+
+```bash
+pip install colossalai==0.2.0+torch1.12cu11.3 -f https://release.colossalai.org
+```
+
+### Install transformers
+
+```bash
+pip install transformers
+```
+
+## Dataset
+
+For simplicity, the input data is randomly generated here.
+
+## Training
+
+```bash
+#Run the Pipeline Parallel on GPT with default setting and a dummy dataset.
+bash run.sh
+```
diff --git a/examples/language/gpt/experiments/pipeline_parallel/model_zoo.py b/examples/language/gpt/experiments/pipeline_parallel/model_zoo.py
new file mode 100644
index 000000000..c31b3fa6d
--- /dev/null
+++ b/examples/language/gpt/experiments/pipeline_parallel/model_zoo.py
@@ -0,0 +1,73 @@
+from torch import nn
+from transformers import GPT2Config, GPT2LMHeadModel
+
+
+## Define the Model and Loss Based on Huggingface transformers GPT2LMHeadModel
+class GPTLMModel(nn.Module):
+
+    def __init__(self,
+                 hidden_size=768,
+                 num_layers=12,
+                 num_attention_heads=12,
+                 max_seq_len=1024,
+                 vocab_size=50257,
+                 checkpoint=False):
+        super().__init__()
+        self.checkpoint = checkpoint
+        self.config = GPT2Config(n_embd=hidden_size,
+                                 n_layer=num_layers,
+                                 n_head=num_attention_heads,
+                                 n_positions=max_seq_len,
+                                 n_ctx=max_seq_len,
+                                 vocab_size=vocab_size)
+        self.model = GPT2LMHeadModel(self.config)
+        if checkpoint:
+            self.model.gradient_checkpointing_enable()
+
+    def forward(self, input_ids, attention_mask):
+        # Only return lm_logits
+        return self.model(input_ids=input_ids, attention_mask=attention_mask, use_cache=not self.checkpoint)[0]
+
+
+def gpt2_medium(checkpoint=False):
+    return GPTLMModel(hidden_size=1024, num_layers=24, num_attention_heads=16, checkpoint=checkpoint)
+
+
+def gpt2_xl(checkpoint=True):
+    return GPTLMModel(hidden_size=1600, num_layers=48, num_attention_heads=32, checkpoint=checkpoint)
+
+
+def gpt2_10b(checkpoint=True):
+    return GPTLMModel(hidden_size=4096, num_layers=50, num_attention_heads=16, checkpoint=checkpoint)
+
+
+def gpt2_14b(checkpoint=True):
+    return GPTLMModel(hidden_size=4096, num_layers=70, num_attention_heads=16, checkpoint=checkpoint)
+
+
+def gpt2_20b(checkpoint=True):
+    return GPTLMModel(hidden_size=8192, num_layers=25, num_attention_heads=16, checkpoint=checkpoint)
+
+
+def gpt2_24b(checkpoint=True):
+    return GPTLMModel(hidden_size=8192, num_layers=30, num_attention_heads=16, checkpoint=checkpoint)
+
+
+def model_builder(model_size: str) -> callable:
+    if model_size == "gpt2_medium":
+        return gpt2_medium
+    elif model_size == "gpt2_xl":
+        return gpt2_xl
+    elif model_size == "gpt2_10b":
+        return gpt2_10b
+    elif model_size == "gpt2_14b":
+        return gpt2_14b
+    elif model_size == "gpt2_20b":
+        return gpt2_20b
+    elif model_size == "gpt2_24b":
+        return gpt2_24b
+    else:
+        raise ValueError(f"model_builder: unsupported model size '{model_size}'")
+
+
+__all__ = ['model_builder']
diff --git a/examples/language/gpt/experiments/pipeline_parallel/run.sh b/examples/language/gpt/experiments/pipeline_parallel/run.sh
new file mode 100644
index 000000000..235cefcbc
--- /dev/null
+++ b/examples/language/gpt/experiments/pipeline_parallel/run.sh
@@ -0,0 +1,7 @@
+export GPUNUM=${GPUNUM:-4}
+export BATCH_SIZE=${BATCH_SIZE:-16}
+export MODEL_TYPE=${MODEL_TYPE:-"gpt2_medium"}
+export NUM_MICROBATCH=${NUM_MICROBATCH:-8}
+
+mkdir -p pp_logs
+python train_gpt_pp.py --device="cuda" --model_type=${MODEL_TYPE} --num_microbatches=${NUM_MICROBATCH} --world_size=${GPUNUM} --batch_size=${BATCH_SIZE} 2>&1 | tee ./pp_logs/${MODEL_TYPE}_gpu_${GPUNUM}_bs_${BATCH_SIZE}_nm_${NUM_MICROBATCH}.log
diff --git a/examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py b/examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py
new file mode 100644
index 000000000..79efa61b0
--- /dev/null
+++ b/examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py
@@ -0,0 +1,161 @@
+import argparse
+import time
+from functools import partial
+
+import torch
+from model_zoo import model_builder
+from torch import nn
+from tqdm import tqdm
+
+from colossalai.fx import ColoTracer
+from colossalai.fx.passes.adding_split_node_pass import avgnode_split_pass, split_with_split_nodes_pass
+from colossalai.logging import disable_existing_loggers, get_dist_logger
+from colossalai.nn.optimizer import HybridAdam
+from colossalai.pipeline.middleware.adaptor import get_fx_topology
+from colossalai.pipeline.rpc._pipeline_schedule import OneFOneBPipelineEngine
+from colossalai.pipeline.rpc.utils import rpc_run
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--model_type', type=str, default="gpt2_medium")
+    parser.add_argument('--world_size', type=int, default=2)
+    parser.add_argument('--batch_size', type=int, default=16)
+    parser.add_argument('--dp_degree', type=int, default=1)
+    parser.add_argument('--tp_degree', type=int, default=1)
+    parser.add_argument('--num_microbatches', type=int, default=2)
+    parser.add_argument('--device', type=str, choices=['cpu', 'cuda'], default='cuda')
+    parser.add_argument('--master_addr', type=str, default='localhost')
+    parser.add_argument('--master_port', type=str, default='29011')
+    parser.add_argument('--num_worker_threads', type=int, default=128)
+    return parser.parse_args()
+
+
+class GPTLMLoss(nn.Module):
+
+    def __init__(self):
+        super().__init__()
+        self.loss_fn = nn.CrossEntropyLoss()
+
+    def forward(self, logits, labels):
+        shift_logits = logits[..., :-1, :].contiguous()
+        shift_labels = labels[..., 1:].contiguous()
+        # Flatten the tokens
+        return self.loss_fn(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
+
+
+# Randomly Generated Data
+def get_data(batch_size, seq_len, vocab_size):
+    input_ids = torch.randint(0, vocab_size, (batch_size, seq_len), device=torch.cuda.current_device())
+    attention_mask = torch.ones_like(input_ids)
+    return input_ids, attention_mask
+
+
+def get_tflops(model_numel, batch_size, seq_len, step_time):
+    return model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)
+
+
+def create_partition_module(pp_rank: int, stage_num: int, model, data_kwargs):
+    tracer = ColoTracer()
+    meta_args = {k: v.to('meta') for k, v in data_kwargs.items()}
+    graph = tracer.trace(root=model, meta_args=meta_args)
+    gm = torch.fx.GraphModule(model, graph, model.__class__.__name__)
+    annotated_model = avgnode_split_pass(gm, stage_num)
+
+    top_module, split_submodules = split_with_split_nodes_pass(annotated_model, merge_output=True)
+    topo = get_fx_topology(top_module)
+    for submodule in split_submodules:
+        if isinstance(submodule, torch.fx.GraphModule):
+            setattr(submodule, '_topo', topo)
+    return split_submodules[pp_rank + 1]
+
+
+def partition(model, data_kwargs, pp_rank: int, chunk: int, stage_num: int):
+    module = create_partition_module(pp_rank, stage_num, model, data_kwargs)
+    return module
+
+
+def run_master(args):
+    batch_size = args.batch_size
+    device = args.device
+    world_size = args.world_size
+    stage_num = world_size
+    num_microbatches = args.num_microbatches
+    model_type = args.model_type
+    # batch size per DP degree
+    SEQ_LEN = 1024
+    VOCAB_SIZE = 50257
+    NUM_STEPS = 10
+    WARMUP_STEPS = 1
+
+    disable_existing_loggers()
+    logger = get_dist_logger()
+    logger.info(f"{args.model_type}, batch size {batch_size}, num stage {stage_num}, num microbatch {num_microbatches}",
+                ranks=[0])
+
+    torch.manual_seed(123)
+
+    # build criterion
+    criterion = GPTLMLoss()
+
+    # warm up pipeline fx partition
+    input_ids, attn_mask = get_data(batch_size, SEQ_LEN, VOCAB_SIZE)
+    warmup_data_kwargs = {'input_ids': input_ids, 'attention_mask': attn_mask}
+
+    # create model
+    model = model_builder(model_type)(checkpoint=False)
+
+    # set 1f1b pipeline engine
+    pp_engine = OneFOneBPipelineEngine(partition_fn=partial(partition, model, warmup_data_kwargs),
+                                       stage_num=stage_num,
+                                       num_microbatches=num_microbatches,
+                                       device=device,
+                                       chunk=1,
+                                       criterion=criterion,
+                                       metric=None,
+                                       checkpoint=False)
+
+    partition_numels = pp_engine.remote_numels()
+    for rank, numel in partition_numels.items():
+        logger.info(f'{rank=} numel in the partition:{numel}')
+
+    # build optim
+    pp_engine.initialize_optimizer(HybridAdam, lr=1e-3)
+
+    ranks_tflops = {}
+    for n in range(NUM_STEPS):
+        # we just use randomly generated data here
+        input_ids, attn_mask = get_data(batch_size, SEQ_LEN, VOCAB_SIZE)
+        batch = {'input_ids': input_ids, 'attention_mask': attn_mask}
+
+        start = time.time()
+        outputs = pp_engine.forward_backward(batch=batch, labels=input_ids, forward_only=False)
+        step_time = time.time() - start
+
+        for rank, numel in partition_numels.items():
+            if rank not in ranks_tflops:
+                ranks_tflops[rank] = []
+            step_tflops = get_tflops(numel, batch_size, SEQ_LEN, step_time)
+
+            logger.info(
+                f"Rank{rank} , [{n + 1}/{NUM_STEPS}] , Step time: {step_time:.3f}s, TFLOPS: {get_tflops(numel, batch_size, SEQ_LEN, step_time):.3f}",
+                ranks=[0],
+            )
+
+            if n >= WARMUP_STEPS:
+                ranks_tflops[rank].append(step_tflops)
+
+    median_index = ((NUM_STEPS - WARMUP_STEPS) >> 1) + WARMUP_STEPS
+    gpu_tflops = []
+    for rank, tflops_list in ranks_tflops.items():
+        tflops_list.sort()
+        gpu_tflops.append(tflops_list[median_index])
+        logger.info(f"GPU{rank} Median TFLOPS is {tflops_list[median_index]:.3f}")
+
+    logger.info(f"Total TFLOPS is {sum(gpu_tflops):.3f}")
+    logger.info(f"Avg TFLOPS per GPU is {sum(gpu_tflops) / world_size:.3f}")
+
+
+if __name__ == '__main__':
+    args = parse_args()
+    rpc_run(args, run_master)
diff --git a/examples/language/gpt/experiments/pipeline_parallel/utils.py b/examples/language/gpt/experiments/pipeline_parallel/utils.py
new file mode 100644
index 000000000..782f546dc
--- /dev/null
+++ b/examples/language/gpt/experiments/pipeline_parallel/utils.py
@@ -0,0 +1,12 @@
+import torch
+
+
+# Randomly Generated Data
+def get_data(batch_size, seq_len, vocab_size):
+    input_ids = torch.randint(0, vocab_size, (batch_size, seq_len), device=torch.cuda.current_device())
+    attention_mask = torch.ones_like(input_ids)
+    return input_ids, attention_mask
+
+
+def get_tflops(model_numel, batch_size, seq_len, step_time):
+    return model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)

From 9ae9e74017c16df1d7686b7a8b276631f92032fe Mon Sep 17 00:00:00 2001
From: Ziyue Jiang
Date: Fri, 6 Jan 2023 15:59:06 +0800
Subject: [PATCH 2/3] fix diff device in some partition

---
 colossalai/pipeline/rpc/_pipeline_base.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/colossalai/pipeline/rpc/_pipeline_base.py b/colossalai/pipeline/rpc/_pipeline_base.py
index 2a7998c14..4739cdaa9 100644
--- a/colossalai/pipeline/rpc/_pipeline_base.py
+++ b/colossalai/pipeline/rpc/_pipeline_base.py
@@ -789,6 +789,8 @@ class WorkerBase(ABC):
 
         args_kwargs = pyobj_map(args_kwargs, fn=lambda x: x.to(self.device).detach(),
                                 process_types=torch.Tensor)    # torch rpc doesn't support args or rets in GPU
+        args_kwargs = pyobj_map(args_kwargs, fn=lambda x: self.device,
+                                process_types=torch.device)    # change devices from last stage to current device
 
         args, kwargs = data_process_func(args_kwargs)
 

From ad00894f7f37c370cb9db162e727302ec633c0f0 Mon Sep 17 00:00:00 2001
From: Ziyue Jiang
Date: Fri, 6 Jan 2023 16:03:16 +0800
Subject: [PATCH 3/3] polish

---
 .../gpt/experiments/pipeline_parallel/README.md |  3 ++-
 .../gpt/experiments/pipeline_parallel/utils.py  | 12 ------------
 2 files changed, 2 insertions(+), 13 deletions(-)
 delete mode 100644 examples/language/gpt/experiments/pipeline_parallel/utils.py

diff --git a/examples/language/gpt/experiments/pipeline_parallel/README.md b/examples/language/gpt/experiments/pipeline_parallel/README.md
index d158b088d..702e3c8d6 100644
--- a/examples/language/gpt/experiments/pipeline_parallel/README.md
+++ b/examples/language/gpt/experiments/pipeline_parallel/README.md
@@ -1,4 +1,4 @@
-# Auto-Parallelism with GPT2
+# Pipeline Parallelism Demo with GPT2
 
 ## Requirements
 
@@ -33,5 +33,6 @@ For simplicity, the input data is randomly generated here.
 
 ```bash
 #Run the Pipeline Parallel on GPT with default setting and a dummy dataset.
+#You can change the number of GPUs or microbatches in run.sh.
 bash run.sh
 ```
diff --git a/examples/language/gpt/experiments/pipeline_parallel/utils.py b/examples/language/gpt/experiments/pipeline_parallel/utils.py
deleted file mode 100644
index 782f546dc..000000000
--- a/examples/language/gpt/experiments/pipeline_parallel/utils.py
+++ /dev/null
@@ -1,12 +0,0 @@
-import torch
-
-
-# Randomly Generated Data
-def get_data(batch_size, seq_len, vocab_size):
-    input_ids = torch.randint(0, vocab_size, (batch_size, seq_len), device=torch.cuda.current_device())
-    attention_mask = torch.ones_like(input_ids)
-    return input_ids, attention_mask
-
-
-def get_tflops(model_numel, batch_size, seq_len, step_time):
-    return model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)
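
---

Note for readers trying the example out (not part of the patches above): the run.sh added in PATCH 1/3 reads its configuration from environment variables with defaults (GPUNUM, BATCH_SIZE, MODEL_TYPE, NUM_MICROBATCH), so the settings the README mentions can also be overridden at launch time instead of editing the script. A minimal sketch, assuming 8 GPUs are available and using gpt2_xl, one of the sizes accepted by model_builder in model_zoo.py:

```bash
# Sketch: override the defaults exported in run.sh for a larger run.
GPUNUM=8 BATCH_SIZE=32 MODEL_TYPE=gpt2_xl NUM_MICROBATCH=16 bash run.sh

# run.sh tees the output into ./pp_logs/, named after the model, GPU count,
# batch size and microbatch count, e.g. pp_logs/gpt2_xl_gpu_8_bs_32_nm_16.log
```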