diff --git a/colossalai/cli/cli.py b/colossalai/cli/cli.py
index 3e5b9ae63..a94e1150e 100644
--- a/colossalai/cli/cli.py
+++ b/colossalai/cli/cli.py
@@ -1,7 +1,8 @@
 import click
-from .launcher import run
-from .check import check
+
 from .benchmark import benchmark
+from .check import check
+from .launcher import run
 
 
 class Arguments():
diff --git a/colossalai/cli/launcher/__init__.py b/colossalai/cli/launcher/__init__.py
index 4ada68b4b..8d9ec147d 100644
--- a/colossalai/cli/launcher/__init__.py
+++ b/colossalai/cli/launcher/__init__.py
@@ -1,7 +1,9 @@
 import click
-from .run import launch_multi_processes
+
 from colossalai.context import Config
 
+from .run import launch_multi_processes
+
 
 @click.command(help="Launch distributed training on a single node or multiple nodes",
                context_settings=dict(ignore_unknown_options=True))
diff --git a/colossalai/context/moe_context.py b/colossalai/context/moe_context.py
index 0879f5fd2..1d7a883b1 100644
--- a/colossalai/context/moe_context.py
+++ b/colossalai/context/moe_context.py
@@ -1,129 +1,129 @@
-import torch
-import torch.distributed as dist
-
-from colossalai.context.parallel_mode import ParallelMode
-from colossalai.context.singleton_meta import SingletonMeta
-from colossalai.tensor import ProcessGroup
-
-from typing import Tuple
-
-
-def _check_sanity():
-    from colossalai.core import global_context as gpc
-    if gpc.tensor_parallel_size > 1 or gpc.pipeline_parallel_size > 1:
-        raise NotImplementedError("Moe is not compatible with tensor or "
-                                  "pipeline parallel at present.")
-
-
-class MoeParallelInfo:
-    """Moe parallelism information, storing parallel sizes and groups.
-    """
-
-    def __init__(self, ep_size: int, dp_size: int):
-        _check_sanity()
-        self.ep_size = ep_size
-        self.dp_size = dp_size
-        self.pg = ProcessGroup(tp_degree=ep_size, dp_degree=dp_size)
-        self.ep_group = self.pg.tp_process_group()
-        self.dp_group = self.pg.dp_process_group()
-
-
-class MoeContext(metaclass=SingletonMeta):
-    """MoE parallel context manager. This class manages different
-    parallel groups in MoE context and MoE loss in training.
- """ - - def __init__(self): - self.world_size = 1 - # Users may want to set maximum expert parallel size smaller than the world size - # since very low bandwidth across nodes may constrain the performance of MoE - # When we have a maximum expert parallel size, we have a minimum data parallel size naturally - self.max_ep_size = 1 - self.min_dp_size = 1 - self.aux_loss = None - self.use_kernel_optim = True - - self.has_setup = False - self._parallel_info_dict = dict() - - @property - def parallel_info_dict(self): - return self._parallel_info_dict - - @property - def is_initialized(self): - return self.has_setup - - def setup(self, seed: int, use_kernel_optim: bool = True): - assert not self.is_initialized, "MoE distributed context shouldn't be set up again" - _check_sanity() - assert torch.cuda.is_available(), "MoE requires to enable CUDA first" - - self.world_size = dist.get_world_size() - - from colossalai.core import global_context as gpc - self.max_ep_size = gpc.config.get('max_ep_size', self.world_size) - assert self.world_size % self.max_ep_size == 0, \ - "Maximum epxert parallel size must be a factor of the number of GPUs" - self.min_dp_size = self.world_size // self.max_ep_size - - # Enabling kernel optimization may raise error in some cases - # Users can close kernel optimization manually - self.use_kernel_optim = use_kernel_optim - - from .random import moe_set_seed - moe_set_seed(seed) - self.has_setup = True - - def get_info(self, num_experts: int) -> Tuple[int, MoeParallelInfo]: - """Calculate the Data Parallel Group and Expert Parallel Group. - - Parameters - ---------- - num_experts : int - The number experts - - Returns - ------- - int, MoeParallelInfo - number of local experts, the MoeParallelInfo of the current ep_size - """ - - gt_flag = num_experts % self.max_ep_size == 0 # check whether num_experts is greater - lt_flag = self.max_ep_size % num_experts == 0 # check whether num_experts is less - - assert gt_flag or lt_flag, "Automatic experts placement dose not not support expert number" \ - " is not a multiple of ep size or vice versa." - - # If the number of experts is greater than maximum expert parallel size. 
-        # there are multiple experts in each GPU and each GPU has different experts
-        # So it's data parallel size is 1
-        # Otherwise, there is only one expert in each GPU
-        # The data parallel size should be calculated
-        dp_size = 1 if gt_flag else self.max_ep_size // num_experts
-        ep_size = self.max_ep_size // dp_size
-
-        # Calculate the number of experts for each GPU
-        num_local_experts = 1 if lt_flag else num_experts // self.max_ep_size
-
-        # Don't forget to multiply minimum data parallel size
-        dp_size *= self.min_dp_size
-        if not (ep_size in self.parallel_info_dict):
-            self.parallel_info_dict[ep_size] = MoeParallelInfo(ep_size, dp_size)
-
-        return num_local_experts, self.parallel_info_dict[ep_size]
-
-    def set_kernel_not_use(self):
-        self.use_kernel_optim = False
-
-    def reset_loss(self):
-        self.aux_loss = 0
-
-    def add_loss(self, loss):
-        self.aux_loss += loss
-
-    def get_loss(self):
-        return self.aux_loss
-
-
-MOE_CONTEXT = MoeContext()
+from typing import Tuple
+
+import torch
+import torch.distributed as dist
+
+from colossalai.context.parallel_mode import ParallelMode
+from colossalai.context.singleton_meta import SingletonMeta
+from colossalai.tensor import ProcessGroup
+
+
+def _check_sanity():
+    from colossalai.core import global_context as gpc
+    if gpc.tensor_parallel_size > 1 or gpc.pipeline_parallel_size > 1:
+        raise NotImplementedError("MoE is not compatible with tensor or "
+                                  "pipeline parallelism at present.")
+
+
+class MoeParallelInfo:
+    """MoE parallelism information, storing parallel sizes and groups.
+    """
+
+    def __init__(self, ep_size: int, dp_size: int):
+        _check_sanity()
+        self.ep_size = ep_size
+        self.dp_size = dp_size
+        self.pg = ProcessGroup(tp_degree=ep_size, dp_degree=dp_size)
+        self.ep_group = self.pg.tp_process_group()
+        self.dp_group = self.pg.dp_process_group()
+
+
+class MoeContext(metaclass=SingletonMeta):
+    """MoE parallel context manager. This class manages different
+    parallel groups in MoE context and MoE loss in training.
+ """ + + def __init__(self): + self.world_size = 1 + # Users may want to set maximum expert parallel size smaller than the world size + # since very low bandwidth across nodes may constrain the performance of MoE + # When we have a maximum expert parallel size, we have a minimum data parallel size naturally + self.max_ep_size = 1 + self.min_dp_size = 1 + self.aux_loss = None + self.use_kernel_optim = True + + self.has_setup = False + self._parallel_info_dict = dict() + + @property + def parallel_info_dict(self): + return self._parallel_info_dict + + @property + def is_initialized(self): + return self.has_setup + + def setup(self, seed: int, use_kernel_optim: bool = True): + assert not self.is_initialized, "MoE distributed context shouldn't be set up again" + _check_sanity() + assert torch.cuda.is_available(), "MoE requires to enable CUDA first" + + self.world_size = dist.get_world_size() + + from colossalai.core import global_context as gpc + self.max_ep_size = gpc.config.get('max_ep_size', self.world_size) + assert self.world_size % self.max_ep_size == 0, \ + "Maximum epxert parallel size must be a factor of the number of GPUs" + self.min_dp_size = self.world_size // self.max_ep_size + + # Enabling kernel optimization may raise error in some cases + # Users can close kernel optimization manually + self.use_kernel_optim = use_kernel_optim + + from .random import moe_set_seed + moe_set_seed(seed) + self.has_setup = True + + def get_info(self, num_experts: int) -> Tuple[int, MoeParallelInfo]: + """Calculate the Data Parallel Group and Expert Parallel Group. + + Parameters + ---------- + num_experts : int + The number experts + + Returns + ------- + int, MoeParallelInfo + number of local experts, the MoeParallelInfo of the current ep_size + """ + + gt_flag = num_experts % self.max_ep_size == 0 # check whether num_experts is greater + lt_flag = self.max_ep_size % num_experts == 0 # check whether num_experts is less + + assert gt_flag or lt_flag, "Automatic experts placement dose not not support expert number" \ + " is not a multiple of ep size or vice versa." + + # If the number of experts is greater than maximum expert parallel size. 
+        # there are multiple experts in each GPU and each GPU has different experts
+        # So its data parallel size is 1
+        # Otherwise, there is only one expert in each GPU
+        # The data parallel size should be calculated
+        dp_size = 1 if gt_flag else self.max_ep_size // num_experts
+        ep_size = self.max_ep_size // dp_size
+
+        # Calculate the number of experts for each GPU
+        num_local_experts = 1 if lt_flag else num_experts // self.max_ep_size
+
+        # Don't forget to multiply minimum data parallel size
+        dp_size *= self.min_dp_size
+        if not (ep_size in self.parallel_info_dict):
+            self.parallel_info_dict[ep_size] = MoeParallelInfo(ep_size, dp_size)
+
+        return num_local_experts, self.parallel_info_dict[ep_size]
+
+    def set_kernel_not_use(self):
+        self.use_kernel_optim = False
+
+    def reset_loss(self):
+        self.aux_loss = 0
+
+    def add_loss(self, loss):
+        self.aux_loss += loss
+
+    def get_loss(self):
+        return self.aux_loss
+
+
+MOE_CONTEXT = MoeContext()
diff --git a/colossalai/context/process_group_initializer/initializer_2d.py b/colossalai/context/process_group_initializer/initializer_2d.py
index fe0ba553d..7fbe3be59 100644
--- a/colossalai/context/process_group_initializer/initializer_2d.py
+++ b/colossalai/context/process_group_initializer/initializer_2d.py
@@ -2,10 +2,11 @@ import math
 
 import torch.distributed as dist
 
-from colossalai.registry import DIST_GROUP_INITIALIZER
-from .process_group_initializer import ProcessGroupInitializer
-from ..parallel_mode import ParallelMode
 from colossalai.global_variables import tensor_parallel_env as env
+from colossalai.registry import DIST_GROUP_INITIALIZER
+
+from ..parallel_mode import ParallelMode
+from .process_group_initializer import ProcessGroupInitializer
 
 
 def _check_summa_env_var(summa_dim):
diff --git a/colossalai/context/process_group_initializer/initializer_pipeline.py b/colossalai/context/process_group_initializer/initializer_pipeline.py
index edd1a3706..0ddb52f63 100644
--- a/colossalai/context/process_group_initializer/initializer_pipeline.py
+++ b/colossalai/context/process_group_initializer/initializer_pipeline.py
@@ -4,8 +4,9 @@
 from torch import distributed as dist
 
 from colossalai.registry import DIST_GROUP_INITIALIZER
-from .process_group_initializer import ProcessGroupInitializer
+
 from ..parallel_mode import ParallelMode
+from .process_group_initializer import ProcessGroupInitializer
 
 
 @DIST_GROUP_INITIALIZER.register_module
diff --git a/colossalai/context/process_group_initializer/initializer_sequence.py b/colossalai/context/process_group_initializer/initializer_sequence.py
index 682fe4bb7..eaacb14d2 100644
--- a/colossalai/context/process_group_initializer/initializer_sequence.py
+++ b/colossalai/context/process_group_initializer/initializer_sequence.py
@@ -3,9 +3,10 @@
 import torch.distributed as dist
 
 from colossalai.registry import DIST_GROUP_INITIALIZER
+
+from ..parallel_mode import ParallelMode
 from .initializer_tensor import Initializer_Tensor
 from .process_group_initializer import ProcessGroupInitializer
-from ..parallel_mode import ParallelMode
 
 
 @DIST_GROUP_INITIALIZER.register_module
diff --git a/colossalai/engine/gradient_handler/utils.py b/colossalai/engine/gradient_handler/utils.py
index e92044b47..fca5f2ec9 100644
--- a/colossalai/engine/gradient_handler/utils.py
+++ b/colossalai/engine/gradient_handler/utils.py
@@ -1,29 +1,30 @@
-import torch.distributed as dist
-import torch.nn as nn
-from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
-from typing import Iterable
-
-
-def bucket_allreduce(param_list: Iterable[nn.Parameter], group=None):
-    # get communication world size
-    comm_size = dist.get_world_size(group)
-    # bucketize and all-reduce
-    buckets = {}
-    # Pack the buckets.
-    for param in param_list:
-        if param.requires_grad and param.grad is not None:
-            tp = param.data.type()
-            if tp not in buckets:
-                buckets[tp] = []
-            buckets[tp].append(param)
-
-    # For each bucket, all-reduce and copy all-reduced grads.
-    for tp in buckets:
-        bucket = buckets[tp]
-        grads = [param.grad.data for param in bucket]
-        coalesced = _flatten_dense_tensors(grads)
-        coalesced /= comm_size
-
-        dist.all_reduce(coalesced, group=group)
-        for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
-            buf.copy_(synced)
+from typing import Iterable
+
+import torch.distributed as dist
+import torch.nn as nn
+from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
+
+
+def bucket_allreduce(param_list: Iterable[nn.Parameter], group=None):
+    # get communication world size
+    comm_size = dist.get_world_size(group)
+    # bucketize and all-reduce
+    buckets = {}
+    # Pack the buckets.
+    for param in param_list:
+        if param.requires_grad and param.grad is not None:
+            tp = param.data.type()
+            if tp not in buckets:
+                buckets[tp] = []
+            buckets[tp].append(param)
+
+    # For each bucket, all-reduce and copy all-reduced grads.
+    for tp in buckets:
+        bucket = buckets[tp]
+        grads = [param.grad.data for param in bucket]
+        coalesced = _flatten_dense_tensors(grads)
+        coalesced /= comm_size
+
+        dist.all_reduce(coalesced, group=group)
+        for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
+            buf.copy_(synced)
diff --git a/colossalai/gemini/gemini_context.py b/colossalai/gemini/gemini_context.py
index 98c8a914e..9a7da6b80 100644
--- a/colossalai/gemini/gemini_context.py
+++ b/colossalai/gemini/gemini_context.py
@@ -1,48 +1,48 @@
-from enum import EnumMeta
-
-
-class GeminiMemoryManager(object):
-
-    def __init__(self, states_cls: EnumMeta):
-        super().__init__()
-        self.states_cls = states_cls
-        self._cnter = 0    # the counter of instances
-
-        self.total_mem = dict()
-        self.state_mem = dict()
-        self.state_mem['cpu'] = dict()
-        self.state_mem['cuda'] = dict()
-
-        self.reset()
-
-    @property
-    def total_number(self):
-        return self._cnter
-
-    def reset(self):
-        self._cnter = 0    # the counter of instances
-
-        self.total_mem['cpu'] = 0    # memory occupation of instances in cpu
-        self.total_mem['cuda'] = 0    # memory of occupation of instances in cuda
-
-        # memory conditions for all states
-        for state in self.states_cls:
-            self.state_mem['cpu'][state] = 0
-            self.state_mem['cuda'][state] = 0
-
-    def register_new_instance(self):
-        self._cnter += 1
-
-    def delete_instance(self):
-        self._cnter -= 1
-
-    def print_info(self):
-        print(f"Total number: {self.total_number}",
-              f"Total CPU memory occupation: {self.total_mem['cpu']}",
-              f"Total CUDA memory occupation: {self.total_mem['cuda']}\n",
-              sep='\n')
-
-        for state in self.states_cls:
-            print(f"{state}: CPU memory occupation: {self.state_mem['cpu'][state]}",
-                  f"{state}: CUDA memory occupation: {self.state_mem['cuda'][state]}\n",
-                  sep='\n')
+from enum import EnumMeta
+
+
+class GeminiMemoryManager(object):
+
+    def __init__(self, states_cls: EnumMeta):
+        super().__init__()
+        self.states_cls = states_cls
+        self._cnter = 0    # the counter of instances
+
+        self.total_mem = dict()
+        self.state_mem = dict()
+        self.state_mem['cpu'] = dict()
+        self.state_mem['cuda'] = dict()
+
+        self.reset()
+
+    @property
+    def total_number(self):
+        return self._cnter
+
+    def reset(self):
+        self._cnter = 0    # the counter of instances
+
+        self.total_mem['cpu'] = 0    # memory occupation of instances in cpu
+        self.total_mem['cuda'] = 0    # memory occupation of instances in cuda
+
+        # memory conditions for all states
+        for state in self.states_cls:
+            self.state_mem['cpu'][state] = 0
+            self.state_mem['cuda'][state] = 0
+
+    def register_new_instance(self):
+        self._cnter += 1
+
+    def delete_instance(self):
+        self._cnter -= 1
+
+    def print_info(self):
+        print(f"Total number: {self.total_number}",
+              f"Total CPU memory occupation: {self.total_mem['cpu']}",
+              f"Total CUDA memory occupation: {self.total_mem['cuda']}\n",
+              sep='\n')
+
+        for state in self.states_cls:
+            print(f"{state}: CPU memory occupation: {self.state_mem['cpu'][state]}",
+                  f"{state}: CUDA memory occupation: {self.state_mem['cuda'][state]}\n",
+                  sep='\n')
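
The expert-placement arithmetic in MoeContext.get_info above can be sanity-checked with a small standalone sketch. The helper below (plan_expert_placement is a hypothetical name, not a ColossalAI API) mirrors the gt_flag/lt_flag logic under assumed values of max_ep_size and min_dp_size and does not touch torch.distributed at all.

def plan_expert_placement(num_experts: int, max_ep_size: int, min_dp_size: int = 1):
    """Return (num_local_experts, ep_size, dp_size) the way MoeContext.get_info computes them."""
    gt_flag = num_experts % max_ep_size == 0    # num_experts is a multiple of max_ep_size
    lt_flag = max_ep_size % num_experts == 0    # max_ep_size is a multiple of num_experts
    assert gt_flag or lt_flag, "num_experts and max_ep_size must divide one another"

    # More experts than the expert parallel size: every GPU holds different experts,
    # so the expert-level data parallel size is 1.
    dp_size = 1 if gt_flag else max_ep_size // num_experts
    ep_size = max_ep_size // dp_size

    # At most one expert per GPU when num_experts <= max_ep_size.
    num_local_experts = 1 if lt_flag else num_experts // max_ep_size

    # Fold in the data parallelism implied by world_size // max_ep_size.
    dp_size *= min_dp_size
    return num_local_experts, ep_size, dp_size


# Assumed example: 8 GPUs with max_ep_size=4, hence min_dp_size=2.
print(plan_expert_placement(num_experts=8, max_ep_size=4, min_dp_size=2))    # (2, 4, 2)
print(plan_expert_placement(num_experts=2, max_ep_size=4, min_dp_size=2))    # (1, 2, 4)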