ColossalAI/tests/test_pipeline/test_pipeline_process_group.py
Kirigaya Kazuto f1e1836218
[pipeline/pipleline_process_group] finish PipelineProcessGroup to manage local abd global rank in TP,DP and PP (#1508)
* support p2p communication with any type of object | pass test

* reconstruct pipeline schedule with p2p_v2.py(support communication with List[Any]) | pass test

* [engin/schedule] use p2p_v2 to recontruct pipeline_schedule

* [pipeline/rpc] implement a demo for PP with cuda rpc framework

* [pipeline/rpc] support interleaving | fix checkpoint bug | change logic when dispatch data in work_list to ensure steady 1F1B

* [pipeline/rpc] implement distributed optimizer | test with assert_close

* [pipeline/rpc] implement distributed optimizer | test with assert_close

* [pipeline/rpc] update outstanding mechanism | optimize dispatching strategy

* [pipeline/rpc] update outstanding mechanism | optimize dispatching strategy

* [pipeline/rpc] update outstanding mechanism | optimize dispatching strategy

* [pipeline/pipleline_process_group] finish PipelineProcessGroup to manage local abd global rank in TP,DP and PP

* [pipeline/pipleline_process_group] remove comment

* [pipeline/pipleline_process_group] remove comment

* [pipeline/pipleline_process_group] skip process group test

* [pipeline/pipleline_process_group] remove test named function
2022-09-01 17:45:47 +08:00

43 lines
1.3 KiB
Python

import os
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import pytest
from colossalai.pipeline.pipeline_process_group import PipelineProcessGroup
from colossalai.initialize import launch
from colossalai.logging import disable_existing_loggers
from rpc_test_utils import pg_parse_args, rpc_is_initialized
def run_worker(rank, args):
os.environ['MASTER_ADDR'] = args.master_addr
os.environ['MASTER_PORT'] = args.master_port
device = args.device
world_size = args.world_size
dp_degree = args.dp_degree
tp_degree = args.tp_degree
num_worker_threads = args.num_worker_threads
host = args.master_addr
port = args.master_port
backend = 'nccl' if device == 'cuda' else 'gloo'
disable_existing_loggers()
launch(dict(), rank, world_size, host, int(port), backend, verbose=False)
pg = PipelineProcessGroup(rank=rank,
world_size=world_size,
dp_degree=dp_degree,
tp_degree=tp_degree,
num_worker_threads=num_worker_threads,
device=device)
if rpc_is_initialized():
rpc.shutdown()
if __name__ == "__main__":
args = pg_parse_args()
world_size = args.world_size
mp.spawn(run_worker, args=(args,), nprocs=world_size)