ColossalAI/tests/test_shardformer/test_with_torch_ddp.py
Last commit: Hongxin Liu, 7f8b16635b, "[misc] refactor launch API and tensor constructor" (2024-04-29 10:40:11 +08:00). The commit removed the config argument from initialize, removed the old tensor constructor, added NPU support for the DDP plugin, fixed the doc-test CI and the test launch, and updated the launch documentation.

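This test combines ShardFormer tensor parallelism with PyTorch DistributedDataParallel (DDP) data parallelism on a 2 x 2 mesh of four ranks: tensor-parallel groups [0, 1] and [2, 3], and data-parallel groups [0, 2] and [1, 3]. Each GPT-2 variant from the test model zoo is sharded along its tensor-parallel group, wrapped in DDP over its data-parallel group, and driven through one forward/backward step, once with eager initialization and once under LazyInitContext.
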
from contextlib import nullcontext

import pytest
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

import colossalai
from colossalai.cluster import DistCoordinator
from colossalai.lazy import LazyInitContext
from colossalai.logging import disable_existing_loggers
from colossalai.shardformer import ShardConfig, ShardFormer
from colossalai.testing import clear_cache_before_run, parameterize, rerun_if_address_is_in_use, spawn

from tests.kit.model_zoo import model_zoo

@parameterize("lazy_init", [True, False])
def check_shardformer_with_ddp(lazy_init: bool):
    sub_model_zoo = model_zoo.get_sub_registry("transformers_gpt", exclude="transformers_gptj")

    # create shardformer
    # ranks: [0, 1, 2, 3]
    # tp ranks = [0, 1], [2, 3]
    # dp ranks = [0, 2], [1, 3]
    # dist.new_group is a collective call, so every rank creates all four
    # groups and then keeps only the ones it belongs to
    dp_process_group_1 = dist.new_group([0, 2])
    dp_process_group_2 = dist.new_group([1, 3])
    tp_process_group_1 = dist.new_group([0, 1])
    tp_process_group_2 = dist.new_group([2, 3])

    coordinator = DistCoordinator()
    if coordinator.rank in [0, 1]:
        tp_process_group = tp_process_group_1
    else:
        tp_process_group = tp_process_group_2
    if coordinator.rank in [0, 2]:
        dp_process_group = dp_process_group_1
    else:
        dp_process_group = dp_process_group_2

    shard_config = ShardConfig(tensor_parallel_process_group=tp_process_group, enable_fused_normalization=True)
    shardformer = ShardFormer(shard_config=shard_config)
    ctx = LazyInitContext() if lazy_init else nullcontext()

    for name, (model_fn, data_gen_fn, output_transform_fn, loss_fn, _) in sub_model_zoo.items():
        # create and shard model
        with ctx:
            model = model_fn().cuda()
        sharded_model, _ = shardformer.optimize(model)

        # add ddp
        sharded_ddp_model = DDP(sharded_model, process_group=dp_process_group)

        # prepare input
        data = data_gen_fn()
        data = {k: v.cuda() for k, v in data.items()}

        # switch to train mode
        sharded_ddp_model.train()

        # run forward
        output = sharded_ddp_model(**data)
        loss = loss_fn(output)

        # backward
        loss.backward()

        torch.cuda.empty_cache()
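
The four process groups above are hard-coded for a world size of 4. For reference, a hypothetical helper (not part of this test, and not a ColossalAI API; build_mesh_groups and its parameters are illustrative names) that builds the same row/column groups for an arbitrary tp_size x dp_size mesh might look like this:

# Sketch only: generalizes the hard-coded 2x2 layout above.
# Assumes dist.init_process_group() has already been called and
# world_size == tp_size * dp_size. Every rank must enter every
# dist.new_group() call, even for groups it does not join.
def build_mesh_groups(rank: int, tp_size: int, dp_size: int):
    tp_group = dp_group = None
    # rows of the mesh: consecutive ranks share a tensor-parallel group
    for i in range(dp_size):
        ranks = list(range(i * tp_size, (i + 1) * tp_size))
        group = dist.new_group(ranks)
        if rank in ranks:
            tp_group = group
    # columns of the mesh: ranks with the same TP index share a data-parallel group
    for j in range(tp_size):
        ranks = list(range(j, tp_size * dp_size, tp_size))
        group = dist.new_group(ranks)
        if rank in ranks:
            dp_group = group
    return tp_group, dp_group

With tp_size=2 and dp_size=2 this yields exactly the groups the test creates by hand: TP groups [0, 1] and [2, 3], DP groups [0, 2] and [1, 3].
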
def run_dist(rank, world_size, port):
    disable_existing_loggers()
    colossalai.launch(rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl")
    check_shardformer_with_ddp()

@pytest.mark.dist
@rerun_if_address_is_in_use()
@clear_cache_before_run()
def test_gpt2():
    spawn(run_dist, 4)


if __name__ == "__main__":
    test_gpt2()
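
As written, the test spawns four processes (spawn(run_dist, 4)) over the NCCL backend, so it needs four CUDA devices. It can be run directly with python test_with_torch_ddp.py, or through pytest, e.g. pytest tests/test_shardformer/test_with_torch_ddp.py, assuming the repository's pytest.ini registers the dist marker used here.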