mirror of https://github.com/hpcaitech/ColossalAI.git (synced 2025-08-16 15:06:53 +00:00)
* init
* rename and remove useless func
* basic chunk
* add evoformer
* align evoformer
* add meta
* basic chunk
* basic memory
* finish basic inference memory estimation
* finish memory estimation
* fix bug
* finish memory estimation
* add part of index tracer
* finish basic index tracer
* add doc string
* add doc str
* polish code
* polish code
* update active log
* polish code
* add possible region search
* finish region search loop
* finish chunk define
* support new op
* rename index tracer
* finish codegen on msa
* redesign index tracer, add source and change compute
* pass outer product mean
* code format
* code format
* work with outerproductmean and msa
* code style
* code style
* code style
* code style
* change threshold
* support check_index_duplicate
* support index duplicate and update loop
* support output
* update memory estimate
* optimise search
* fix layernorm
* move flow tracer
* refactor flow tracer
* format code
* refactor flow search
* code style
* adapt codegen to prepose node
* code style
* remove abandoned function
* remove flow tracer
* code style
* code style
* reorder nodes
* finish node reorder
* update run
* code style
* add chunk select class
* add chunk select
* code style
* add chunksize in emit, fix bug in reassign shape
* code style
* turn off print mem
* add evoformer openfold init
* init openfold
* add benchmark
* add print
* code style
* code style
* init openfold
* update openfold
* align openfold
* use max_mem to control strategy
* update source add
* add reorder in mem estimator
* improve reorder efficiency
* support ones_like, add prompt if fit mode search fails
* fix a bug in ones_like, don't gen chunk if dim size is 1
* fix bug again
* update min memory strategy, reduce mem usage by 30%
* last version of benchmark
* refactor structure
* restructure dir
* update test
* rename
* take apart chunk code gen
* close mem and code print
* code format
* rename ambiguous variable
* separate flow tracer
* separate input node dim search
* separate prepose_nodes
* separate non chunk input
* separate reorder
* rename
* add reorder graph
* separate trace flow
* code style
* code style
* fix typo
* set benchmark
* rename test
* update codegen test
* Fix state_dict key missing issue of the ZeroDDP (#2363)
* Fix state_dict output for ZeroDDP duplicated parameters
* Rewrite state_dict based on get_static_torch_model
* Modify get_static_torch_model to be compatible with the lower version (ZeroDDP)
* update codegen test
* update codegen test
* add chunk search test
* code style
* add available
* [hotfix] fix gpt gemini example (#2404)
* [hotfix] fix gpt gemini example
* [example] add new assertions
* remove autochunk_available
* [workflow] added nightly release to pypi (#2403)
* add comments
* code style
* add doc for search chunk
* [doc] updated readme regarding pypi installation (#2406)
* add doc for search
* [doc] updated kernel-related optimisers' docstring (#2385)
* [doc] updated kernel-related optimisers' docstring
* polish doc
* rename trace_index to trace_indice
* rename function from index to indice
* rename
* rename in doc
* [polish] polish code for get_static_torch_model (#2405)
* [gemini] polish code
* [testing] remove code
* [gemini] make more robust
* rename
* rename
* remove useless function
* [workflow] added coverage test (#2399)
* [workflow] added coverage test
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* add doc for trace indice
* [docker] updated Dockerfile and release workflow (#2410)
* add doc
* update doc
* add available
* change imports
* add test in import
* [workflow] refactored the example check workflow (#2411)
* [workflow] refactored the example check workflow
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* Update parallel_context.py (#2408)
* [hotfix] add DISTPAN argument for benchmark (#2412)
* change the benchmark config file
* change config
* revert config file
* rename distpan to distplan
* [workflow] added precommit check for code consistency (#2401)
* [workflow] added precommit check for code consistency
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* polish code
* adapt new fx
* [workflow] added translation for non-english comments (#2414)
* [setup] refactored setup.py for dependency graph (#2413)
* change import
* update doc
* [workflow] auto comment if precommit check fails (#2417)
* [hotfix] add norm clearing for the overflow step (#2416)
* [examples] adding tflops to PaLM (#2365)
* [workflow]auto comment with test coverage report (#2419)
* [workflow]auto comment with test coverage report
* polish code
* polish yaml
* [doc] added documentation for CI/CD (#2420)
* [doc] added documentation for CI/CD
* polish markdown
* polish markdown
* polish markdown
* [example] removed duplicated stable diffusion example (#2424)
* [zero] add inference mode and its unit test (#2418)
* [workflow] report test coverage even if below threshold (#2431)
* [example] improved the clarity of the example readme (#2427)
* [example] improved the clarity of the example readme
* polish workflow
* polish workflow
* polish workflow
* polish workflow
* polish workflow
* polish workflow
* [ddp] add is_ddp_ignored (#2434)
[ddp] rename to is_ddp_ignored
* [workflow] make test coverage report collapsable (#2436)
* [autoparallel] add shard option (#2423)
* [fx] allow native ckpt trace and codegen. (#2438)
* [cli] provided more details if colossalai run fail (#2442)
* [autoparallel] integrate device mesh initialization into autoparallelize (#2393)
* [autoparallel] integrate device mesh initialization into autoparallelize
* add megatron solution
* update gpt autoparallel examples with latest api
* adapt beta value to fit the current computation cost
* [zero] fix state_dict and load_state_dict for ddp ignored parameters (#2443)
* [ddp] add is_ddp_ignored
[ddp] rename to is_ddp_ignored
* [zero] fix state_dict and load_state_dict
* fix bugs
* [zero] update unit test for ZeroDDP
* [example] updated the hybrid parallel tutorial (#2444)
* [example] updated the hybrid parallel tutorial
* polish code
* [zero] add warning for ignored parameters (#2446)
* [example] updated large-batch optimizer tutorial (#2448)
* [example] updated large-batch optimizer tutorial
* polish code
* polish code
* [example] fixed seed error in train_dreambooth_colossalai.py (#2445)
* [workflow] fixed the on-merge condition check (#2452)
* [workflow] automated the compatibility test (#2453)
* [workflow] automated the compatibility test
* polish code
* [autoparallel] update binary elementwise handler (#2451)
* [autoparallel] update binary elementwise handler
* polish
* [workflow] automated bdist wheel build (#2459)
* [workflow] automated bdist wheel build
* polish workflow
* polish readme
* polish readme
* Fix False warning in initialize.py (#2456)
* Update initialize.py
* pre-commit run check
* [examples] update autoparallel tutorial demo (#2449)
* [examples] update autoparallel tutorial demo
* add test_ci.sh
* polish
* add conda yaml
* [cli] fixed hostname mismatch error (#2465)
* [example] integrate autoparallel demo with CI (#2466)
* [example] integrate autoparallel demo with CI
* polish code
* polish code
* polish code
* polish code
* [zero] low level optim supports ProcessGroup (#2464)
* [example] update vit ci script (#2469)
* [example] update vit ci script
* [example] update requirements
* [example] update requirements
* [example] integrate seq-parallel tutorial with CI (#2463)
* [zero] polish low level optimizer (#2473)
* polish pp middleware (#2476)
Co-authored-by: Ziyue Jiang <ziyue.jiang@gmail.com>
* [example] update gpt gemini example ci test (#2477)
* [zero] add unit test for low-level zero init (#2474)
* [workflow] fixed the skip condition of example weekly check workflow (#2481)
* [example] stable diffusion add roadmap
* add dummy test_ci.sh
* [example] stable diffusion add roadmap (#2482)
* [CI] add test_ci.sh for palm, opt and gpt (#2475)
* polish code
* [example] titans for gpt
* polish readme
* remove license
* polish code
* update readme
* [example] titans for gpt (#2484)
* [autoparallel] support original activation ckpt on autoparallel system (#2468)
* [autochunk] support evoformer tracer (#2485)
Support the full Evoformer tracer, which is a main module of AlphaFold. Previously we only supported a simplified version of it.
1. support some evoformer's op in fx
2. support evoformer test
3. add repos for test code
* [example] fix requirements (#2488)
* [zero] add unit testings for hybrid parallelism (#2486)
* [hotfix] gpt example titans bug #2493
* polish code and fix dataloader bugs
* [hotfix] gpt example titans bug #2493 (#2494)
* [fx] allow control of ckpt_codegen init (#2498)
* [fx] allow control of ckpt_codegen init
Currently in ColoGraphModule, ActivationCheckpointCodeGen is set automatically in __init__, so no other codegen can be set.
This adds an arg to control whether ActivationCheckpointCodeGen is set in __init__.
* code style
* [example] dreambooth example
* add test_ci.sh to dreambooth
* [autochunk] support autochunk on evoformer (#2497)
* Revert "Update parallel_context.py (#2408)"
This reverts commit 7d5640b9db.
* add avg partition (#2483)
Co-authored-by: Ziyue Jiang <ziyue.jiang@gmail.com>
* [auto-chunk] support extramsa (#3) (#2504)
* [utils] lazy init. (#2148)
* [utils] lazy init.
* [utils] remove description.
* [utils] complete.
* [utils] finalize.
* [utils] fix names.
* [autochunk] support parsing blocks (#2506)
* [zero] add strict ddp mode (#2508)
* [zero] add strict ddp mode
* [polish] add comments for strict ddp mode
* [zero] fix test error
* [doc] update opt and tutorial links (#2509)
* [workflow] fixed changed file detection (#2515)
Co-authored-by: oahzxl <xuanlei.zhao@gmail.com>
Co-authored-by: eric8607242 <e0928021388@gmail.com>
Co-authored-by: HELSON <c2h214748@gmail.com>
Co-authored-by: Frank Lee <somerlee.9@gmail.com>
Co-authored-by: Haofan Wang <haofanwang.ai@gmail.com>
Co-authored-by: Jiarui Fang <fangjiarui123@gmail.com>
Co-authored-by: ZijianYY <119492445+ZijianYY@users.noreply.github.com>
Co-authored-by: YuliangLiu0306 <72588413+YuliangLiu0306@users.noreply.github.com>
Co-authored-by: Super Daniel <78588128+super-dainiu@users.noreply.github.com>
Co-authored-by: ver217 <lhx0217@gmail.com>
Co-authored-by: Ziyue Jiang <ziyue.jiang97@gmail.com>
Co-authored-by: Ziyue Jiang <ziyue.jiang@gmail.com>
Co-authored-by: oahzxl <43881818+oahzxl@users.noreply.github.com>
Co-authored-by: binmakeswell <binmakeswell@gmail.com>
Co-authored-by: Fazzie-Maqianli <55798671+Fazziekey@users.noreply.github.com>
Co-authored-by: アマデウス <kurisusnowdeng@users.noreply.github.com>
249 lines · 11 KiB · Python
import operator
from functools import reduce
from typing import List, Tuple

import torch
import torch.distributed as dist

class DeviceMesh:
    """A logical view of a physical mesh. The logical view is used in the
    search process.

    A physical mesh can have multiple logical views (e.g., a 2x8 physical mesh
    can be viewed as a 1x16 or a 4x4 logical mesh). Each mesh dimension has its
    own latency and bandwidth. We use the alpha-beta model to model the
    communication cost.

    Arguments:
        physical_mesh_id (torch.Tensor): physical view of the devices in global rank.
        logical_mesh_id (torch.Tensor): logical view of the devices in global rank.
        mesh_shape (torch.Size, optional): shape of the logical view.
        mesh_alpha (List[float], optional): coefficients used for computing
            communication cost (default: None)
        mesh_beta (List[float], optional): coefficients used for computing
            communication cost (default: None)
        init_process_group (bool, optional): initialize the logical process groups
            while constructing the DeviceMesh instance if init_process_group is set to True.
            Otherwise, users need to call create_process_groups_for_logical_mesh manually
            to initialize the logical process groups. (default: False)
        need_flatten (bool, optional): initialize flatten_device_mesh while constructing
            the DeviceMesh instance if need_flatten is set to True. (default: True)
    """

    def __init__(self,
                 physical_mesh_id: torch.Tensor,
                 mesh_shape: torch.Size = None,
                 logical_mesh_id: torch.Tensor = None,
                 mesh_alpha: List[float] = None,
                 mesh_beta: List[float] = None,
                 init_process_group: bool = False,
                 need_flatten: bool = True):
        self.physical_mesh_id = physical_mesh_id
        if logical_mesh_id is None:
            self.mesh_shape = mesh_shape
            self._logical_mesh_id = self.physical_mesh_id.reshape(self.mesh_shape)
        else:
            self._logical_mesh_id = logical_mesh_id
            self.mesh_shape = self._logical_mesh_id.shape

        # map global rank into logical rank
        self.convert_map = {}
        self._global_rank_to_logical_rank_map(self._logical_mesh_id, [])
        # coefficients for the alpha-beta communication model
        if mesh_alpha is None:
            mesh_alpha = [1] * len(self.mesh_shape)
        if mesh_beta is None:
            mesh_beta = [1] * len(self.mesh_shape)
        self.mesh_alpha = tuple(mesh_alpha)
        self.mesh_beta = tuple(mesh_beta)
        self.init_process_group = init_process_group
        self.need_flatten = need_flatten
        if self.init_process_group:
            self.process_groups_dict = self.create_process_groups_for_logical_mesh()
        if self.need_flatten and self._logical_mesh_id.dim() > 1:
            self.flatten_device_mesh = self.flatten()
            # Create a new member `flatten_device_meshes` to distinguish it from the original flatten method
            # (it is unclear whether any functions still rely on self.flatten()).
            # self.flatten_device_meshes = FlattenDeviceMesh(self.physical_mesh_id, self.mesh_shape, self.mesh_alpha,
            #                                                self.mesh_beta)

    @property
    def shape(self):
        return self.mesh_shape

    @property
    def num_devices(self):
        return reduce(operator.mul, self.physical_mesh_id.shape, 1)

    @property
    def logical_mesh_id(self):
        return self._logical_mesh_id

    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if k != 'process_groups_dict':
                setattr(result, k, __import__("copy").deepcopy(v, memo))
            else:
                # process group handles cannot be deep-copied, so the copy shares them
                setattr(result, k, v)

        return result

    def flatten(self):
        """
        Flatten the logical mesh into an effective 1d logical mesh.
        """
        flatten_mesh_shape_size = len(self.mesh_shape)
        flatten_mesh_shape = [self.num_devices]
        return DeviceMesh(self.physical_mesh_id,
                          tuple(flatten_mesh_shape),
                          mesh_alpha=[max(self.mesh_alpha)] * (flatten_mesh_shape_size - 1),
                          mesh_beta=[min(self.mesh_beta)] * (flatten_mesh_shape_size - 1),
                          init_process_group=self.init_process_group,
                          need_flatten=False)

    def _global_rank_to_logical_rank_map(self, tensor, index_list):
        '''
        This method is a helper function to build convert_map recursively.
        '''
        for index, inner_tensor in enumerate(tensor):
            if inner_tensor.numel() == 1:
                self.convert_map[int(inner_tensor)] = index_list + [index]
            else:
                self._global_rank_to_logical_rank_map(inner_tensor, index_list + [index])

    def create_process_groups_for_logical_mesh(self):
        '''
        This method initializes the logical process groups used for communication
        across the logical device mesh.

        Note: if init_process_group is set to False, you have to call this method manually. Otherwise,
        communication-related functions, such as ShapeConsistencyManager.apply, will raise errors.
        '''
        process_groups_dict = {}
        check_duplicate_list = []
        global_rank_flatten_list = self.physical_mesh_id.view(-1).tolist()
        for global_rank in global_rank_flatten_list:
            process_groups = self.global_rank_to_process_groups_with_global_rank(global_rank)
            for axis, process_group in process_groups.items():
                if axis not in process_groups_dict:
                    process_groups_dict[axis] = []
                if process_group not in check_duplicate_list:
                    check_duplicate_list.append(process_group)
                    process_group_handler = dist.new_group(process_group)
                    process_groups_dict[axis].append((process_group, process_group_handler))

        return process_groups_dict

    def global_rank_to_logical_rank(self, rank):
        return self.convert_map[rank]

    def global_rank_to_process_groups_with_logical_rank(self, rank):
        '''
        Given a global rank, return all logical process groups of this rank.

        for example:
            physical_mesh_id = torch.arange(0, 16).reshape(2, 8)
            mesh_shape = (4, 4)
            # [[0, 1, 2, 3],
            #  [4, 5, 6, 7],
            #  [8, 9, 10, 11],
            #  [12, 13, 14, 15]]
            device_mesh = DeviceMesh(physical_mesh_id, mesh_shape)
            print(device_mesh.global_rank_to_process_groups_with_logical_rank(0))
        output:
            # key is the axis name
            # value is a list of logical ranks in the same axis as rank 0
            {0: [[0, 0], [1, 0], [2, 0], [3, 0]], 1: [[0, 0], [0, 1], [0, 2], [0, 3]]}
        '''
        process_groups = {}
        for d in range(self.logical_mesh_id.dim()):
            for replacer in range(self.logical_mesh_id.shape[d]):
                if d not in process_groups:
                    process_groups[d] = []
                process_group_member = self.convert_map[rank].copy()
                process_group_member[d] = replacer
                process_groups[d].append(process_group_member)
        return process_groups

    def global_rank_to_process_groups_with_global_rank(self, rank):
        '''
        Given a global rank, return all process groups of this rank.

        for example:
            physical_mesh_id = torch.arange(0, 16).reshape(2, 8)
            mesh_shape = (4, 4)
            # [[0, 1, 2, 3],
            #  [4, 5, 6, 7],
            #  [8, 9, 10, 11],
            #  [12, 13, 14, 15]]
            device_mesh = DeviceMesh(physical_mesh_id, mesh_shape)
            print(device_mesh.global_rank_to_process_groups_with_global_rank(0))
        output:
            # key is the axis name
            # value is a list of global ranks in the same axis as rank 0
            {0: [0, 4, 8, 12], 1: [0, 1, 2, 3]}
        '''
        logical_process_groups = self.global_rank_to_process_groups_with_logical_rank(rank)
        process_groups = {}
        for dim, logical_ranks in logical_process_groups.items():
            process_groups[dim] = []
            for logical_rank in logical_ranks:
                for g_rank, l_rank in self.convert_map.items():
                    if l_rank == logical_rank:
                        process_groups[dim].append(g_rank)
        return process_groups

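    # The cost helpers below instantiate the alpha-beta model mentioned in the class
    # docstring: a collective along one mesh dimension is priced as roughly
    #   cost ~= alpha + beta * bytes_on_the_wire + small constant penalty,
    # where the byte factors ((n - 1) / n for all-gather and reduce-scatter,
    # 2 * (n - 1) / n for all-reduce) match the communication volumes of standard
    # ring-based collectives. For instance, with the default alpha = beta = 1, an
    # all-reduce of 1 MB over a 4-device dimension costs 1 + 2 * (3 / 4) * 1048576 + 0.01.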
    def all_gather_cost(self, num_bytes, mesh_dim):
        num_devices = self.logical_mesh_id.shape[mesh_dim]
        return (self.mesh_alpha[mesh_dim] + self.mesh_beta[mesh_dim] * (num_devices - 1) / num_devices * num_bytes +
                0.1)

    def all_reduce_cost(self, num_bytes, mesh_dim):
        num_devices = self.logical_mesh_id.shape[mesh_dim]
        return (self.mesh_alpha[mesh_dim] + self.mesh_beta[mesh_dim] * 2 * (num_devices - 1) / num_devices * num_bytes +
                0.01)

    def reduce_scatter_cost(self, num_bytes, mesh_dim):
        num_devices = self.logical_mesh_id.shape[mesh_dim]
        return (self.mesh_alpha[mesh_dim] + self.mesh_beta[mesh_dim] * (num_devices - 1) / num_devices * num_bytes +
                0.001)

    def all_to_all_cost(self, num_bytes, mesh_dim):
        num_devices = self.logical_mesh_id.shape[mesh_dim]
        penalty_factor = num_devices / 2.0
        return (self.mesh_alpha[mesh_dim] + self.mesh_beta[mesh_dim] *
                (num_devices - 1) / num_devices / num_devices * num_bytes * penalty_factor + 0.001)

class FlattenDeviceMesh(DeviceMesh):

    def __init__(self, physical_mesh_id, mesh_shape, mesh_alpha=None, mesh_beta=None):
        # pass mesh_alpha/mesh_beta by keyword so they do not bind to logical_mesh_id positionally
        super().__init__(physical_mesh_id,
                         mesh_shape,
                         mesh_alpha=mesh_alpha,
                         mesh_beta=mesh_beta,
                         init_process_group=False,
                         need_flatten=False)
        # Different from flatten(), mesh_shape is left unchanged; mesh_alpha and mesh_beta are scalars
        self.mesh_alpha = max(self.mesh_alpha)
        self.mesh_beta = min(self.mesh_beta)
        # Different from the original process_groups_dict, rank lists are not stored
        self.process_number_dict = self.create_process_numbers_for_logical_mesh()

    def create_process_numbers_for_logical_mesh(self):
        '''
        Build a 1d DeviceMesh in column-major (0) and row-major (1) order.

        for example:
            mesh_shape = (2, 4)
            # [[0, 1, 2, 3],
            #  [4, 5, 6, 7]]
            # return {0: [0, 4, 1, 5, 2, 6, 3, 7], 1: [0, 1, 2, 3, 4, 5, 6, 7]}
        '''
        num_devices = reduce(operator.mul, self.mesh_shape, 1)
        process_numbers_dict = {}
        process_numbers_dict[0] = torch.arange(num_devices).reshape(self.mesh_shape).transpose(1, 0).flatten().tolist()
        process_numbers_dict[1] = torch.arange(num_devices).reshape(self.mesh_shape).flatten().tolist()
        return process_numbers_dict

    def mix_gather_cost(self, num_bytes):
        num_devices = reduce(operator.mul, self.mesh_shape, 1)
        return (self.mesh_alpha + self.mesh_beta * (num_devices - 1) / num_devices * num_bytes + 0.1)
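A minimal usage sketch of the DeviceMesh class above, assuming the file is importable as colossalai.device.device_mesh (adjust the path to wherever it actually lives in the package). It views a 2x8 physical mesh as a 4x4 logical mesh without creating real process groups, then queries rank 0's process groups and an alpha-beta communication cost:

import torch

from colossalai.device.device_mesh import DeviceMesh    # assumed import path

# 16 devices laid out physically as 2x8, viewed logically as 4x4.
physical_mesh_id = torch.arange(0, 16).reshape(2, 8)
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape=(4, 4), init_process_group=False)

print(device_mesh.shape)          # (4, 4)
print(device_mesh.num_devices)    # 16
# global ranks sharing each logical axis with rank 0: {0: [0, 4, 8, 12], 1: [0, 1, 2, 3]}
print(device_mesh.global_rank_to_process_groups_with_global_rank(0))
# alpha-beta estimate for an all-reduce of 1 MB along logical dim 0
print(device_mesh.all_reduce_cost(1024 * 1024, mesh_dim=0))

Because init_process_group defaults to False, no torch.distributed initialization is needed for this sketch; in a real run you would call create_process_groups_for_logical_mesh (or pass init_process_group=True) after the process group backend is up.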