[example] reorganize for community examples (#3557)

This commit is contained in:
binmakeswell
2023-04-14 16:27:48 +08:00
committed by GitHub
parent 1a809eddaa
commit f1b3d60cae
31 changed files with 785 additions and 844 deletions

View File

@@ -0,0 +1,23 @@
# Pretraining
1. Pretraining roberta through running the script below. Detailed parameter descriptions can be found in the arguments.py. `data_path_prefix` is absolute path specifies output of preprocessing. **You have to modify the *hostfile* according to your cluster.**
```bash
bash run_pretrain.sh
```
* `--hostfile`: servers' host name from /etc/hosts
* `--include`: servers which will be used
* `--nproc_per_node`: number of process(GPU) from each server
* `--data_path_prefix`: absolute location of train data, e.g., /h5/0.h5
* `--eval_data_path_prefix`: absolute location of eval data
* `--tokenizer_path`: tokenizer path contains huggingface tokenizer.json, e.g./tokenizer/tokenizer.json
* `--bert_config`: config.json which represent model
* `--mlm`: model type of backbone, bert or deberta_v2
2. if resume training from earylier checkpoint, run the script below.
```shell
bash run_pretrain_resume.sh
```
* `--resume_train`: whether to resume training
* `--load_pretrain_model`: absolute path which contains model checkpoint
* `--load_optimizer_lr`: absolute path which contains optimizer checkpoint

View File

@@ -0,0 +1,87 @@
from numpy import require
import colossalai
__all__ = ['parse_args']
def parse_args():
parser = colossalai.get_default_parser()
parser.add_argument(
"--distplan",
type=str,
default='CAI_Gemini',
help="The distributed plan [colossalai, zero1, zero2, torch_ddp, torch_zero].",
)
parser.add_argument(
"--tp_degree",
type=int,
default=1,
help="Tensor Parallelism Degree. Valid when using colossalai as dist plan.",
)
parser.add_argument(
"--placement",
type=str,
default='cpu',
help="Placement Policy for Gemini. Valid when using colossalai as dist plan.",
)
parser.add_argument(
"--shardinit",
action='store_true',
help=
"Shard the tensors when init the model to shrink peak memory size on the assigned device. Valid when using colossalai as dist plan.",
)
parser.add_argument('--lr', type=float, required=True, help='initial learning rate')
parser.add_argument('--epoch', type=int, required=True, help='number of epoch')
parser.add_argument('--data_path_prefix', type=str, required=True, help="location of the train data corpus")
parser.add_argument('--eval_data_path_prefix',
type=str,
required=True,
help='location of the evaluation data corpus')
parser.add_argument('--tokenizer_path', type=str, required=True, help='location of the tokenizer')
parser.add_argument('--max_seq_length', type=int, default=512, help='sequence length')
parser.add_argument('--refresh_bucket_size',
type=int,
default=1,
help="This param makes sure that a certain task is repeated for this time steps to \
optimise on the back propogation speed with APEX's DistributedDataParallel")
parser.add_argument("--max_predictions_per_seq",
"--max_pred",
default=80,
type=int,
help="The maximum number of masked tokens in a sequence to be predicted.")
parser.add_argument("--gradient_accumulation_steps", default=1, type=int, help="accumulation_steps")
parser.add_argument("--train_micro_batch_size_per_gpu", default=2, type=int, required=True, help="train batch size")
parser.add_argument("--eval_micro_batch_size_per_gpu", default=2, type=int, required=True, help="eval batch size")
parser.add_argument("--num_workers", default=8, type=int, help="")
parser.add_argument("--async_worker", action='store_true', help="")
parser.add_argument("--bert_config", required=True, type=str, help="location of config.json")
parser.add_argument("--wandb", action='store_true', help="use wandb to watch model")
parser.add_argument("--wandb_project_name", default='roberta', help="wandb project name")
parser.add_argument("--log_interval", default=100, type=int, help="report interval")
parser.add_argument("--log_path", type=str, required=True, help="log file which records train step")
parser.add_argument("--tensorboard_path", type=str, required=True, help="location of tensorboard file")
parser.add_argument("--colossal_config",
type=str,
required=True,
help="colossal config, which contains zero config and so on")
parser.add_argument("--ckpt_path",
type=str,
required=True,
help="location of saving checkpoint, which contains model and optimizer")
parser.add_argument('--seed', type=int, default=42, help="random seed for initialization")
parser.add_argument('--vscode_debug', action='store_true', help="use vscode to debug")
parser.add_argument('--load_pretrain_model', default='', type=str, help="location of model's checkpoin")
parser.add_argument(
'--load_optimizer_lr',
default='',
type=str,
help="location of checkpoint, which contains optimerzier, learning rate, epoch, shard and global_step")
parser.add_argument('--resume_train', action='store_true', help="whether resume training from a early checkpoint")
parser.add_argument('--mlm', default='bert', type=str, help="model type, bert or deberta")
parser.add_argument('--checkpoint_activations', action='store_true', help="whether to use gradient checkpointing")
args = parser.parse_args()
return args

View File

@@ -0,0 +1,16 @@
class BertDatasetProviderInterface:
def get_shard(self, index, shuffle=True):
raise NotImplementedError
def release_shard(self, index):
raise NotImplementedError
def prefetch_shard(self, index):
raise NotImplementedError
def get_batch(self, batch_iter):
raise NotImplementedError
def prefetch_batch(self):
raise NotImplementedError

View File

@@ -0,0 +1,76 @@
import math
import os
import torch
from nvidia_bert_dataset_provider import NvidiaBertDatasetProvider
from tqdm import tqdm
from utils.global_vars import get_tensorboard_writer, get_timers
def evaluate(model, args, logger, global_step, criterion):
evaluate_dataset_provider = NvidiaBertDatasetProvider(args, evaluate=True)
start_shard = 0
model.eval()
timers = get_timers()
eval_step = 0
eval_loss = 0
cur_loss = 0
world_size = torch.distributed.get_world_size()
with torch.no_grad():
for shard in range(start_shard, len(os.listdir(args.eval_data_path_prefix))):
timers('eval_shard_time').start()
dataset_iterator, total_length = evaluate_dataset_provider.get_shard(shard)
# evaluate_dataset_provider.prefetch_shard(shard + 1)
if torch.distributed.get_rank() == 0:
iterator_data = tqdm(enumerate(dataset_iterator),
total=(total_length // args.eval_micro_batch_size_per_gpu // world_size),
colour='MAGENTA',
smoothing=1)
else:
iterator_data = enumerate(dataset_iterator)
for step, batch_data in iterator_data: #tqdm(enumerate(dataset_iterator), total=(total_length // args.train_micro_batch_size_per_gpu // world_size), colour='cyan', smoothing=1):
# batch_data = pretrain_dataset_provider.get_batch(batch_index)
eval_step += 1
input_ids = batch_data[0].cuda()
attention_mask = batch_data[1].cuda()
token_type_ids = batch_data[2].cuda()
mlm_label = batch_data[3].cuda()
# nsp_label = batch_data[5].cuda()
output = model(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
loss = criterion(output.logits, mlm_label) #prediction_scores
evaluate_dataset_provider.prefetch_batch()
eval_loss += loss.float().item()
cur_loss = eval_loss / eval_step
elapsed_time = timers("eval_shard_time").elapsed()
elapsed_time_per_iteration = elapsed_time / eval_step
ppl = math.exp(cur_loss)
if args.wandb and torch.distributed.get_rank() == 0:
tensorboard_log = get_tensorboard_writer()
tensorboard_log.log_eval({
'loss': cur_loss,
'ppl': ppl,
'mins_batch': elapsed_time_per_iteration
}, global_step)
eval_log_str = f'evaluation shard: {shard} | step: {eval_step} | elapsed_time: {elapsed_time / 60 :.3f} minutes ' + \
f'| mins/batch: {elapsed_time_per_iteration :.3f} seconds | loss: {cur_loss:.7f} | ppl: {ppl:.7f}'
logger.info(eval_log_str)
logger.info('-' * 100)
logger.info('')
evaluate_dataset_provider.release_shard()
model.train()
return cur_loss

View File

@@ -0,0 +1,10 @@
GPU001
GPU002
GPU003
GPU004
GPU005
GPU006
GPU007
GPU008
GPU009
GPU010

View File

@@ -0,0 +1,17 @@
import torch
__all__ = ['LossForPretraining']
class LossForPretraining(torch.nn.Module):
def __init__(self, vocab_size):
super(LossForPretraining, self).__init__()
self.loss_fn = torch.nn.CrossEntropyLoss(ignore_index=-1)
self.vocab_size = vocab_size
def forward(self, prediction_scores, masked_lm_labels, next_sentence_labels=None):
masked_lm_loss = self.loss_fn(prediction_scores.view(-1, self.vocab_size), masked_lm_labels.view(-1))
# next_sentence_loss = self.loss_fn(seq_relationship_score.view(-1, 2), next_sentence_labels.view(-1))
total_loss = masked_lm_loss #+ next_sentence_loss
return total_loss

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,170 @@
import json
import logging
import os
import random
import time
from concurrent.futures import ProcessPoolExecutor
import h5py
import numpy as np
import torch
import torch.distributed as dist
from bert_dataset_provider import BertDatasetProviderInterface
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data.sampler import RandomSampler
import colossalai.utils as utils
# Workaround because python functions are not picklable
class WorkerInitObj(object):
def __init__(self, seed):
self.seed = seed
def __call__(self, id):
np.random.seed(seed=self.seed + id)
random.seed(self.seed + id)
def create_pretraining_dataset(input_file, max_predictions_per_seq, num_workers, train_batch_size, worker_init,
data_sampler):
train_data = pretraining_dataset(input_file=input_file, max_predictions_per_seq=max_predictions_per_seq)
train_dataloader = DataLoader(train_data,
sampler=data_sampler(train_data),
batch_size=train_batch_size,
num_workers=num_workers,
worker_init_fn=worker_init,
pin_memory=True)
return train_dataloader, len(train_data)
class pretraining_dataset(Dataset):
def __init__(self, input_file, max_predictions_per_seq):
self.input_file = input_file
self.max_predictions_per_seq = max_predictions_per_seq
f = h5py.File(input_file, "r")
keys = ['input_ids', 'input_mask', 'segment_ids', 'masked_lm_positions']
self.inputs = [np.asarray(f[key][:]) for key in keys]
f.close()
def __len__(self):
'Denotes the total number of samples'
return len(self.inputs[0])
def __getitem__(self, index):
[input_ids, input_mask, segment_ids, masked_lm_labels] = [
torch.from_numpy(input[index].astype(np.int64)) if indice < 5 else torch.from_numpy(
np.asarray(input[index].astype(np.int64))) for indice, input in enumerate(self.inputs)
]
return [input_ids, input_mask, segment_ids, masked_lm_labels]
class NvidiaBertDatasetProvider(BertDatasetProviderInterface):
def __init__(self, args, evaluate=False):
self.num_workers = args.num_workers
self.max_seq_length = args.max_seq_length
self.max_predictions_per_seq = args.max_predictions_per_seq
self.gradient_accumulation_steps = args.gradient_accumulation_steps
if not evaluate:
self.train_micro_batch_size_per_gpu = args.train_micro_batch_size_per_gpu
else:
self.train_micro_batch_size_per_gpu = args.eval_micro_batch_size_per_gpu
self.logger = args.logger
self.global_rank = dist.get_rank()
self.world_size = dist.get_world_size()
# Initialize dataset files
if not evaluate:
self.dataset_files = [
os.path.join(args.data_path_prefix, f)
for f in os.listdir(args.data_path_prefix)
if os.path.isfile(os.path.join(args.data_path_prefix, f)) and 'h5' in f
]
else:
self.dataset_files = [
os.path.join(args.eval_data_path_prefix, f)
for f in os.listdir(args.eval_data_path_prefix)
if os.path.isfile(os.path.join(args.eval_data_path_prefix, f)) and 'h5' in f
]
self.dataset_files.sort()
# random.shuffle(self.dataset_files)
self.num_files = len(self.dataset_files)
# self.data_sampler = RandomSampler
self.data_sampler = DistributedSampler
self.worker_init = WorkerInitObj(args.seed + args.local_rank)
self.dataset_future = None
self.pool = ProcessPoolExecutor(1)
self.data_file = None
self.shuffle = True
if self.global_rank == 0:
self.logger.info(f"NvidiaBertDatasetProvider - Initialization: num_files = {self.num_files}")
def get_shard(self, index):
start = time.time()
if self.dataset_future is None:
self.data_file = self._get_shard_file(index)
self.train_dataloader, sample_count = create_pretraining_dataset(
input_file=self.data_file,
max_predictions_per_seq=self.max_predictions_per_seq,
num_workers=self.num_workers,
train_batch_size=self.train_micro_batch_size_per_gpu,
worker_init=self.worker_init,
data_sampler=self.data_sampler)
else:
self.train_dataloader, sample_count = self.dataset_future.result(timeout=None)
self.logger.info(
f"Data Loading Completed for Pretraining Data from {self.data_file} with {sample_count} samples took {time.time()-start:.2f}s."
)
return self.train_dataloader, sample_count
def release_shard(self):
del self.train_dataloader
self.pool.shutdown()
def prefetch_shard(self, index):
self.data_file = self._get_shard_file(index)
self.dataset_future = self.pool.submit(create_pretraining_dataset, self.data_file, self.max_predictions_per_seq,
self.num_workers, self.train_micro_batch_size_per_gpu, self.worker_init,
self.data_sampler)
def get_batch(self, batch_iter):
return batch_iter
def prefetch_batch(self):
pass
def _get_shard_file(self, shard_index):
file_index = self._get_shard_file_index(shard_index, self.global_rank)
return self.dataset_files[file_index]
def _get_shard_file_index(self, shard_index, global_rank):
# if dist.is_initialized() and self.world_size > self.num_files:
# remainder = self.world_size % self.num_files
# file_index = (shard_index * self.world_size) + global_rank + (
# remainder * shard_index)
# else:
# file_index = shard_index * self.world_size + global_rank
return shard_index % self.num_files
def shuffle_dataset(self, epoch):
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.epoch)
indices = torch.randperm(self.num_files, generator=g).tolist()
new_dataset = [self.dataset_files[i] for i in indices]
self.dataset_files = new_dataset

View File

@@ -0,0 +1,124 @@
import logging
import os
import sys
import torch
import transformers
from torch.optim import AdamW
from transformers import (
AutoModelForMaskedLM,
AutoTokenizer,
BertForPreTraining,
GPT2Config,
GPT2LMHeadModel,
RobertaConfig,
RobertaForMaskedLM,
get_linear_schedule_with_warmup,
)
from colossalai.core import global_context as gpc
from colossalai.nn.lr_scheduler import LinearWarmupLR
from colossalai.nn.optimizer import FusedAdam, HybridAdam
sys.path.append(os.getcwd())
from collections import OrderedDict
import torch.nn as nn
from model.bert import BertForMaskedLM
from model.deberta_v2 import DebertaV2ForMaskedLM
__all__ = ['get_model', 'get_optimizer', 'get_lr_scheduler', 'get_dataloader_for_pretraining']
def get_new_state_dict(state_dict, start_index=13):
new_state_dict = OrderedDict()
for k, v in state_dict.items():
name = k[start_index:]
new_state_dict[name] = v
return new_state_dict
class LMModel(nn.Module):
def __init__(self, model, config, args):
super().__init__()
self.checkpoint = args.checkpoint_activations
self.config = config
self.model = model
if self.checkpoint:
self.model.gradient_checkpointing_enable()
def forward(self, input_ids, token_type_ids=None, attention_mask=None):
# Only return lm_logits
return self.model(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
def get_model(args, logger):
if args.mlm == 'bert':
config = transformers.BertConfig.from_json_file(args.bert_config)
model = BertForMaskedLM(config)
elif args.mlm == 'deberta_v2':
config = transformers.DebertaV2Config.from_json_file(args.bert_config)
model = DebertaV2ForMaskedLM(config)
else:
raise Exception("Invalid mlm!")
if len(args.load_pretrain_model) > 0:
assert os.path.exists(args.load_pretrain_model)
# load_checkpoint(args.load_pretrain_model, model, strict=False)
m_state_dict = torch.load(args.load_pretrain_model,
map_location=torch.device(f"cuda:{torch.cuda.current_device()}"))
# new_state_dict = get_new_state_dict(m_state_dict)
model.load_state_dict(m_state_dict,
strict=True) # must insure that every process have identical parameters !!!!!!!
logger.info("load model success")
numel = sum([p.numel() for p in model.parameters()])
if args.checkpoint_activations:
model.gradient_checkpointing_enable()
# model = LMModel(model, config, args)
return config, model, numel
def get_optimizer(model, lr):
param_optimizer = list(model.named_parameters())
no_decay = ['bias', 'gamma', 'beta', 'LayerNorm']
# configure the weight decay for bert models
optimizer_grouped_parameters = [{
'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
'weight_decay': 0.1
}, {
'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)],
'weight_decay': 0.0
}]
optimizer = HybridAdam(optimizer_grouped_parameters, lr=lr, betas=[0.9, 0.95])
return optimizer
def get_lr_scheduler(optimizer, total_steps, warmup_steps=2000, last_epoch=-1):
# warmup_steps = int(total_steps * warmup_ratio)
lr_scheduler = get_linear_schedule_with_warmup(optimizer,
num_warmup_steps=warmup_steps,
num_training_steps=total_steps,
last_epoch=last_epoch)
# lr_scheduler = LinearWarmupLR(optimizer, total_steps=total_steps, warmup_steps=warmup_steps)
return lr_scheduler
def save_ckpt(model, optimizer, lr_scheduler, path, epoch, shard, global_step):
model_path = path + '_pytorch_model.bin'
optimizer_lr_path = path + '.op_lrs'
checkpoint = {}
checkpoint['optimizer'] = optimizer.state_dict()
checkpoint['lr_scheduler'] = lr_scheduler.state_dict()
checkpoint['epoch'] = epoch
checkpoint['shard'] = shard
checkpoint['global_step'] = global_step
model_state = model.state_dict() #each process must run model.state_dict()
if gpc.get_global_rank() == 0:
torch.save(checkpoint, optimizer_lr_path)
torch.save(model_state, model_path)

View File

@@ -0,0 +1,37 @@
#!/usr/bin/env sh
root_path=$PWD
PY_FILE_PATH="$root_path/run_pretraining.py"
tensorboard_path="$root_path/tensorboard"
log_path="$root_path/exp_log"
ckpt_path="$root_path/ckpt"
mkdir -p $tensorboard_path
mkdir -p $log_path
mkdir -p $ckpt_path
export PYTHONPATH=$PWD
env OMP_NUM_THREADS=40 colossalai run --hostfile ./hostfile \
--include GPU002,GPU003,GPU004,GPU007 \
--nproc_per_node=8 \
$PY_FILE_PATH \
--master_addr GPU007 \
--master_port 20024 \
--lr 2.0e-4 \
--train_micro_batch_size_per_gpu 190 \
--eval_micro_batch_size_per_gpu 20 \
--epoch 15 \
--data_path_prefix /h5 \
--eval_data_path_prefix /eval_h5 \
--tokenizer_path /roberta \
--bert_config /roberta/config.json \
--tensorboard_path $tensorboard_path \
--log_path $log_path \
--ckpt_path $ckpt_path \
--log_interval 50 \
--mlm bert \
--wandb \
--checkpoint_activations \

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env sh
root_path=$PWD
PY_FILE_PATH="$root_path/run_pretraining.py"
tensorboard_path="$root_path/tensorboard"
log_path="$root_path/exp_log"
ckpt_path="$root_path/ckpt"
mkdir -p $tensorboard_path
mkdir -p $log_path
mkdir -p $ckpt_path
export PYTHONPATH=$PWD
env OMP_NUM_THREADS=40 colossalai run --hostfile ./hostfile \
--include GPU002,GPU003,GPU004,GPU007 \
--nproc_per_node=8 \
$PY_FILE_PATH \
--master_addr GPU007 \
--master_port 20024 \
--lr 2.0e-4 \
--train_micro_batch_size_per_gpu 190 \
--eval_micro_batch_size_per_gpu 20 \
--epoch 15 \
--data_path_prefix /h5 \
--eval_data_path_prefix /eval_h5 \
--tokenizer_path /roberta \
--bert_config /roberta/config.json \
--tensorboard_path $tensorboard_path \
--log_path $log_path \
--ckpt_path $ckpt_path \
--log_interval 50 \
--mlm bert \
--wandb \
--checkpoint_activations \
--resume_train \
--load_pretrain_model /ckpt/1.pt \
--load_optimizer_lr /ckpt/1.op_lrs \

View File

@@ -0,0 +1,263 @@
import math
import os
import time
from functools import partial
import torch
from arguments import parse_args
from evaluation import evaluate
from loss import LossForPretraining
from nvidia_bert_dataset_provider import NvidiaBertDatasetProvider
from pretrain_utils import get_lr_scheduler, get_model, get_optimizer, save_ckpt
from tqdm import tqdm
from transformers import AutoTokenizer
from utils.exp_util import get_mem_info, get_tflops, log_args, throughput_calculator
from utils.global_vars import get_tensorboard_writer, get_timers, set_global_variables
from utils.logger import Logger
import colossalai
from colossalai.context import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.nn.parallel import GeminiDDP, zero_model_wrapper, zero_optim_wrapper
from colossalai.tensor import ColoParameter, ComputePattern, ComputeSpec, ProcessGroup, ReplicaSpec, ShardSpec
from colossalai.utils import get_current_device
from colossalai.utils.model.colo_init_context import ColoInitContext
from colossalai.zero import ZeroOptimizer
def main():
args = parse_args()
launch_time = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime())
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_path)
# os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
logger = Logger(os.path.join(args.log_path, launch_time), cuda=torch.cuda.is_available(), debug=args.vscode_debug)
if args.vscode_debug:
colossalai.launch(config={},
rank=args.rank,
world_size=args.world_size,
host=args.host,
port=args.port,
backend=args.backend)
args.local_rank = -1
args.log_interval = 1
else:
colossalai.launch_from_torch(config={}) #args.colossal_config
args.local_rank = int(os.environ["LOCAL_RANK"])
logger.info(
f'launch_from_torch, world size: {torch.distributed.get_world_size()} | ' +
f'ParallelMode.MODEL: {ParallelMode.MODEL} | ParallelMode.DATA: {ParallelMode.DATA} | ParallelMode.TENSOR: {ParallelMode.TENSOR}'
)
log_args(logger, args)
args.tokenizer = tokenizer
args.logger = logger
set_global_variables(launch_time, args.tensorboard_path)
world_size = torch.distributed.get_world_size()
init_dev = get_current_device()
# build model, optimizer and criterion
if args.distplan.startswith("CAI"):
# all param must use the same process group.
world_size = torch.distributed.get_world_size()
shard_pg = ProcessGroup(tp_degree=world_size) if args.shardinit else None
default_dist_spec = ShardSpec([-1], [world_size]) if args.shardinit else None
if args.shardinit and args.distplan != "CAI_Gemini":
raise RuntimeError("You can only use shardinit with CAI_Gemini")
# build GPT model
with ColoInitContext(device=get_current_device(),
dtype=torch.half,
default_dist_spec=default_dist_spec,
default_pg=shard_pg):
config, model, numel = get_model(args, logger)
# asign running configurations
gemini_config = None
if args.distplan.startswith("CAI_ZeRO"):
optim_config = dict(reduce_bucket_size=12 * 1024 * 1024, overlap_communication=True, verbose=True)
elif args.distplan == "CAI_Gemini":
gemini_config = dict(strict_ddp_mode=args.tp_degree == 1,
device=get_current_device(),
placement_policy=args.placement,
pin_memory=True,
hidden_dim=model.config.hidden_size,
search_range_mb=128)
optim_config = dict(gpu_margin_mem_ratio=0.)
else:
raise RuntimeError
# build a highly optimized gpu/cpu optimizer
optimizer = get_optimizer(model, lr=args.lr)
if args.distplan == "CAI_ZeRO1":
zero_stage = 1
elif args.distplan == "CAI_ZeRO2":
zero_stage = 2
elif args.distplan == "CAI_Gemini":
zero_stage = 3
else:
raise RuntimeError
# wrap your model and optimizer
model = zero_model_wrapper(model, zero_stage, gemini_config)
optimizer = zero_optim_wrapper(model, optimizer, optim_config=optim_config)
logger.info(get_mem_info(prefix='After init optim, '))
else:
config, model, numel = get_model(args, logger)
logger.info("no_zero")
if torch.distributed.get_rank() == 0:
os.mkdir(os.path.join(args.ckpt_path, launch_time))
logger.info(f'Model numel: {numel}')
get_tflops_func = partial(get_tflops, numel, args.train_micro_batch_size_per_gpu, args.max_seq_length)
# 144003367 is is the length of the entire dataset
steps_per_epoch = 144003367 // world_size // args.train_micro_batch_size_per_gpu // args.gradient_accumulation_steps // args.refresh_bucket_size #len(dataloader)
total_steps = steps_per_epoch * args.epoch
lr_scheduler = get_lr_scheduler(optimizer, total_steps=total_steps, last_epoch=-1)
start_epoch = 0
start_shard = 0
global_step = 0
if args.resume_train:
assert os.path.exists(args.load_optimizer_lr)
o_l_state_dict = torch.load(args.load_optimizer_lr, map_location='cpu')
o_l_state_dict['lr_scheduler']['last_epoch'] = o_l_state_dict['lr_scheduler']['last_epoch'] - 1
optimizer.load_state_dict(o_l_state_dict['optimizer'])
# o_l_state_dict['lr_scheduler']['last_epoch']
lr_scheduler = get_lr_scheduler(optimizer,
total_steps=total_steps,
last_epoch=o_l_state_dict['lr_scheduler']['last_epoch'])
for state in optimizer.state.values():
for k, v in state.items():
if isinstance(v, torch.Tensor):
state[k] = v.cuda(f"cuda:{torch.cuda.current_device()}")
# if you want delete the above three code, must move the model to gpu. Because in optimizer.step()
lr_scheduler.load_state_dict(o_l_state_dict['lr_scheduler'])
start_epoch = o_l_state_dict['epoch']
start_shard = o_l_state_dict['shard'] + 1
# global_step = o_l_state_dict['global_step'] + 1
logger.info(
f'resume from epoch {start_epoch} shard {start_shard} step {lr_scheduler.last_epoch} lr {lr_scheduler.get_last_lr()[0]}'
)
criterion = LossForPretraining(config.vocab_size)
# build dataloader
pretrain_dataset_provider = NvidiaBertDatasetProvider(args)
logger.info(get_mem_info(prefix='After init model, '))
best_loss = None
eval_loss = 0
train_loss = 0
timers = get_timers()
timers('interval_time').start()
timers('epoch_time').start()
timers('shard_time').start()
for epoch in range(start_epoch, args.epoch):
for shard in range(start_shard, len(os.listdir(args.data_path_prefix))):
dataset_iterator, total_length = pretrain_dataset_provider.get_shard(shard)
# pretrain_dataset_provider.prefetch_shard(shard + 1) # may cause cpu memory overload
if torch.distributed.get_rank() == 0:
iterator_data = tqdm(enumerate(dataset_iterator),
total=(total_length // args.train_micro_batch_size_per_gpu // world_size),
colour='cyan',
smoothing=1)
else:
iterator_data = enumerate(dataset_iterator)
model.train()
for step, batch_data in iterator_data:
# batch_data = pretrain_dataset_provider.get_batch(batch_index)
input_ids = batch_data[0].cuda(f"cuda:{torch.cuda.current_device()}")
attention_mask = batch_data[1].cuda(f"cuda:{torch.cuda.current_device()}")
token_type_ids = batch_data[2].cuda(f"cuda:{torch.cuda.current_device()}")
mlm_label = batch_data[3].cuda(f"cuda:{torch.cuda.current_device()}")
# nsp_label = batch_data[5].cuda()
output = model(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
loss = criterion(output.logits, mlm_label)
pretrain_dataset_provider.prefetch_batch()
optimizer.backward(loss)
train_loss += loss.float().item()
# if (step + 1) % args.accumulation_step == 0:
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
global_step += 1
if global_step % args.log_interval == 0 and global_step != 0 \
and torch.distributed.get_rank() == 0:
elapsed_time = timers('interval_time').elapsed(reset=False)
elapsed_time_per_iteration = elapsed_time / global_step
samples_per_sec, tflops, approx_parameters_in_billions = throughput_calculator(
numel, args, config, elapsed_time, global_step, world_size)
cur_loss = train_loss / args.log_interval
current_lr = lr_scheduler.get_last_lr()[0]
log_str = f'| epoch: {epoch} | shard: {shard} | step: {global_step} | lr {current_lr:.7f} | elapsed_time: {elapsed_time / 60 :.3f} minutes ' + \
f'| mins/batch: {elapsed_time_per_iteration :.3f} seconds | loss: {cur_loss:.7f} | ppl: {math.exp(cur_loss):.3f} | TFLOPS: {get_tflops_func(elapsed_time_per_iteration):.3f} or {tflops:.3f}'
logger.info(log_str, print_=False)
if args.wandb:
tensorboard_log = get_tensorboard_writer()
tensorboard_log.log_train(
{
'lr': current_lr,
'loss': cur_loss,
'ppl': math.exp(cur_loss),
'mins_batch': elapsed_time_per_iteration
}, global_step)
train_loss = 0
logger.info(f'epoch {epoch} shard {shard} has cost {timers("shard_time").elapsed() / 60 :.3f} mins')
logger.info('*' * 100)
eval_loss += evaluate(model, args, logger, global_step, criterion)
save_ckpt(model, optimizer, lr_scheduler,
os.path.join(args.ckpt_path, launch_time, f'epoch-{epoch}_shard-{shard}_' + launch_time), epoch,
shard, global_step)
eval_loss /= len(os.listdir(args.data_path_prefix))
logger.info(
f'epoch {epoch} | shard_length {len(os.listdir(args.data_path_prefix))} | elapsed_time: {timers("epoch_time").elapsed() / 60 :.3f} mins'
+ f'eval_loss: {eval_loss} | ppl: {math.exp(eval_loss)}')
logger.info('-' * 100)
if args.wandb and torch.distributed.get_rank() == 0:
tensorboard_log = get_tensorboard_writer()
tensorboard_log.log_eval({
'all_eval_shard_loss': eval_loss,
}, epoch)
start_shard = 0
eval_loss = 0
pretrain_dataset_provider.release_shard()
logger.info('Congratulation, training has finished!!!')
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,42 @@
import os
import time
import wandb
from torch.utils.tensorboard import SummaryWriter
class WandbLog:
@classmethod
def init_wandb(cls, project, notes=None, name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), config=None):
wandb.init(project=project, notes=notes, name=name, config=config)
@classmethod
def log(cls, result, model=None, gradient=None):
wandb.log(result)
if model:
wandb.watch(model)
if gradient:
wandb.watch(gradient)
class TensorboardLog:
def __init__(self, location, name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), config=None):
if not os.path.exists(location):
os.mkdir(location)
self.writer = SummaryWriter(location, comment=name)
def log_train(self, result, step):
for k, v in result.items():
self.writer.add_scalar(f'{k}/train', v, step)
def log_eval(self, result, step):
for k, v in result.items():
self.writer.add_scalar(f'{k}/eval', v, step)
def log_zeroshot(self, result, step):
for k, v in result.items():
self.writer.add_scalar(f'{k}_acc/eval', v, step)

View File

@@ -0,0 +1,114 @@
import functools
import os
import shutil
import psutil
import torch
from colossalai.core import global_context as gpc
def logging(s, log_path, print_=True, log_=True):
if print_:
print(s)
if log_:
with open(log_path, 'a+') as f_log:
f_log.write(s + '\n')
def get_logger(log_path, **kwargs):
return functools.partial(logging, log_path=log_path, **kwargs)
def create_exp_dir(dir_path, scripts_to_save=None, debug=False):
if debug:
print('Debug Mode : no experiment dir created')
return functools.partial(logging, log_path=None, log_=False)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
print('Experiment dir : {}'.format(dir_path))
if scripts_to_save is not None:
script_path = os.path.join(dir_path, 'scripts')
if not os.path.exists(script_path):
os.makedirs(script_path)
for script in scripts_to_save:
dst_file = os.path.join(dir_path, 'scripts', os.path.basename(script))
shutil.copyfile(script, dst_file)
return get_logger(log_path=os.path.join(dir_path, 'log.txt'))
def get_cpu_mem():
return psutil.Process().memory_info().rss / 1024**2
def get_gpu_mem():
return torch.cuda.memory_allocated() / 1024**2
def get_mem_info(prefix=''):
return f'{prefix}GPU memory usage: {get_gpu_mem():.2f} MB, CPU memory usage: {get_cpu_mem():.2f} MB'
def get_tflops(model_numel, batch_size, seq_len, step_time):
return model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)
def get_parameters_in_billions(model, world_size=1):
gpus_per_model = world_size
approx_parameters_in_billions = sum([
sum([p.ds_numel if hasattr(p, 'ds_id') else p.nelement()
for p in model_module.parameters()])
for model_module in model
])
return approx_parameters_in_billions * gpus_per_model / (1e9)
def throughput_calculator(numel, args, config, iteration_time, total_iterations, world_size=1):
gpus_per_model = 1
batch_size = args.train_micro_batch_size_per_gpu
samples_per_model = batch_size * args.max_seq_length
model_replica_count = world_size / gpus_per_model
approx_parameters_in_billions = numel
elapsed_time_per_iter = iteration_time / total_iterations
samples_per_second = batch_size / elapsed_time_per_iter
#flops calculator
hidden_size = config.hidden_size
num_layers = config.num_hidden_layers
vocab_size = config.vocab_size
# General TFLOPs formula (borrowed from Equation 3 in Section 5.1 of
# https://arxiv.org/pdf/2104.04473.pdf).
# The factor of 4 is when used with activation check-pointing,
# otherwise it will be 3.
checkpoint_activations_factor = 4 if args.checkpoint_activations else 3
flops_per_iteration = (24 * checkpoint_activations_factor * batch_size * args.max_seq_length * num_layers *
(hidden_size**2)) * (1. + (args.max_seq_length / (6. * hidden_size)) +
(vocab_size / (16. * num_layers * hidden_size)))
tflops = flops_per_iteration / (elapsed_time_per_iter * (10**12))
return samples_per_second, tflops, approx_parameters_in_billions
def synchronize():
if not torch.distributed.is_available():
return
if not torch.distributed.is_intialized():
return
world_size = torch.distributed.get_world_size()
if world_size == 1:
return
torch.distributed.barrier()
def log_args(logger, args):
logger.info('--------args----------')
message = '\n'.join([f'{k:<30}: {v}' for k, v in vars(args).items()])
message += '\n'
message += '\n'.join([f'{k:<30}: {v}' for k, v in gpc.config.items()])
logger.info(message)
logger.info('--------args----------\n')

View File

@@ -0,0 +1,130 @@
import time
import torch
from .WandbLog import TensorboardLog
_GLOBAL_TIMERS = None
_GLOBAL_TENSORBOARD_WRITER = None
def set_global_variables(launch_time, tensorboard_path):
_set_timers()
_set_tensorboard_writer(launch_time, tensorboard_path)
def _set_timers():
"""Initialize timers."""
global _GLOBAL_TIMERS
_ensure_var_is_not_initialized(_GLOBAL_TIMERS, 'timers')
_GLOBAL_TIMERS = Timers()
def _set_tensorboard_writer(launch_time, tensorboard_path):
"""Set tensorboard writer."""
global _GLOBAL_TENSORBOARD_WRITER
_ensure_var_is_not_initialized(_GLOBAL_TENSORBOARD_WRITER, 'tensorboard writer')
if torch.distributed.get_rank() == 0:
_GLOBAL_TENSORBOARD_WRITER = TensorboardLog(tensorboard_path + f'/{launch_time}', launch_time)
def get_timers():
"""Return timers."""
_ensure_var_is_initialized(_GLOBAL_TIMERS, 'timers')
return _GLOBAL_TIMERS
def get_tensorboard_writer():
"""Return tensorboard writer. It can be None so no need
to check if it is initialized."""
return _GLOBAL_TENSORBOARD_WRITER
def _ensure_var_is_initialized(var, name):
"""Make sure the input variable is not None."""
assert var is not None, '{} is not initialized.'.format(name)
def _ensure_var_is_not_initialized(var, name):
"""Make sure the input variable is not None."""
assert var is None, '{} is already initialized.'.format(name)
class _Timer:
"""Timer."""
def __init__(self, name):
self.name_ = name
self.elapsed_ = 0.0
self.started_ = False
self.start_time = time.time()
def start(self):
"""Start the timer."""
# assert not self.started_, 'timer has already been started'
torch.cuda.synchronize()
self.start_time = time.time()
self.started_ = True
def stop(self):
"""Stop the timer."""
assert self.started_, 'timer is not started'
torch.cuda.synchronize()
self.elapsed_ += (time.time() - self.start_time)
self.started_ = False
def reset(self):
"""Reset timer."""
self.elapsed_ = 0.0
self.started_ = False
def elapsed(self, reset=True):
"""Calculate the elapsed time."""
started_ = self.started_
# If the timing in progress, end it first.
if self.started_:
self.stop()
# Get the elapsed time.
elapsed_ = self.elapsed_
# Reset the elapsed time
if reset:
self.reset()
# If timing was in progress, set it back.
if started_:
self.start()
return elapsed_
class Timers:
"""Group of timers."""
def __init__(self):
self.timers = {}
def __call__(self, name):
if name not in self.timers:
self.timers[name] = _Timer(name)
return self.timers[name]
def write(self, names, writer, iteration, normalizer=1.0, reset=False):
"""Write timers to a tensorboard writer"""
# currently when using add_scalars,
# torch.utils.add_scalars makes each timer its own run, which
# polutes the runs list, so we just add each as a scalar
assert normalizer > 0.0
for name in names:
value = self.timers[name].elapsed(reset=reset) / normalizer
writer.add_scalar(name + '-time', value, iteration)
def log(self, names, normalizer=1.0, reset=True):
"""Log a group of timers."""
assert normalizer > 0.0
string = 'time (ms)'
for name in names:
elapsed_time = self.timers[name].elapsed(reset=reset) * 1000.0 / normalizer
string += ' | {}: {:.2f}'.format(name, elapsed_time)
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == (torch.distributed.get_world_size() - 1):
print(string, flush=True)
else:
print(string, flush=True)

View File

@@ -0,0 +1,30 @@
import logging
import os
import torch.distributed as dist
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
datefmt='%m/%d/%Y %H:%M:%S',
level=logging.INFO)
logger = logging.getLogger(__name__)
class Logger():
def __init__(self, log_path, cuda=False, debug=False):
self.logger = logging.getLogger(__name__)
self.cuda = cuda
self.log_path = log_path
self.debug = debug
def info(self, message, log_=True, print_=True, *args, **kwargs):
if (self.cuda and dist.get_rank() == 0) or not self.cuda:
if print_:
self.logger.info(message, *args, **kwargs)
if log_:
with open(self.log_path, 'a+') as f_log:
f_log.write(message + '\n')
def error(self, message, *args, **kwargs):
self.logger.error(message, *args, **kwargs)