add RoBERTa (#1980)

* update roberta

* update roberta & readme

* update roberta & readme

* update roberta & readme
This commit is contained in:
mandoxzhang
2022-11-18 14:04:49 +08:00
committed by GitHub
parent 31922110ad
commit 52bd106627
26 changed files with 5814 additions and 0 deletions


@@ -0,0 +1,24 @@
# Pretraining
1. Pretrain RoBERTa by running the script below. Detailed parameter descriptions can be found in arguments.py. `data_path_prefix` is the absolute path of the preprocessing output. **You have to modify the *hostfile* according to your cluster.**
```bash
bash run_pretrain.sh
```
* `--hostfile`: the hostfile listing the servers' host names (as in /etc/hosts)
* `--include`: the servers to be used
* `--nproc_per_node`: number of processes (GPUs) per server
* `--data_path_prefix`: absolute path of the directory holding the training h5 shards, e.g., /h5 containing 0.h5, 1.h5, ...
* `--eval_data_path_prefix`: absolute path of the directory holding the evaluation h5 shards
* `--tokenizer_path`: path to the directory containing the Hugging Face tokenizer files (tokenizer.json), e.g., /tokenizer
* `--bert_config`: path to the config.json that defines the model
* `--mlm`: backbone model type, `bert` or `deberta_v2`
2. To resume training from an earlier checkpoint, run the script below.
```shell
bash run_pretrain_resume.sh
```
* `--resume_train`: whether to resume training
* `--load_pretrain_model`: absolute path of the model checkpoint
* `--load_optimizer_lr`: absolute path of the optimizer/LR-scheduler checkpoint (see the sketch below for how these two files are produced)
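
For reference, here is a minimal sketch of the two files these flags point to, based on `save_ckpt` in pretrain_utils.py; the prefix below is a made-up example, while the file suffixes and dictionary keys are the ones written by this repository.
```python
import torch

# save_ckpt(...) writes a pair of files per checkpoint:
#   <prefix>_pytorch_model.bin  -> model state_dict          (--load_pretrain_model)
#   <prefix>.op_lrs             -> optimizer/scheduler dict  (--load_optimizer_lr)
prefix = "/ckpt/epoch-0_shard-9_2022-11-18-14:04:49"   # hypothetical prefix

model_state = torch.load(prefix + "_pytorch_model.bin", map_location="cpu")
op_lrs = torch.load(prefix + ".op_lrs", map_location="cpu")

# Keys stored by save_ckpt: 'optimizer', 'lr_scheduler', 'epoch', 'shard', 'global_step'
print(op_lrs["epoch"], op_lrs["shard"], op_lrs["global_step"])
```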


@@ -0,0 +1,152 @@
import colossalai
__all__ = ['parse_args']
def parse_args():
parser = colossalai.get_default_parser()
parser.add_argument(
'--lr',
type=float,
required=True,
help='initial learning rate')
parser.add_argument(
'--epoch',
type=int,
required=True,
help='number of epochs')
parser.add_argument(
'--data_path_prefix',
type=str,
required=True,
help="location of the train data corpus")
parser.add_argument(
'--eval_data_path_prefix',
type=str,
required=True,
help='location of the evaluation data corpus')
parser.add_argument(
'--tokenizer_path',
type=str,
required=True,
help='location of the tokenizer')
parser.add_argument(
'--max_seq_length',
type=int,
default=512,
help='sequence length')
parser.add_argument(
'--refresh_bucket_size',
type=int,
default=1,
help=
"This param makes sure that a certain task is repeated for this many time steps to \
optimise the back-propagation speed with APEX's DistributedDataParallel")
parser.add_argument(
"--max_predictions_per_seq",
"--max_pred",
default=80,
type=int,
help=
"The maximum number of masked tokens in a sequence to be predicted.")
parser.add_argument(
"--gradient_accumulation_steps",
default=1,
type=int,
help="number of gradient accumulation steps")
parser.add_argument(
"--train_micro_batch_size_per_gpu",
default=2,
type=int,
required=True,
help="train batch size")
parser.add_argument(
"--eval_micro_batch_size_per_gpu",
default=2,
type=int,
required=True,
help="eval batch size")
parser.add_argument(
"--num_workers",
default=8,
type=int,
help="number of DataLoader workers per process")
parser.add_argument(
"--async_worker",
action='store_true',
help="load data with an asynchronous worker")
parser.add_argument(
"--bert_config",
required=True,
type=str,
help="location of config.json")
parser.add_argument(
"--wandb",
action='store_true',
help="use wandb to watch model")
parser.add_argument(
"--wandb_project_name",
default='roberta',
help="wandb project name")
parser.add_argument(
"--log_interval",
default=100,
type=int,
help="report interval")
parser.add_argument(
"--log_path",
type=str,
required=True,
help="log file which records train step")
parser.add_argument(
"--tensorboard_path",
type=str,
required=True,
help="location of tensorboard file")
parser.add_argument(
"--colossal_config",
type=str,
required=True,
help="colossal config, which contains zero config and so on")
parser.add_argument(
"--ckpt_path",
type=str,
required=True,
help="location of saving checkpoint, which contains model and optimizer")
parser.add_argument(
'--seed',
type=int,
default=42,
help="random seed for initialization")
parser.add_argument(
'--vscode_debug',
action='store_true',
help="use vscode to debug")
parser.add_argument(
'--load_pretrain_model',
default='',
type=str,
help="location of the model checkpoint")
parser.add_argument(
'--load_optimizer_lr',
default='',
type=str,
help="location of the checkpoint which contains the optimizer, learning rate, epoch, shard and global_step")
parser.add_argument(
'--resume_train',
action='store_true',
help="whether to resume training from an earlier checkpoint")
parser.add_argument(
'--mlm',
default='bert',
type=str,
help="model type of the backbone, bert or deberta_v2")
parser.add_argument(
'--checkpoint_activations',
action='store_true',
help="whether to use gradient checkpointing")
args = parser.parse_args()
return args


@@ -0,0 +1,15 @@
class BertDatasetProviderInterface:
def get_shard(self, index, shuffle=True):
raise NotImplementedError
def release_shard(self, index):
raise NotImplementedError
def prefetch_shard(self, index):
raise NotImplementedError
def get_batch(self, batch_iter):
raise NotImplementedError
def prefetch_batch(self):
raise NotImplementedError
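
A minimal sketch of how a provider implementing this interface is consumed, mirroring the shard loop in evaluation.py and run_pretraining.py; `provider` and `num_shards` are assumed to be constructed elsewhere.
```python
# A sketch only: `provider` is any object implementing BertDatasetProviderInterface.
def run_over_shards(provider, num_shards):
    for shard in range(num_shards):
        dataloader, sample_count = provider.get_shard(shard)   # load the current shard
        # provider.prefetch_shard(shard + 1)                   # optional: overlap I/O with compute
        for step, batch in enumerate(dataloader):
            batch = provider.get_batch(batch)                  # hook for custom batch handling
            provider.prefetch_batch()                          # hook for asynchronous prefetch
            ...                                                # forward / backward / step goes here
    provider.release_shard()                                   # free resources when done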


@@ -0,0 +1,71 @@
import os
import math
import torch
from tqdm import tqdm
from utils.global_vars import get_timers, get_tensorboard_writer
from nvidia_bert_dataset_provider import NvidiaBertDatasetProvider
def evaluate(engine, args, logger, global_step):
evaluate_dataset_provider = NvidiaBertDatasetProvider(args, evaluate=True)
start_shard = 0
engine.eval()
timers = get_timers()
eval_step = 0
eval_loss = 0
cur_loss = 0
world_size = torch.distributed.get_world_size()
with torch.no_grad():
for shard in range(start_shard, len(os.listdir(args.eval_data_path_prefix))):
timers('eval_shard_time').start()
dataset_iterator, total_length = evaluate_dataset_provider.get_shard(shard)
# evaluate_dataset_provider.prefetch_shard(shard + 1)
if torch.distributed.get_rank() == 0:
iterator_data = tqdm(enumerate(dataset_iterator), total=(total_length // args.eval_micro_batch_size_per_gpu // world_size), colour='MAGENTA', smoothing=1)
else:
iterator_data = enumerate(dataset_iterator)
for step, batch_data in iterator_data: #tqdm(enumerate(dataset_iterator), total=(total_length // args.train_micro_batch_size_per_gpu // world_size), colour='cyan', smoothing=1):
# batch_data = pretrain_dataset_provider.get_batch(batch_index)
eval_step += 1
input_ids = batch_data[0].cuda()
attention_mask = batch_data[1].cuda()
token_type_ids = batch_data[2].cuda()
mlm_label = batch_data[3].cuda()
# nsp_label = batch_data[5].cuda()
output = engine(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
loss = engine.criterion(output.logits, mlm_label)#prediction_scores
evaluate_dataset_provider.prefetch_batch()
eval_loss += loss.float().item()
cur_loss = eval_loss / eval_step
elapsed_time = timers("eval_shard_time").elapsed()
elapsed_time_per_iteration = elapsed_time / eval_step
ppl = math.exp(cur_loss)
if args.wandb and torch.distributed.get_rank() == 0:
tensorboard_log = get_tensorboard_writer()
tensorboard_log.log_eval({
'loss': cur_loss,
'ppl': ppl,
'mins_batch': elapsed_time_per_iteration
}, global_step)
eval_log_str = f'evaluation shard: {shard} | step: {eval_step} | elapsed_time: {elapsed_time / 60 :.3f} minutes ' + \
f'| sec/batch: {elapsed_time_per_iteration :.3f} | loss: {cur_loss:.7f} | ppl: {ppl:.7f}'
logger.info(eval_log_str)
logger.info('-' * 100)
logger.info('')
evaluate_dataset_provider.release_shard()
engine.train()
return cur_loss


@@ -0,0 +1,10 @@
GPU001
GPU002
GPU003
GPU004
GPU005
GPU006
GPU007
GPU008
GPU009
GPU010


@@ -0,0 +1,17 @@
import torch
__all__ = ['LossForPretraining']
class LossForPretraining(torch.nn.Module):
def __init__(self, vocab_size):
super(LossForPretraining, self).__init__()
self.loss_fn = torch.nn.CrossEntropyLoss(ignore_index=-1)
self.vocab_size = vocab_size
def forward(self, prediction_scores, masked_lm_labels, next_sentence_labels=None):
masked_lm_loss = self.loss_fn(prediction_scores.view(-1, self.vocab_size), masked_lm_labels.view(-1))
# next_sentence_loss = self.loss_fn(seq_relationship_score.view(-1, 2), next_sentence_labels.view(-1))
total_loss = masked_lm_loss #+ next_sentence_loss
return total_loss
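
A minimal usage sketch (assuming this file is importable as `loss`, as it is in run_pretraining.py): positions whose label is -1 are ignored by the criterion, so only masked positions contribute to the loss; the vocab size and shapes are illustrative.
```python
import torch
from loss import LossForPretraining

vocab_size = 50265                                   # illustrative RoBERTa-style vocab size
criterion = LossForPretraining(vocab_size)

batch, seq_len = 2, 8
prediction_scores = torch.randn(batch, seq_len, vocab_size)
masked_lm_labels = torch.full((batch, seq_len), -1, dtype=torch.long)  # -1 = not masked, ignored
masked_lm_labels[0, 3] = 1024                        # only these two positions enter the loss
masked_lm_labels[1, 5] = 2048

print(criterion(prediction_scores, masked_lm_labels).item())
```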

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -0,0 +1,182 @@
import os
import random
import h5py
import logging
import json
import time
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import RandomSampler
from torch.utils.data.distributed import DistributedSampler
from bert_dataset_provider import BertDatasetProviderInterface
import colossalai.utils as utils
# Workaround because python functions are not picklable
class WorkerInitObj(object):
def __init__(self, seed):
self.seed = seed
def __call__(self, id):
np.random.seed(seed=self.seed + id)
random.seed(self.seed + id)
def create_pretraining_dataset(input_file, max_predictions_per_seq,
num_workers, train_batch_size, worker_init,
data_sampler):
train_data = pretraining_dataset(
input_file=input_file, max_predictions_per_seq=max_predictions_per_seq)
train_dataloader = DataLoader(train_data,
sampler=data_sampler(train_data),
batch_size=train_batch_size,
num_workers=num_workers,
worker_init_fn=worker_init,
pin_memory=True
)
return train_dataloader, len(train_data)
class pretraining_dataset(Dataset):
def __init__(self, input_file, max_predictions_per_seq):
self.input_file = input_file
self.max_predictions_per_seq = max_predictions_per_seq
f = h5py.File(input_file, "r")
keys = [
'input_ids', 'input_mask', 'segment_ids', 'masked_lm_positions'
]
self.inputs = [np.asarray(f[key][:]) for key in keys]
f.close()
def __len__(self):
'Denotes the total number of samples'
return len(self.inputs[0])
def __getitem__(self, index):
[
input_ids, input_mask, segment_ids, masked_lm_labels
] = [
torch.from_numpy(input[index].astype(np.int64)) if indice < 5 else
torch.from_numpy(np.asarray(input[index].astype(np.int64)))
for indice, input in enumerate(self.inputs)
]
return [
input_ids, input_mask,
segment_ids, masked_lm_labels
]
class NvidiaBertDatasetProvider(BertDatasetProviderInterface):
def __init__(self, args, evaluate=False):
self.num_workers = args.num_workers
self.max_seq_length = args.max_seq_length
self.max_predictions_per_seq = args.max_predictions_per_seq
self.gradient_accumulation_steps = args.gradient_accumulation_steps
if not evaluate:
self.train_micro_batch_size_per_gpu = args.train_micro_batch_size_per_gpu
else:
self.train_micro_batch_size_per_gpu = args.eval_micro_batch_size_per_gpu
self.logger = args.logger
self.global_rank = dist.get_rank()
self.world_size = dist.get_world_size()
# Initialize dataset files
if not evaluate:
self.dataset_files = [
os.path.join(args.data_path_prefix, f) for f in os.listdir(args.data_path_prefix) if
os.path.isfile(os.path.join(args.data_path_prefix, f)) and 'h5' in f
]
else:
self.dataset_files = [
os.path.join(args.eval_data_path_prefix, f) for f in os.listdir(args.eval_data_path_prefix) if
os.path.isfile(os.path.join(args.eval_data_path_prefix, f)) and 'h5' in f
]
self.dataset_files.sort()
# random.shuffle(self.dataset_files)
self.num_files = len(self.dataset_files)
# self.data_sampler = RandomSampler
self.data_sampler = DistributedSampler
self.worker_init = WorkerInitObj(args.seed + args.local_rank)
self.dataset_future = None
self.pool = ProcessPoolExecutor(1)
self.data_file = None
self.shuffle = True
if self.global_rank == 0:
self.logger.info(
f"NvidiaBertDatasetProvider - Initialization: num_files = {self.num_files}"
)
def get_shard(self, index):
start = time.time()
if self.dataset_future is None:
self.data_file = self._get_shard_file(index)
self.train_dataloader, sample_count = create_pretraining_dataset(
input_file=self.data_file,
max_predictions_per_seq=self.max_predictions_per_seq,
num_workers=self.num_workers,
train_batch_size=self.train_micro_batch_size_per_gpu,
worker_init=self.worker_init,
data_sampler=self.data_sampler)
else:
self.train_dataloader, sample_count = self.dataset_future.result(
timeout=None)
self.logger.info(
f"Data Loading Completed for Pretraining Data from {self.data_file} with {sample_count} samples took {time.time()-start:.2f}s."
)
return self.train_dataloader, sample_count
def release_shard(self):
del self.train_dataloader
self.pool.shutdown()
def prefetch_shard(self, index):
self.data_file = self._get_shard_file(index)
self.dataset_future = self.pool.submit(
create_pretraining_dataset, self.data_file,
self.max_predictions_per_seq, self.num_workers,
self.train_micro_batch_size_per_gpu, self.worker_init,
self.data_sampler)
def get_batch(self, batch_iter):
return batch_iter
def prefetch_batch(self):
pass
def _get_shard_file(self, shard_index):
file_index = self._get_shard_file_index(shard_index, self.global_rank)
return self.dataset_files[file_index]
def _get_shard_file_index(self, shard_index, global_rank):
# if dist.is_initialized() and self.world_size > self.num_files:
# remainder = self.world_size % self.num_files
# file_index = (shard_index * self.world_size) + global_rank + (
# remainder * shard_index)
# else:
# file_index = shard_index * self.world_size + global_rank
return shard_index % self.num_files
def shuffle_dataset(self, epoch):
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(epoch)
indices = torch.randperm(self.num_files, generator=g).tolist()
new_dataset = [self.dataset_files[i] for i in indices]
self.dataset_files = new_dataset
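
For a quick local smoke test, a tiny synthetic shard can be written with the four h5 keys this dataset reads; the shapes, dtypes and value ranges below are assumptions for illustration, not necessarily the exact format produced by the real preprocessing pipeline.
```python
import h5py
import numpy as np

n_samples, seq_len = 16, 512                         # illustrative sizes
with h5py.File("/tmp/0.h5", "w") as f:
    f.create_dataset("input_ids", data=np.random.randint(5, 1000, (n_samples, seq_len)))
    f.create_dataset("input_mask", data=np.ones((n_samples, seq_len), dtype=np.int64))
    f.create_dataset("segment_ids", data=np.zeros((n_samples, seq_len), dtype=np.int64))
    # pretraining_dataset reads this key and returns it as the MLM labels (-1 = ignore).
    f.create_dataset("masked_lm_positions", data=np.full((n_samples, seq_len), -1, dtype=np.int64))
```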


@@ -0,0 +1,112 @@
import transformers
import logging
from colossalai.nn.lr_scheduler import LinearWarmupLR
from transformers import get_linear_schedule_with_warmup
from transformers import BertForPreTraining, RobertaForMaskedLM, RobertaConfig
from transformers import GPT2Config, GPT2LMHeadModel
from transformers import AutoTokenizer, AutoModelForMaskedLM
from colossalai.nn.optimizer import FusedAdam
from torch.optim import AdamW
from colossalai.core import global_context as gpc
import torch
import os
import sys
sys.path.append(os.getcwd())
from model.deberta_v2 import DebertaV2ForMaskedLM
from model.bert import BertForMaskedLM
import torch.nn as nn
from collections import OrderedDict
__all__ = ['get_model', 'get_optimizer', 'get_lr_scheduler', 'get_dataloader_for_pretraining']
def get_new_state_dict(state_dict, start_index=13):
new_state_dict = OrderedDict()
for k, v in state_dict.items():
name = k[start_index:]
new_state_dict[name] = v
return new_state_dict
class LMModel(nn.Module):
def __init__(self, model, config, args):
super().__init__()
self.checkpoint = args.checkpoint_activations
self.config = config
self.model = model
if self.checkpoint:
self.model.gradient_checkpointing_enable()
def forward(self, input_ids, token_type_ids=None, attention_mask=None):
# Only return lm_logits
return self.model(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
def get_model(args, logger):
if args.mlm == 'bert':
config = transformers.BertConfig.from_json_file(args.bert_config)
model = BertForMaskedLM(config)
elif args.mlm == 'deberta_v2':
config = transformers.DebertaV2Config.from_json_file(args.bert_config)
model = DebertaV2ForMaskedLM(config)
else:
raise Exception("Invalid mlm!")
if len(args.load_pretrain_model) > 0:
assert os.path.exists(args.load_pretrain_model)
# load_checkpoint(args.load_pretrain_model, model, strict=False)
m_state_dict = torch.load(args.load_pretrain_model, map_location=torch.device(f"cuda:{torch.cuda.current_device()}"))
# new_state_dict = get_new_state_dict(m_state_dict)
model.load_state_dict(m_state_dict, strict=True)  # must ensure that every process loads identical parameters
logger.info("load model success")
numel = sum([p.numel() for p in model.parameters()])
if args.checkpoint_activations:
model.gradient_checkpointing_enable()
# model = LMModel(model, config, args)
return config, model, numel
def get_optimizer(model, lr):
param_optimizer = list(model.named_parameters())
no_decay = ['bias', 'gamma', 'beta', 'LayerNorm']
# configure the weight decay for bert models
optimizer_grouped_parameters = [{
'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
'weight_decay': 0.1
}, {
'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)],
'weight_decay': 0.0
}]
optimizer = FusedAdam(optimizer_grouped_parameters, lr=lr, betas=[0.9, 0.95])
return optimizer
def get_lr_scheduler(optimizer, total_steps, warmup_steps=2000, last_epoch=-1):
# warmup_steps = int(total_steps * warmup_ratio)
lr_scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps, last_epoch=last_epoch)
# lr_scheduler = LinearWarmupLR(optimizer, total_steps=total_steps, warmup_steps=warmup_steps)
return lr_scheduler
def save_ckpt(model, optimizer, lr_scheduler, path, epoch, shard, global_step):
model_path = path + '_pytorch_model.bin'
optimizer_lr_path = path + '.op_lrs'
checkpoint = {}
checkpoint['optimizer'] = optimizer.state_dict()
checkpoint['lr_scheduler'] = lr_scheduler.state_dict()
checkpoint['epoch'] = epoch
checkpoint['shard'] = shard
checkpoint['global_step'] = global_step
model_state = model.state_dict() #each process must run model.state_dict()
if gpc.get_global_rank() == 0:
torch.save(checkpoint, optimizer_lr_path)
torch.save(model_state, model_path)
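
A small sketch of the weight-decay grouping used in `get_optimizer`: parameters whose names contain any of the `no_decay` substrings get zero weight decay. The toy module below is hypothetical, just to make the split visible.
```python
import torch.nn as nn

no_decay = ['bias', 'gamma', 'beta', 'LayerNorm']

class Toy(nn.Module):                        # hypothetical module with BERT-like parameter names
    def __init__(self):
        super().__init__()
        self.dense = nn.Linear(4, 4)
        self.LayerNorm = nn.LayerNorm(4)

decay, skip = [], []
for name, _ in Toy().named_parameters():
    (skip if any(nd in name for nd in no_decay) else decay).append(name)

print("weight_decay=0.1:", decay)            # ['dense.weight']
print("weight_decay=0.0:", skip)             # ['dense.bias', 'LayerNorm.weight', 'LayerNorm.bias']
```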


@@ -0,0 +1,40 @@
#!/usr/bin/env sh
root_path=$PWD
PY_FILE_PATH="$root_path/run_pretraining.py"
tensorboard_path="$root_path/tensorboard"
log_path="$root_path/exp_log"
ckpt_path="$root_path/ckpt"
colossal_config="$root_path/../configs/colossalai_ddp.py"
mkdir -p $tensorboard_path
mkdir -p $log_path
mkdir -p $ckpt_path
export PYTHONPATH=$PWD
env OMP_NUM_THREADS=40 colossalai run --hostfile ./hostfile \
--include GPU002,GPU003,GPU004,GPU007 \
--nproc_per_node=8 \
$PY_FILE_PATH \
--master_addr GPU007 \
--master_port 20024 \
--lr 2.0e-4 \
--train_micro_batch_size_per_gpu 190 \
--eval_micro_batch_size_per_gpu 20 \
--epoch 15 \
--data_path_prefix /h5 \
--eval_data_path_prefix /eval_h5 \
--tokenizer_path /roberta \
--bert_config /roberta/config.json \
--tensorboard_path $tensorboard_path \
--log_path $log_path \
--ckpt_path $ckpt_path \
--colossal_config $colossal_config \
--log_interval 50 \
--mlm bert \
--wandb \
--checkpoint_activations \


@@ -0,0 +1,43 @@
#!/usr/bin/env sh
root_path=$PWD
PY_FILE_PATH="$root_path/run_pretraining.py"
tensorboard_path="$root_path/tensorboard"
log_path="$root_path/exp_log"
ckpt_path="$root_path/ckpt"
colossal_config="$root_path/../configs/colossalai_ddp.py"
mkdir -p $tensorboard_path
mkdir -p $log_path
mkdir -p $ckpt_path
export PYTHONPATH=$PWD
env OMP_NUM_THREADS=40 colossalai run --hostfile ./hostfile \
--include GPU002,GPU003,GPU004,GPU007 \
--nproc_per_node=8 \
$PY_FILE_PATH \
--master_addr GPU007 \
--master_port 20024 \
--lr 2.0e-4 \
--train_micro_batch_size_per_gpu 190 \
--eval_micro_batch_size_per_gpu 20 \
--epoch 15 \
--data_path_prefix /h5 \
--eval_data_path_prefix /eval_h5 \
--tokenizer_path /roberta \
--bert_config /roberta/config.json \
--tensorboard_path $tensorboard_path \
--log_path $log_path \
--ckpt_path $ckpt_path \
--colossal_config $colossal_config \
--log_interval 50 \
--mlm bert \
--wandb \
--checkpoint_activations \
--resume_train \
--load_pretrain_model /ckpt/1.pt \
--load_optimizer_lr /ckpt/1.op_lrs \


@@ -0,0 +1,226 @@
import colossalai
import math
import torch
from colossalai.context import ParallelMode
from colossalai.core import global_context as gpc
import colossalai.nn as col_nn
from arguments import parse_args
from pretrain_utils import get_model, get_optimizer, get_lr_scheduler, save_ckpt
from utils.exp_util import get_tflops, get_mem_info, throughput_calculator, log_args
from utils.global_vars import set_global_variables, get_timers, get_tensorboard_writer
from utils.logger import Logger
from evaluation import evaluate
from loss import LossForPretraining
from colossalai.zero.init_ctx import ZeroInitContext
from colossalai.zero.shard_utils import TensorShardStrategy
from colossalai.zero.sharded_model import ShardedModelV2
from colossalai.zero.sharded_optim import ShardedOptimizerV2
from nvidia_bert_dataset_provider import NvidiaBertDatasetProvider
from tqdm import tqdm
import os
import time
from functools import partial
from transformers import AutoTokenizer
from colossalai.gemini import ChunkManager, GeminiManager
from colossalai.utils.model.colo_init_context import ColoInitContext
from colossalai.utils import get_current_device
from colossalai.nn.parallel import ZeroDDP
from colossalai.zero import ZeroOptimizer
from colossalai.tensor import ProcessGroup
from colossalai.nn.optimizer import HybridAdam
def main():
args = parse_args()
launch_time = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime())
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_path)
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
logger = Logger(os.path.join(args.log_path, launch_time), cuda=torch.cuda.is_available(), debug=args.vscode_debug)
if args.vscode_debug:
colossalai.launch(config={},
rank=args.rank,
world_size=args.world_size,
host=args.host,
port=args.port,
backend=args.backend)
args.local_rank = -1
args.log_interval = 1
else:
colossalai.launch_from_torch(args.colossal_config) #args.colossal_config
args.local_rank = int(os.environ["LOCAL_RANK"])
logger.info(f'launch_from_torch, world size: {torch.distributed.get_world_size()} | ' +
f'ParallelMode.MODEL: {ParallelMode.MODEL} | ParallelMode.DATA: {ParallelMode.DATA} | ParallelMode.TENSOR: {ParallelMode.TENSOR}')
log_args(logger, args)
args.tokenizer = tokenizer
args.logger = logger
set_global_variables(launch_time, args.tensorboard_path)
use_zero = hasattr(gpc.config, 'zero')
world_size = torch.distributed.get_world_size()
# build model, optimizer and criterion
if use_zero:
shard_strategy = TensorShardStrategy()
with ZeroInitContext(target_device=torch.cuda.current_device(), shard_strategy=shard_strategy,
shard_param=True):
config, model, numel = get_model(args, logger)
# model = ShardedModelV2(model, shard_strategy, tensor_placement_policy='cpu', reuse_fp16_shard=True)
else:
config, model, numel = get_model(args, logger)
logger.info("no_zero")
if torch.distributed.get_rank() == 0:
os.mkdir(os.path.join(args.ckpt_path, launch_time))
logger.info(f'Model numel: {numel}')
get_tflops_func = partial(get_tflops, numel, args.train_micro_batch_size_per_gpu, args.max_seq_length)
steps_per_epoch = 144003367 // world_size // args.train_micro_batch_size_per_gpu // args.gradient_accumulation_steps // args.refresh_bucket_size #len(dataloader)
total_steps = steps_per_epoch * args.epoch
# build optimizer and lr_scheduler
start_epoch = 0
start_shard = 0
global_step = 0
if args.resume_train:
assert os.path.exists(args.load_optimizer_lr)
o_l_state_dict = torch.load(args.load_optimizer_lr, map_location='cpu')
o_l_state_dict['lr_scheduler']['last_epoch'] = o_l_state_dict['lr_scheduler']['last_epoch'] - 1
optimizer = get_optimizer(model, lr=args.lr)
optimizer.load_state_dict(o_l_state_dict['optimizer'])
lr_scheduler = get_lr_scheduler(optimizer, total_steps=total_steps, last_epoch=o_l_state_dict['lr_scheduler']['last_epoch']) #o_l_state_dict['lr_scheduler']['last_epoch']
for state in optimizer.state.values():
for k, v in state.items():
if isinstance(v, torch.Tensor):
state[k] = v.cuda(f"cuda:{torch.cuda.current_device()}")
# if you remove the three lines above, you must move the model to the GPU before loading, because optimizer.step() expects the optimizer states on the same device as the parameters
lr_scheduler.load_state_dict(o_l_state_dict['lr_scheduler'])
start_epoch = o_l_state_dict['epoch']
start_shard = o_l_state_dict['shard'] + 1
# global_step = o_l_state_dict['global_step'] + 1
logger.info(f'resume from epoch {start_epoch} shard {start_shard} step {lr_scheduler.last_epoch} lr {lr_scheduler.get_last_lr()[0]}')
else:
optimizer = get_optimizer(model, lr=args.lr)
lr_scheduler = get_lr_scheduler(optimizer, total_steps=total_steps, last_epoch=-1)
# optimizer = gpc.config.optimizer.pop('type')(
# model.parameters(), **gpc.config.optimizer)
# optimizer = ShardedOptimizerV2(model, optimizer, initial_scale=2**5)
criterion = LossForPretraining(config.vocab_size)
# build dataloader
pretrain_dataset_provider = NvidiaBertDatasetProvider(args)
# initialize with colossalai
engine, _, _, lr_scheduler = colossalai.initialize(model=model,
optimizer=optimizer,
criterion=criterion,
lr_scheduler=lr_scheduler)
logger.info(get_mem_info(prefix='After init model, '))
best_loss = None
eval_loss = 0
train_loss = 0
timers = get_timers()
timers('interval_time').start()
timers('epoch_time').start()
timers('shard_time').start()
for epoch in range(start_epoch, args.epoch):
for shard in range(start_shard, len(os.listdir(args.data_path_prefix))):
dataset_iterator, total_length = pretrain_dataset_provider.get_shard(shard)
# pretrain_dataset_provider.prefetch_shard(shard + 1) # may cause cpu memory overload
if torch.distributed.get_rank() == 0:
iterator_data = tqdm(enumerate(dataset_iterator), total=(total_length // args.train_micro_batch_size_per_gpu // world_size), colour='cyan', smoothing=1)
else:
iterator_data = enumerate(dataset_iterator)
engine.train()
for step, batch_data in iterator_data:
# batch_data = pretrain_dataset_provider.get_batch(batch_index)
input_ids = batch_data[0].cuda(f"cuda:{torch.cuda.current_device()}")
attention_mask = batch_data[1].cuda(f"cuda:{torch.cuda.current_device()}")
token_type_ids = batch_data[2].cuda(f"cuda:{torch.cuda.current_device()}")
mlm_label = batch_data[3].cuda(f"cuda:{torch.cuda.current_device()}")
# nsp_label = batch_data[5].cuda()
output = engine(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
loss = engine.criterion(output.logits, mlm_label)
pretrain_dataset_provider.prefetch_batch()
engine.backward(loss)
train_loss += loss.float().item()
# if (step + 1) % args.accumulation_step == 0:
engine.step()
lr_scheduler.step()
engine.zero_grad()
global_step += 1
if global_step % args.log_interval == 0 and global_step != 0 \
and torch.distributed.get_rank() == 0:
elapsed_time = timers('interval_time').elapsed(reset=False)
elapsed_time_per_iteration = elapsed_time / global_step
samples_per_sec, tflops, approx_parameters_in_billions = throughput_calculator(numel, args, config, elapsed_time, global_step, world_size)
cur_loss = train_loss / args.log_interval
current_lr = lr_scheduler.get_last_lr()[0]
log_str = f'| epoch: {epoch} | shard: {shard} | step: {global_step} | lr {current_lr:.7f} | elapsed_time: {elapsed_time / 60 :.3f} minutes ' + \
f'| sec/batch: {elapsed_time_per_iteration :.3f} | loss: {cur_loss:.7f} | ppl: {math.exp(cur_loss):.3f} | TFLOPS: {get_tflops_func(elapsed_time_per_iteration):.3f} or {tflops:.3f}'
logger.info(log_str, print_=False)
if args.wandb:
tensorboard_log = get_tensorboard_writer()
tensorboard_log.log_train({
'lr': current_lr,
'loss': cur_loss,
'ppl': math.exp(cur_loss),
'mins_batch': elapsed_time_per_iteration
}, global_step)
train_loss = 0
logger.info(f'epoch {epoch} shard {shard} has cost {timers("shard_time").elapsed() / 60 :.3f} mins')
logger.info('*' * 100)
eval_loss += evaluate(engine, args, logger, global_step)
save_ckpt(engine.model, optimizer, lr_scheduler, os.path.join(args.ckpt_path, launch_time, f'epoch-{epoch}_shard-{shard}_' + launch_time), epoch, shard, global_step)
eval_loss /= len(os.listdir(args.data_path_prefix))
logger.info(f'epoch {epoch} | shard_length {len(os.listdir(args.data_path_prefix))} | elapsed_time: {timers("epoch_time").elapsed() / 60 :.3f} mins ' + \
f'| eval_loss: {eval_loss} | ppl: {math.exp(eval_loss)}')
logger.info('-' * 100)
if args.wandb and torch.distributed.get_rank() == 0:
tensorboard_log = get_tensorboard_writer()
tensorboard_log.log_eval({
'all_eval_shard_loss': eval_loss,
}, epoch)
start_shard = 0
eval_loss = 0
pretrain_dataset_provider.release_shard()
logger.info('Congratulations, training has finished!')
if __name__ == '__main__':
main()


@@ -0,0 +1,46 @@
import time
import wandb
import os
from torch.utils.tensorboard import SummaryWriter
class WandbLog:
@classmethod
def init_wandb(cls, project, notes=None, name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), config=None):
wandb.init(project=project, notes=notes, name=name, config=config)
@classmethod
def log(cls, result, model=None, gradient=None):
wandb.log(result)
if model:
wandb.watch(model)
if gradient:
wandb.watch(gradient)
class TensorboardLog:
def __init__(self, location, name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), config=None):
if not os.path.exists(location):
os.mkdir(location)
self.writer = SummaryWriter(location, comment=name)
def log_train(self, result, step):
for k, v in result.items():
self.writer.add_scalar(f'{k}/train', v, step)
def log_eval(self, result, step):
for k, v in result.items():
self.writer.add_scalar(f'{k}/eval', v, step)
def log_zeroshot(self, result, step):
for k, v in result.items():
self.writer.add_scalar(f'{k}_acc/eval', v, step)
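
A minimal usage sketch of the TensorBoard wrapper; the log directory and values are arbitrary examples, and the module path assumes this file lives at utils/WandbLog.py, as imported by utils/global_vars.py.
```python
from utils.WandbLog import TensorboardLog

tb = TensorboardLog("/tmp/tb_example", "example-run")
tb.log_train({'lr': 2.0e-4, 'loss': 7.1, 'ppl': 1211.97}, step=100)  # -> lr/train, loss/train, ppl/train
tb.log_eval({'loss': 6.8, 'ppl': 897.85}, step=100)                  # -> loss/eval, ppl/eval
```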


@@ -0,0 +1,99 @@
import functools
import os, shutil
import torch
import psutil
from colossalai.core import global_context as gpc
def logging(s, log_path, print_=True, log_=True):
if print_:
print(s)
if log_:
with open(log_path, 'a+') as f_log:
f_log.write(s + '\n')
def get_logger(log_path, **kwargs):
return functools.partial(logging, log_path=log_path, **kwargs)
def create_exp_dir(dir_path, scripts_to_save=None, debug=False):
if debug:
print('Debug Mode : no experiment dir created')
return functools.partial(logging, log_path=None, log_=False)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
print('Experiment dir : {}'.format(dir_path))
if scripts_to_save is not None:
script_path = os.path.join(dir_path, 'scripts')
if not os.path.exists(script_path):
os.makedirs(script_path)
for script in scripts_to_save:
dst_file = os.path.join(dir_path, 'scripts', os.path.basename(script))
shutil.copyfile(script, dst_file)
return get_logger(log_path=os.path.join(dir_path, 'log.txt'))
def get_cpu_mem():
return psutil.Process().memory_info().rss / 1024**2
def get_gpu_mem():
return torch.cuda.memory_allocated() / 1024**2
def get_mem_info(prefix=''):
return f'{prefix}GPU memory usage: {get_gpu_mem():.2f} MB, CPU memory usage: {get_cpu_mem():.2f} MB'
def get_tflops(model_numel, batch_size, seq_len, step_time):
return model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)
def get_parameters_in_billions(model, world_size=1):
gpus_per_model = world_size
approx_parameters_in_billions = sum([sum([p.ds_numel if hasattr(p,'ds_id') else p.nelement() for p in model_module.parameters()])
for model_module in model])
return approx_parameters_in_billions * gpus_per_model / (1e9)
def throughput_calculator(numel, args, config, iteration_time, total_iterations, world_size=1):
gpus_per_model = 1
batch_size = args.train_micro_batch_size_per_gpu
samples_per_model = batch_size * args.max_seq_length
model_replica_count = world_size / gpus_per_model
approx_parameters_in_billions = numel
elapsed_time_per_iter = iteration_time / total_iterations
samples_per_second = batch_size / elapsed_time_per_iter
#flops calculator
hidden_size = config.hidden_size
num_layers = config.num_hidden_layers
vocab_size = config.vocab_size
# General TFLOPs formula (borrowed from Equation 3 in Section 5.1 of
# https://arxiv.org/pdf/2104.04473.pdf).
# The factor of 4 is when used with activation check-pointing,
# otherwise it will be 3.
checkpoint_activations_factor = 4 if args.checkpoint_activations else 3
flops_per_iteration = (24 * checkpoint_activations_factor * batch_size * args.max_seq_length * num_layers * (hidden_size**2)) * (1. + (args.max_seq_length / (6. * hidden_size)) + (vocab_size / (16. * num_layers * hidden_size)))
tflops = flops_per_iteration / (elapsed_time_per_iter * (10**12))
return samples_per_second, tflops, approx_parameters_in_billions
def synchronize():
if not torch.distributed.is_available():
return
if not torch.distributed.is_initialized():
return
world_size = torch.distributed.get_world_size()
if world_size == 1:
return
torch.distributed.barrier()
def log_args(logger, args):
logger.info('--------args----------')
message = '\n'.join([f'{k:<30}: {v}' for k, v in vars(args).items()])
message += '\n'
message += '\n'.join([f'{k:<30}: {v}' for k, v in gpc.config.items()])
logger.info(message)
logger.info('--------args----------\n')
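
A worked instance of the throughput formula above; the config numbers and iteration time are assumptions chosen only to make the arithmetic concrete, not measurements from this PR.
```python
# Illustrative RoBERTa-Large-like values (assumptions).
hidden_size, num_layers, vocab_size = 1024, 24, 50265
batch_size, seq_len = 190, 512
checkpoint_activations_factor = 4            # activation checkpointing enabled
elapsed_time_per_iter = 2.0                  # seconds per step, assumed

flops_per_iteration = (24 * checkpoint_activations_factor * batch_size * seq_len
                       * num_layers * hidden_size ** 2) * (
    1.0 + seq_len / (6.0 * hidden_size) + vocab_size / (16.0 * num_layers * hidden_size))
tflops_per_gpu = flops_per_iteration / (elapsed_time_per_iter * 1e12)
print(f"{tflops_per_gpu:.1f} TFLOPS")        # ~142 TFLOPS with these assumed numbers
```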


@@ -0,0 +1,126 @@
import time
import torch
from .WandbLog import TensorboardLog
_GLOBAL_TIMERS = None
_GLOBAL_TENSORBOARD_WRITER = None
def set_global_variables(launch_time, tensorboard_path):
_set_timers()
_set_tensorboard_writer(launch_time, tensorboard_path)
def _set_timers():
"""Initialize timers."""
global _GLOBAL_TIMERS
_ensure_var_is_not_initialized(_GLOBAL_TIMERS, 'timers')
_GLOBAL_TIMERS = Timers()
def _set_tensorboard_writer(launch_time, tensorboard_path):
"""Set tensorboard writer."""
global _GLOBAL_TENSORBOARD_WRITER
_ensure_var_is_not_initialized(_GLOBAL_TENSORBOARD_WRITER,
'tensorboard writer')
if torch.distributed.get_rank() == 0:
_GLOBAL_TENSORBOARD_WRITER = TensorboardLog(tensorboard_path + f'/{launch_time}', launch_time)
def get_timers():
"""Return timers."""
_ensure_var_is_initialized(_GLOBAL_TIMERS, 'timers')
return _GLOBAL_TIMERS
def get_tensorboard_writer():
"""Return tensorboard writer. It can be None so no need
to check if it is initialized."""
return _GLOBAL_TENSORBOARD_WRITER
def _ensure_var_is_initialized(var, name):
"""Make sure the input variable is not None."""
assert var is not None, '{} is not initialized.'.format(name)
def _ensure_var_is_not_initialized(var, name):
"""Make sure the input variable is not None."""
assert var is None, '{} is already initialized.'.format(name)
class _Timer:
"""Timer."""
def __init__(self, name):
self.name_ = name
self.elapsed_ = 0.0
self.started_ = False
self.start_time = time.time()
def start(self):
"""Start the timer."""
# assert not self.started_, 'timer has already been started'
torch.cuda.synchronize()
self.start_time = time.time()
self.started_ = True
def stop(self):
"""Stop the timer."""
assert self.started_, 'timer is not started'
torch.cuda.synchronize()
self.elapsed_ += (time.time() - self.start_time)
self.started_ = False
def reset(self):
"""Reset timer."""
self.elapsed_ = 0.0
self.started_ = False
def elapsed(self, reset=True):
"""Calculate the elapsed time."""
started_ = self.started_
# If timing is in progress, stop it first.
if self.started_:
self.stop()
# Get the elapsed time.
elapsed_ = self.elapsed_
# Reset the elapsed time
if reset:
self.reset()
# If timing was in progress, set it back.
if started_:
self.start()
return elapsed_
class Timers:
"""Group of timers."""
def __init__(self):
self.timers = {}
def __call__(self, name):
if name not in self.timers:
self.timers[name] = _Timer(name)
return self.timers[name]
def write(self, names, writer, iteration, normalizer=1.0, reset=False):
"""Write timers to a tensorboard writer"""
# currently when using add_scalars,
# torch.utils.add_scalars makes each timer its own run, which
# pollutes the runs list, so we just add each as a scalar
assert normalizer > 0.0
for name in names:
value = self.timers[name].elapsed(reset=reset) / normalizer
writer.add_scalar(name + '-time', value, iteration)
def log(self, names, normalizer=1.0, reset=True):
"""Log a group of timers."""
assert normalizer > 0.0
string = 'time (ms)'
for name in names:
elapsed_time = self.timers[name].elapsed(
reset=reset) * 1000.0 / normalizer
string += ' | {}: {:.2f}'.format(name, elapsed_time)
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == (
torch.distributed.get_world_size() - 1):
print(string, flush=True)
else:
print(string, flush=True)
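
A minimal standalone sketch of the timer pattern used in run_pretraining.py, using `Timers` directly so no global state or TensorBoard writer is needed; note that `_Timer.start()`/`stop()` call `torch.cuda.synchronize()`, so a CUDA device is assumed.
```python
from utils.global_vars import Timers

timers = Timers()                                         # requires a CUDA device (timers synchronize)
timers('interval_time').start()
# ... run some training steps here ...
elapsed = timers('interval_time').elapsed(reset=False)    # cumulative seconds since start()
print(f"interval_time: {elapsed:.3f}s")
```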


@@ -0,0 +1,31 @@
import os
import logging
import torch.distributed as dist
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
datefmt='%m/%d/%Y %H:%M:%S',
level=logging.INFO)
logger = logging.getLogger(__name__)
class Logger():
def __init__(self, log_path, cuda=False, debug=False):
self.logger = logging.getLogger(__name__)
self.cuda = cuda
self.log_path = log_path
self.debug = debug
def info(self, message, log_=True, print_=True, *args, **kwargs):
if (self.cuda and dist.get_rank() == 0) or not self.cuda:
if print_:
self.logger.info(message, *args, **kwargs)
if log_:
with open(self.log_path, 'a+') as f_log:
f_log.write(message + '\n')
def error(self, message, *args, **kwargs):
self.logger.error(message, *args, **kwargs)