From ff4f5d7231d2f2e7387cd6175649fc678aaf6f39 Mon Sep 17 00:00:00 2001 From: sohn <59533593+oikosohn@users.noreply.github.com> Date: Sun, 13 Mar 2022 23:34:34 +0900 Subject: [PATCH 01/13] fix typo in CHANGE_LOG.md - fix typo, `Unifed` -> `Unified` below Added --- CHANGE_LOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGE_LOG.md b/CHANGE_LOG.md index ea69b301d..bbf1d62f9 100644 --- a/CHANGE_LOG.md +++ b/CHANGE_LOG.md @@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file. ### Added -- Unifed distributed layers +- Unified distributed layers - MoE support - DevOps tools such as github action, code review automation, etc. - New project official website @@ -33,4 +33,4 @@ The first beta version of Colossal-AI. Thanks to all contributors for the effort ### Added - Initial architecture of the system -- Features such as tensor parallelism, gradient clipping, gradient accumulation \ No newline at end of file +- Features such as tensor parallelism, gradient clipping, gradient accumulation From 6098bc4ccea9d4be45b05587a733125039699e96 Mon Sep 17 00:00:00 2001 From: github-actions Date: Mon, 14 Mar 2022 00:01:12 +0000 Subject: [PATCH 02/13] Automated submodule synchronization --- benchmark | 2 +- examples | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmark b/benchmark index 62904e4ff..9757a1374 160000 --- a/benchmark +++ b/benchmark @@ -1 +1 @@ -Subproject commit 62904e4ff2f3261c5469c773faa3d9307b6f16f4 +Subproject commit 9757a137495a8fc8b12133087cffe3e4a97ed2cb diff --git a/examples b/examples index d50ef2db5..5345187ad 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit d50ef2db51e7d02ed3f7e9de13f9af86b04eaae9 +Subproject commit 5345187ad55e8c80c111e0c5f7ad9b9241e8f913 From 88804aee4905562ba338385c4562a12d8385bc57 Mon Sep 17 00:00:00 2001 From: ver217 Date: Mon, 14 Mar 2022 14:48:32 +0800 Subject: [PATCH 03/13] add bucket tensor shard strategy --- colossalai/engine/ophooks/zero_hook.py | 27 +++++++++---- colossalai/zero/shard_utils/__init__.py | 7 ++-- .../bucket_tensor_shard_strategy.py | 38 +++++++++++++++++++ 3 files changed, 62 insertions(+), 10 deletions(-) create mode 100644 colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py diff --git a/colossalai/engine/ophooks/zero_hook.py b/colossalai/engine/ophooks/zero_hook.py index e66f90ef5..938826b55 100644 --- a/colossalai/engine/ophooks/zero_hook.py +++ b/colossalai/engine/ophooks/zero_hook.py @@ -1,7 +1,8 @@ import torch from colossalai.registry import OPHOOKS -from colossalai.zero.shard_utils import BaseShardStrategy from colossalai.utils import get_current_device +from colossalai.zero.shard_utils import BaseShardStrategy + from ._base_ophook import BaseOpHook @@ -18,23 +19,32 @@ class ZeroHook(BaseOpHook): self.computing_device = torch.device(f'cuda:{get_current_device()}') def pre_fwd_exec(self, module: torch.nn.Module, *args): + tensor_list = [] for param in module.parameters(): assert hasattr(param, 'col_attr') - self.shard_strategy.gather([param.col_attr.data]) + tensor_list.append(param.col_attr.data) + self.shard_strategy.gather(tensor_list) + for param in module.parameters(): if param.col_attr.data.device != self.computing_device: param.col_attr.data.to(self.computing_device) param.data = param.col_attr.data.payload def post_fwd_exec(self, module: torch.nn.Module, *args): + tensor_list = [] for param in module.parameters(): assert hasattr(param, 'col_attr') - self.shard_strategy.shard([param.col_attr.data]) - param.data = 
torch.empty([], dtype=param.col_attr.data.dtype, device=param.col_attr.data.payload.device) + tensor_list.append(param.col_attr.data) + self.shard_strategy.shard(tensor_list) + for param in module.parameters(): + param.col_attr.remove_torch_payload() def pre_bwd_exec(self, module: torch.nn.Module, input, output): + tensor_list = [] for param in module.parameters(): assert hasattr(param, 'col_attr') - self.shard_strategy.gather([param.col_attr.data]) + tensor_list.append(param.col_attr.data) + self.shard_strategy.gather(tensor_list) + for param in module.parameters(): if param.col_attr.data.device != self.computing_device: param.col_attr.data.to(self.computing_device) param.data = param.col_attr.data.payload @@ -52,10 +62,13 @@ class ZeroHook(BaseOpHook): param.col_attr.bwd_count += 1 def post_bwd_exec(self, module: torch.nn.Module, input): + tensor_list = [] for param in module.parameters(): assert hasattr(param, 'col_attr') - self.shard_strategy.shard([param.col_attr.data]) - param.data = torch.empty([], dtype=param.col_attr.data.dtype, device=param.col_attr.data.payload.device) + tensor_list.append(param.col_attr.data) + self.shard_strategy.shard(tensor_list) + for param in module.parameters(): + param.col_attr.remove_torch_payload() def pre_iter(self): pass diff --git a/colossalai/zero/shard_utils/__init__.py b/colossalai/zero/shard_utils/__init__.py index 417e201e8..5e5d63a7e 100644 --- a/colossalai/zero/shard_utils/__init__.py +++ b/colossalai/zero/shard_utils/__init__.py @@ -1,4 +1,5 @@ -from colossalai.zero.shard_utils.base_shard_strategy import BaseShardStrategy -from colossalai.zero.shard_utils.tensor_shard_strategy import TensorShardStrategy +from .base_shard_strategy import BaseShardStrategy +from .bucket_tensor_shard_strategy import BucketTensorShardStrategy +from .tensor_shard_strategy import TensorShardStrategy -__all__ = ['BaseShardStrategy', 'TensorShardStrategy'] +__all__ = ['BaseShardStrategy', 'TensorShardStrategy', 'BucketTensorShardStrategy'] diff --git a/colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py b/colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py new file mode 100644 index 000000000..a2b9b0097 --- /dev/null +++ b/colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py @@ -0,0 +1,38 @@ +from typing import List + +import torch +import torch.distributed as dist +from colossalai.utils import get_current_device +from colossalai.zero.sharded_param.sharded_tensor import ShardedTensor +from torch._utils import _flatten_dense_tensors as flatten + +from .tensor_shard_strategy import TensorShardStrategy + + +class BucketTensorShardStrategy(TensorShardStrategy): + + def gather(self, tensor_list: List[ShardedTensor]): + tensor_list: List[ShardedTensor] = [t for t in tensor_list if t.is_sharded] + if len(tensor_list) == 0: + return + target_device = tensor_list[0].device + dtype = tensor_list[0].dtype + buffer_list: List[torch.Tensor] = [] + tensor_numels = [t.payload.numel() for t in tensor_list] + buffer_size = sum(tensor_numels) + for i in range(self.world_size): + if i == self.local_rank: + buffer_list.append(flatten([t.payload for t in tensor_list]).cuda(get_current_device())) + else: + buffer_list.append(torch.zeros(buffer_size, dtype=dtype, device=get_current_device())) + dist.all_gather(buffer_list, buffer_list[self.local_rank], group=self.process_group) + # Move to target device before splitting buffer + # Ensure we utilize maximum PCIE bandwidth + buffer_list = [buffer.to(target_device) for buffer in buffer_list] + offset = 0 + for i, t in 
enumerate(tensor_list): + gathered_payload = [buffer[offset:offset + tensor_numels[i]] for buffer in buffer_list] + gathered_payload = torch.cat(gathered_payload)[:t.origin_numel].view(t.origin_shape) + t.reset_payload(gathered_payload) + t.is_sharded = False + offset += tensor_numels[i] From 54fd37f0e0256790ac170f7f30dc37b036a05dfa Mon Sep 17 00:00:00 2001 From: ver217 Date: Mon, 14 Mar 2022 15:06:02 +0800 Subject: [PATCH 04/13] polish unit test --- .../test_init_context.py | 24 ++++++++------ .../test_shard_model_v2.py | 32 +++++++++---------- .../test_shard_param.py | 15 +++++---- .../test_sharded_optim_v2.py | 17 ++++++---- .../test_sharded_optim_v2_with_cpu_adam.py | 13 ++++---- .../test_state_dict.py | 16 +++++----- 6 files changed, 64 insertions(+), 53 deletions(-) diff --git a/tests/test_zero_data_parallel/test_init_context.py b/tests/test_zero_data_parallel/test_init_context.py index 335fa9933..a74e6959d 100644 --- a/tests/test_zero_data_parallel/test_init_context.py +++ b/tests/test_zero_data_parallel/test_init_context.py @@ -4,21 +4,20 @@ from functools import partial import colossalai -from colossalai.utils.cuda import get_current_device import pytest import torch import torch.multiprocessing as mp from colossalai.utils import free_port +from colossalai.utils.cuda import get_current_device +from colossalai.utils.memory_tracer.allocator import GLOBAL_MODEL_DATA_TRACER from colossalai.zero.init_ctx import ZeroInitContext -from colossalai.zero.shard_utils.tensor_shard_strategy import \ - TensorShardStrategy +from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy) from tests.components_to_test.registry import non_distributed_component_funcs from common import CONFIG -from colossalai.utils.memory_tracer.allocator import GLOBAL_MODEL_DATA_TRACER -def run_dist(rank, world_size, port, init_device): +def run_dist(rank, world_size, port, init_device, shard_strategy): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') for get_components_func in non_distributed_component_funcs: @@ -26,7 +25,7 @@ def run_dist(rank, world_size, port, init_device): model_numel_tensor = torch.zeros(1, dtype=torch.int) with ZeroInitContext(convert_fp16=True, target_device=init_device, - shard_strategy=TensorShardStrategy(), + shard_strategy=shard_strategy(), shard_param=True, model_numel_tensor=model_numel_tensor): model = model_builder(checkpoint=True) @@ -50,11 +49,16 @@ def run_dist(rank, world_size, port, init_device): @pytest.mark.dist @pytest.mark.parametrize("world_size", [1, 4]) @pytest.mark.parametrize("init_device", [torch.device('cpu'), torch.device(f'cuda:{get_current_device()}')]) -def test_zero_init_context(world_size, init_device): - run_func = partial(run_dist, world_size=world_size, port=free_port(), init_device=init_device) +@pytest.mark.parametrize("shard_strategy", [TensorShardStrategy, BucketTensorShardStrategy]) +def test_zero_init_context(world_size, init_device, shard_strategy): + run_func = partial(run_dist, + world_size=world_size, + port=free_port(), + init_device=init_device, + shard_strategy=shard_strategy) mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_zero_init_context(2, torch.device('cpu')) - test_zero_init_context(2, torch.device(f'cuda:{get_current_device()}')) + test_zero_init_context(2, torch.device('cpu'), TensorShardStrategy) + test_zero_init_context(2, torch.device(f'cuda:{get_current_device()}'), TensorShardStrategy) diff --git 
a/tests/test_zero_data_parallel/test_shard_model_v2.py b/tests/test_zero_data_parallel/test_shard_model_v2.py index 23a75cfcd..54ca5ad3c 100644 --- a/tests/test_zero_data_parallel/test_shard_model_v2.py +++ b/tests/test_zero_data_parallel/test_shard_model_v2.py @@ -3,30 +3,28 @@ import copy from functools import partial -import pytest - -import torch -import torch.multiprocessing as mp -from torch.nn.parallel import DistributedDataParallel as DDP import colossalai -from colossalai.zero.init_ctx import ZeroInitContext +import pytest +import torch +import torch.multiprocessing as mp from colossalai.utils import free_port -from colossalai.zero.shard_utils.tensor_shard_strategy import \ - TensorShardStrategy +from colossalai.zero.init_ctx import ZeroInitContext +from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy) from colossalai.zero.sharded_model import ShardedModelV2 from colossalai.zero.sharded_model._zero3_utils import cast_tensor_to_fp16 - -from tests.components_to_test.registry import non_distributed_component_funcs -from common import CONFIG, check_grads_padding, run_fwd_bwd from colossalai.zero.sharded_model.utils import col_model_deepcopy +from tests.components_to_test.registry import non_distributed_component_funcs +from torch.nn.parallel import DistributedDataParallel as DDP + +from common import CONFIG, check_grads_padding, run_fwd_bwd -def run_dist(rank, world_size, port, use_zero_init_ctx, enable_autocast): +def run_dist(rank, world_size, port, use_zero_init_ctx, enable_autocast, shard_strategy): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') test_models = ['repeated_computed_layers', 'resnet18', 'bert'] - shard_strategy = TensorShardStrategy() + shard_strategy = shard_strategy() for model_name in test_models: get_components_func = non_distributed_component_funcs.get_callable(model_name) model_builder, train_dataloader, _, _, criterion = get_components_func() @@ -66,14 +64,16 @@ def run_dist(rank, world_size, port, use_zero_init_ctx, enable_autocast): @pytest.mark.parametrize("world_size", [1, 2]) @pytest.mark.parametrize("enable_autocast", [True]) @pytest.mark.parametrize("use_zero_init_ctx", [True]) -def test_shard_model_v2(world_size, use_zero_init_ctx, enable_autocast): +@pytest.mark.parametrize("shard_strategy", [TensorShardStrategy, BucketTensorShardStrategy]) +def test_shard_model_v2(world_size, use_zero_init_ctx, enable_autocast, shard_strategy): run_func = partial(run_dist, world_size=world_size, port=free_port(), use_zero_init_ctx=use_zero_init_ctx, - enable_autocast=enable_autocast) + enable_autocast=enable_autocast, + shard_strategy=shard_strategy) mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_shard_model_v2(world_size=2, use_zero_init_ctx=True, enable_autocast=True) + test_shard_model_v2(world_size=2, use_zero_init_ctx=True, enable_autocast=True, shard_strategy=TensorShardStrategy) diff --git a/tests/test_zero_data_parallel/test_shard_param.py b/tests/test_zero_data_parallel/test_shard_param.py index 5c70e5274..bc0564846 100644 --- a/tests/test_zero_data_parallel/test_shard_param.py +++ b/tests/test_zero_data_parallel/test_shard_param.py @@ -10,20 +10,20 @@ import torch import torch.multiprocessing as mp from colossalai.logging import disable_existing_loggers, get_dist_logger from colossalai.utils import free_port -from colossalai.zero.shard_utils import TensorShardStrategy +from colossalai.zero.shard_utils import (BucketTensorShardStrategy, 
TensorShardStrategy) from colossalai.zero.sharded_param import ShardedParam, ShardedTensor from colossalai.zero.sharded_param.sharded_param import ShardedParamV2 -from tests.test_zero_data_parallel.common import CONFIG, allclose from tests.components_to_test.registry import non_distributed_component_funcs +from tests.test_zero_data_parallel.common import CONFIG, allclose -def _run_shard_tensor(rank, world_size, port): +def _run_shard_tensor(rank, world_size, port, shard_strategy): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') t = ShardedTensor(tensor=torch.randn(world_size * 2, 3)) assert list(t.origin_shape) == [world_size * 2, 3] assert list(t.shape) == [world_size * 2, 3] - shard_strategy = TensorShardStrategy(process_group=None) + shard_strategy = shard_strategy(process_group=None) # test shard strategy shard_strategy.shard([t]) @@ -34,8 +34,9 @@ def _run_shard_tensor(rank, world_size, port): @pytest.mark.dist @pytest.mark.parametrize("world_size", [1, 2]) -def test_shard_tensor(world_size): - run_func = partial(_run_shard_tensor, world_size=world_size, port=free_port()) +@pytest.mark.parametrize("shard_strategy", [TensorShardStrategy, BucketTensorShardStrategy]) +def test_shard_tensor(world_size, shard_strategy): + run_func = partial(_run_shard_tensor, world_size=world_size, port=free_port(), shard_strategy=shard_strategy) mp.spawn(run_func, nprocs=world_size) @@ -121,7 +122,7 @@ def test_init_shard_param(world_size): if __name__ == '__main__': - test_shard_tensor(2) + test_shard_tensor(2, TensorShardStrategy) test_shard_param(2) test_shard_param_v2(2) test_init_shard_param(4) diff --git a/tests/test_zero_data_parallel/test_sharded_optim_v2.py b/tests/test_zero_data_parallel/test_sharded_optim_v2.py index aa8735c26..5ecfba71a 100644 --- a/tests/test_zero_data_parallel/test_sharded_optim_v2.py +++ b/tests/test_zero_data_parallel/test_sharded_optim_v2.py @@ -10,7 +10,7 @@ import torch import torch.distributed as dist import torch.multiprocessing as mp from colossalai.utils import free_port -from colossalai.zero.shard_utils import TensorShardStrategy +from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy) from colossalai.zero.sharded_model import ShardedModelV2 from colossalai.zero.sharded_optim import ShardedOptimizerV2 from tests.components_to_test.registry import non_distributed_component_funcs @@ -38,12 +38,12 @@ def run_step(model, optimizer, data, label, criterion, enable_autocast=False): optimizer.step() -def run_dist(rank, world_size, port, cpu_offload): +def run_dist(rank, world_size, port, cpu_offload, shard_strategy): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') test_models = ['repeated_computed_layers', 'resnet18', 'bert'] + shard_strategy = shard_strategy() for model_name in test_models: get_components_func = non_distributed_component_funcs.get_callable(model_name) - shard_strategy = TensorShardStrategy() model, train_dataloader, test_dataloader, optimizer, criterion = get_components_func() model = model(checkpoint=True).cuda() zero_model = ShardedModelV2(copy.deepcopy(model), @@ -69,10 +69,15 @@ def run_dist(rank, world_size, port, cpu_offload): @pytest.mark.dist @pytest.mark.parametrize("world_size", [1, 2]) @pytest.mark.parametrize("cpu_offload", [True, False]) -def test_sharded_optim_v2(world_size, cpu_offload): - run_func = partial(run_dist, world_size=world_size, port=free_port(), cpu_offload=cpu_offload) 
+@pytest.mark.parametrize("shard_strategy", [TensorShardStrategy, BucketTensorShardStrategy]) +def test_sharded_optim_v2(world_size, cpu_offload, shard_strategy): + run_func = partial(run_dist, + world_size=world_size, + port=free_port(), + cpu_offload=cpu_offload, + shard_strategy=shard_strategy) mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_sharded_optim_v2(world_size=2, cpu_offload=True) + test_sharded_optim_v2(world_size=2, cpu_offload=True, shard_strategy=TensorShardStrategy) diff --git a/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py b/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py index ad0113578..d5daaafcc 100644 --- a/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py +++ b/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py @@ -11,7 +11,7 @@ import torch.distributed as dist import torch.multiprocessing as mp from colossalai.nn.optimizer import CPUAdam from colossalai.utils import free_port -from colossalai.zero.shard_utils import TensorShardStrategy +from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy) from colossalai.zero.sharded_model import ShardedModelV2 from colossalai.zero.sharded_optim import ShardedOptimizerV2 from tests.components_to_test.registry import non_distributed_component_funcs @@ -47,12 +47,12 @@ def run_step_no_criterion(model, optimizer, data, label, enable_autocast=False): optimizer.step() -def run_dist(rank, world_size, port): +def run_dist(rank, world_size, port, shard_strategy): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') test_models = ['repeated_computed_layers', 'resnet18', 'bert'] + shard_strategy = shard_strategy() for model_name in test_models: get_components_func = non_distributed_component_funcs.get_callable(model_name) - shard_strategy = TensorShardStrategy() model, train_dataloader, test_dataloader, optimizer, criterion = get_components_func() model = model(checkpoint=True).cuda() zero_model = ShardedModelV2(copy.deepcopy(model), shard_strategy, offload_config={'device': 'cpu'}) @@ -79,10 +79,11 @@ def run_dist(rank, world_size, port): @pytest.mark.dist @pytest.mark.parametrize("world_size", [1, 2]) -def test_sharded_optim_v2(world_size): - run_func = partial(run_dist, world_size=world_size, port=free_port()) +@pytest.mark.parametrize("shard_strategy", [TensorShardStrategy, BucketTensorShardStrategy]) +def test_sharded_optim_v2(world_size, shard_strategy): + run_func = partial(run_dist, world_size=world_size, port=free_port(), shard_strategy=shard_strategy) mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_sharded_optim_v2(world_size=2) + test_sharded_optim_v2(world_size=2, shard_strategy=TensorShardStrategy) diff --git a/tests/test_zero_data_parallel/test_state_dict.py b/tests/test_zero_data_parallel/test_state_dict.py index a71f59c27..9a3e08267 100644 --- a/tests/test_zero_data_parallel/test_state_dict.py +++ b/tests/test_zero_data_parallel/test_state_dict.py @@ -9,22 +9,21 @@ import pytest import torch import torch.multiprocessing as mp from colossalai.utils import free_port -from colossalai.zero.shard_utils.tensor_shard_strategy import \ - TensorShardStrategy +from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy) from colossalai.zero.sharded_model import ShardedModelV2 from tests.components_to_test.registry import non_distributed_component_funcs from common import CONFIG -def 
run_dist(rank, world_size, port): +def run_dist(rank, world_size, port, shard_strategy): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') test_models = ['repeated_computed_layers', 'resnet18'] + shard_strategy = shard_strategy() for model_name in test_models: get_components_func = non_distributed_component_funcs.get_callable(model_name) model_builder, train_dataloader, test_dataloader, optimizer, criterion = get_components_func() model = model_builder() - shard_strategy = TensorShardStrategy() model = model.half().cuda() zero_model = ShardedModelV2(deepcopy(model), shard_strategy) zero_state_dict = zero_model.state_dict() @@ -33,11 +32,12 @@ def run_dist(rank, world_size, port): @pytest.mark.dist -def test_zero_state_dict(): - world_size = 2 - run_func = partial(run_dist, world_size=world_size, port=free_port()) +@pytest.mark.parametrize("world_size", [1, 2]) +@pytest.mark.parametrize("shard_strategy", [TensorShardStrategy, BucketTensorShardStrategy]) +def test_zero_state_dict(world_size, shard_strategy): + run_func = partial(run_dist, world_size=world_size, port=free_port(), shard_strategy=shard_strategy) mp.spawn(run_func, nprocs=world_size) if __name__ == '__main__': - test_zero_state_dict() + test_zero_state_dict(2, TensorShardStrategy) From 63469c0f91941dcf4eec21c58247910e02067290 Mon Sep 17 00:00:00 2001 From: ver217 Date: Mon, 14 Mar 2022 15:48:55 +0800 Subject: [PATCH 05/13] polish code --- colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py b/colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py index a2b9b0097..d5ba72a2e 100644 --- a/colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py +++ b/colossalai/zero/shard_utils/bucket_tensor_shard_strategy.py @@ -23,6 +23,9 @@ class BucketTensorShardStrategy(TensorShardStrategy): for i in range(self.world_size): if i == self.local_rank: buffer_list.append(flatten([t.payload for t in tensor_list]).cuda(get_current_device())) + # Release payload here, to decrease peak memory usage + for t in tensor_list: + t.reset_payload(None) else: buffer_list.append(torch.zeros(buffer_size, dtype=dtype, device=get_current_device())) dist.all_gather(buffer_list, buffer_list[self.local_rank], group=self.process_group) From dfd0363f68ce620c66f9c41767a5fd8ae492bcaa Mon Sep 17 00:00:00 2001 From: HELSON <72907851+1SAA@users.noreply.github.com> Date: Mon, 14 Mar 2022 16:07:45 +0800 Subject: [PATCH 06/13] polished output format for communication profiler and pcie profiler (#404) fixed typing error --- colossalai/utils/profiler/comm_profiler.py | 47 +++++++++------ colossalai/utils/profiler/pcie_profiler.py | 69 ++++++++++++++-------- colossalai/utils/profiler/prof_utils.py | 2 +- 3 files changed, 74 insertions(+), 44 deletions(-) diff --git a/colossalai/utils/profiler/comm_profiler.py b/colossalai/utils/profiler/comm_profiler.py index d672a8e20..93c72cc65 100644 --- a/colossalai/utils/profiler/comm_profiler.py +++ b/colossalai/utils/profiler/comm_profiler.py @@ -6,20 +6,25 @@ from torch.autograd.profiler import profile import torch.distributed as dist from torch.distributed import ReduceOp from colossalai.utils import get_current_device -from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwith +from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwidth from typing import List, Optional def 
_get_code_location(depth: int): - ret = "" - length = len(inspect.stack()) - for i in range(3, min(length, depth + 1)): + ret = [] + length = min(len(inspect.stack()), depth + 1) + for i in range(3, length): upper_frame = inspect.stack()[i] function_name = inspect.stack()[i - 1].function - info = upper_frame.filename + "(" + str(upper_frame.lineno) + "): " + function_name + "\n" - ret += info + ret.append(upper_frame.filename) + ret.append('(') + ret.append(str(upper_frame.lineno)) + ret.append('): ') + ret.append(function_name) + if i != length - 1: + ret.append('\n') - return ret + return ''.join(ret) torch_all_reduce = dist.all_reduce @@ -100,8 +105,9 @@ class CommProfiler(BaseProfiler): def result_list(self, sep: str = "\n"): res = [] - def append(s: str): - res.append(s) + def append(s: str = None): + if s is not None: + res.append(s) res.append(sep) if self.warn_flag: @@ -110,19 +116,26 @@ class CommProfiler(BaseProfiler): append("Collective communication profiling result:") append("total cuda time: {}".format(_format_time(self.total_cuda_time))) - append("average bandwith: {}".format(_format_bandwith(self.total_comm_vol, self.total_cuda_time))) + append("average bandwidth: {}".format(_format_bandwidth(self.total_comm_vol, self.total_cuda_time))) append("total number of calls: {}".format(self.total_count)) - append("All events:\n----------------------------------------") + append("All events:") + + seperation = '-' * 74 + row_format = '{:^10}' + '{:^12}' * 2 + '{:^16}' + '{:^12}' * 2 + + append(seperation) + append(row_format.format('Location', 'GPU time', 'Percentage', 'Comm volume', 'Bandwidth', 'Num of calls')) + append(seperation) show_list = sorted(self.ops_record.items(), key=lambda kv: -kv[1].self_cuda_time) for location, event in show_list: append(location) - append("self cuda time: {}".format(_format_time(event.self_cuda_time))) - append("{:.1f}% of total communication time".format(event.self_cuda_time / self.total_cuda_time * 100.0)) - append("self communication volme: {}".format(_format_memory(event.self_comm_vol))) - append("average bandwith: {}".format(_format_bandwith(event.self_comm_vol, event.self_cuda_time))) - append("number of calls: {}".format(event.self_count)) - append("----------------------------------------") + append( + row_format.format('', _format_time(event.self_cuda_time), + '{:.1f}%'.format(event.self_cuda_time / self.total_cuda_time * 100.0), + _format_memory(event.self_comm_vol), + _format_bandwidth(event.self_comm_vol, event.self_cuda_time), event.self_count)) + append() return ''.join(res) diff --git a/colossalai/utils/profiler/pcie_profiler.py b/colossalai/utils/profiler/pcie_profiler.py index a01a37489..3a9ec95b4 100644 --- a/colossalai/utils/profiler/pcie_profiler.py +++ b/colossalai/utils/profiler/pcie_profiler.py @@ -1,6 +1,6 @@ from pathlib import Path from torch.autograd.profiler import profile -from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwith +from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwidth from typing import List @@ -24,6 +24,7 @@ def _reduce_location(locations: List[str]) -> str: for lo in locations: ret.append(lo) ret.append("\n") + ret = ret[:-1] return ''.join(ret) @@ -48,18 +49,23 @@ class PcieProfiler(BaseProfiler): TODO: Merge pcie profiler into communication profiler """ - def __init__(self, - dtype: str = "fp32", - depth: int = 1, - total_count: int = 0, - total_pcie_vol: int = 0, - total_cuda_time: int = 0): + def __init__(self, dtype: str = "fp32", depth: 
int = 1): super().__init__(profiler_name="Pcie", priority=10) self.depth = depth self.data_size = _get_size(dtype) - self.total_count = total_count - self.total_pcie_vol = total_pcie_vol - self.total_cuda_time = total_cuda_time + self.h2d_count = 0 + self.h2d_time = 0 + self.d2h_count = 0 + self.d2h_time = 0 + + self.ops_record = dict() + self.profiler = None + + def reset(self): + self.h2d_count = 0 + self.h2d_time = 0 + self.d2h_count = 0 + self.d2h_time = 0 self.ops_record = dict() self.profiler = None @@ -81,17 +87,20 @@ class PcieProfiler(BaseProfiler): for event in events: if event.name == "aten::copy_": t_shape = event.input_shapes[0] - if len(t_shape) == 0 or event.cuda_time_total == 0: + if len(t_shape) == 0 or event.cuda_time_total == 0 or len(event.stack) == 0: continue current_comm_event = PcieEvent(1, self.data_size * _get_numel(t_shape), event.cuda_time_total) - self.total_count += current_comm_event.count - self.total_pcie_vol += current_comm_event.pcie_vol - self.total_cuda_time += current_comm_event.cuda_time code_location = _reduce_location(event.stack[:self.depth]) if code_location in self.ops_record: self.ops_record[code_location].add(current_comm_event) else: self.ops_record[code_location] = current_comm_event + elif 'Memcpy HtoD' in event.name: + self.h2d_count += 1 + self.h2d_time += event.cuda_time_total + elif 'Memcpy DtoH' in event.name: + self.d2h_count += 1 + self.d2h_time += event.cuda_time_total self.profiler = None @@ -108,24 +117,32 @@ class PcieProfiler(BaseProfiler): def result_list(self, sep: str = "\n"): res = [] - def append(s: str): - res.append(s) + def append(s: str = None): + if s is not None: + res.append(s) res.append(sep) append("Pcie profiling result:") - append("total cuda time: {}".format(_format_time(self.total_cuda_time))) - append("average bandwith: {}".format(_format_bandwith(self.total_pcie_vol, self.total_cuda_time))) - append("total number of calls: {}".format(self.total_count)) - append("All events:\n----------------------------------------") + append("time of data transmission (CPU -> GPU): {}".format(_format_time(self.h2d_time))) + append("number of transmission (CPU -> GPU): {}".format(self.h2d_count)) + append("time of data transmission (GPU -> CPU): {}".format(_format_time(self.d2h_time))) + append("number of transmission (GPU -> CPU): {}".format(self.d2h_count)) + + append("Possible data transmission events in PCIE:") + + seperation = '-' * 62 + row_format = '{:^10}' + '{:^12}' + '{:^16}' + '{:^12}' * 2 + + append(seperation) + append(row_format.format('Location', 'GPU time', 'Trans volume', 'Bandwidth', 'Num of calls')) + append(seperation) show_list = sorted(self.ops_record.items(), key=lambda kv: -kv[1].cuda_time) for location, event in show_list: append(location) - append("cuda time: {}".format(_format_time(event.cuda_time))) - append("{:.1f}% of total pcie time".format(event.cuda_time / self.total_cuda_time * 100.0)) - append("pcie volme: {}".format(_format_memory(event.pcie_vol))) - append("average bandwith: {}".format(_format_bandwith(event.pcie_vol, event.cuda_time))) - append("number of calls: {}".format(event.count)) - append("----------------------------------------") + append( + row_format.format('', _format_time(event.cuda_time), _format_memory(event.pcie_vol), + _format_bandwidth(event.pcie_vol, event.cuda_time), event.count)) + append() return ''.join(res) diff --git a/colossalai/utils/profiler/prof_utils.py b/colossalai/utils/profiler/prof_utils.py index d71906868..641a514cf 100644 --- 
a/colossalai/utils/profiler/prof_utils.py +++ b/colossalai/utils/profiler/prof_utils.py @@ -32,7 +32,7 @@ def _format_memory(nbytes): return str(nbytes) + ' B' -def _format_bandwith(volme: float or int, time_us: int): +def _format_bandwidth(volme: float or int, time_us: int): sec_div_mb = (1000.0 / 1024.0)**2 mb_per_sec = volme / time_us * sec_div_mb From cf92a779dcfcffe7f50964a1c8d2a25105f7ba3c Mon Sep 17 00:00:00 2001 From: Frank Lee Date: Mon, 14 Mar 2022 16:23:02 +0800 Subject: [PATCH 07/13] added huggingface badge (#407) --- README-zh-Hans.md | 1 + README.md | 1 + 2 files changed, 2 insertions(+) diff --git a/README-zh-Hans.md b/README-zh-Hans.md index 4e4b17302..4077cbb5c 100644 --- a/README-zh-Hans.md +++ b/README-zh-Hans.md @@ -16,6 +16,7 @@ [![codebeat badge](https://codebeat.co/badges/bfe8f98b-5d61-4256-8ad2-ccd34d9cc156)](https://codebeat.co/projects/github-com-hpcaitech-colossalai-main) [![slack badge](https://img.shields.io/badge/Slack-join-blueviolet?logo=slack&)](https://join.slack.com/t/colossalaiworkspace/shared_invite/zt-z7b26eeb-CBp7jouvu~r0~lcFzX832w) [![WeChat badge](https://img.shields.io/badge/微信-加入-green?logo=wechat&)](https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/WeChat.png) + [![HuggingFace badge](https://img.shields.io/badge/%F0%9F%A4%97-HuggingFace-yellow)](https://huggingface.co/hpcai) | [English](README.md) | [中文](README-zh-Hans.md) | diff --git a/README.md b/README.md index 5ed090936..c708b745d 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ [![codebeat badge](https://codebeat.co/badges/bfe8f98b-5d61-4256-8ad2-ccd34d9cc156)](https://codebeat.co/projects/github-com-hpcaitech-colossalai-main) [![slack badge](https://img.shields.io/badge/Slack-join-blueviolet?logo=slack&)](https://join.slack.com/t/colossalaiworkspace/shared_invite/zt-z7b26eeb-CBp7jouvu~r0~lcFzX832w) [![WeChat badge](https://img.shields.io/badge/微信-加入-green?logo=wechat&)](https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/WeChat.png) + [![HuggingFace badge](https://img.shields.io/badge/%F0%9F%A4%97-HuggingFace-yellow)](https://huggingface.co/hpcai) | [English](README.md) | [中文](README-zh-Hans.md) | From 62b08acc720adcd446b94edc24d9e1445f2937c6 Mon Sep 17 00:00:00 2001 From: Frank Lee Date: Mon, 14 Mar 2022 17:07:01 +0800 Subject: [PATCH 08/13] update hf badge link (#410) --- README-zh-Hans.md | 2 +- README.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README-zh-Hans.md b/README-zh-Hans.md index 4077cbb5c..f51ebf28c 100644 --- a/README-zh-Hans.md +++ b/README-zh-Hans.md @@ -14,9 +14,9 @@ [![Build](https://github.com/hpcaitech/ColossalAI/actions/workflows/build.yml/badge.svg)](https://github.com/hpcaitech/ColossalAI/actions/workflows/build.yml) [![Documentation](https://readthedocs.org/projects/colossalai/badge/?version=latest)](https://colossalai.readthedocs.io/en/latest/?badge=latest) [![codebeat badge](https://codebeat.co/badges/bfe8f98b-5d61-4256-8ad2-ccd34d9cc156)](https://codebeat.co/projects/github-com-hpcaitech-colossalai-main) + [![HuggingFace badge](https://img.shields.io/badge/%F0%9F%A4%97HuggingFace-Join-yellow)](https://huggingface.co/hpcai-tech) [![slack badge](https://img.shields.io/badge/Slack-join-blueviolet?logo=slack&)](https://join.slack.com/t/colossalaiworkspace/shared_invite/zt-z7b26eeb-CBp7jouvu~r0~lcFzX832w) [![WeChat badge](https://img.shields.io/badge/微信-加入-green?logo=wechat&)](https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/WeChat.png) - 
[![HuggingFace badge](https://img.shields.io/badge/%F0%9F%A4%97-HuggingFace-yellow)](https://huggingface.co/hpcai) | [English](README.md) | [中文](README-zh-Hans.md) | diff --git a/README.md b/README.md index c708b745d..e98f884d2 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,10 @@ [![Build](https://github.com/hpcaitech/ColossalAI/actions/workflows/build.yml/badge.svg)](https://github.com/hpcaitech/ColossalAI/actions/workflows/build.yml) [![Documentation](https://readthedocs.org/projects/colossalai/badge/?version=latest)](https://colossalai.readthedocs.io/en/latest/?badge=latest) [![codebeat badge](https://codebeat.co/badges/bfe8f98b-5d61-4256-8ad2-ccd34d9cc156)](https://codebeat.co/projects/github-com-hpcaitech-colossalai-main) + [![HuggingFace badge](https://img.shields.io/badge/%F0%9F%A4%97HuggingFace-Join-yellow)](https://huggingface.co/hpcai-tech) [![slack badge](https://img.shields.io/badge/Slack-join-blueviolet?logo=slack&)](https://join.slack.com/t/colossalaiworkspace/shared_invite/zt-z7b26eeb-CBp7jouvu~r0~lcFzX832w) [![WeChat badge](https://img.shields.io/badge/微信-加入-green?logo=wechat&)](https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/WeChat.png) - [![HuggingFace badge](https://img.shields.io/badge/%F0%9F%A4%97-HuggingFace-yellow)](https://huggingface.co/hpcai) + | [English](README.md) | [中文](README-zh-Hans.md) | From 907ac4a2dc71678efe5929da69dded795696c2ee Mon Sep 17 00:00:00 2001 From: 1SAA Date: Mon, 14 Mar 2022 16:43:21 +0800 Subject: [PATCH 09/13] fixed error when no collective communication in CommProfiler --- colossalai/utils/profiler/comm_profiler.py | 11 +++++++---- colossalai/utils/profiler/pcie_profiler.py | 8 ++++---- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/colossalai/utils/profiler/comm_profiler.py b/colossalai/utils/profiler/comm_profiler.py index 93c72cc65..a4f5729c9 100644 --- a/colossalai/utils/profiler/comm_profiler.py +++ b/colossalai/utils/profiler/comm_profiler.py @@ -93,16 +93,16 @@ class CommProfiler(BaseProfiler): dist.reduce = torch_reduce def to_tensorboard(self, writer): - writer.add_text(tag="Collective Communication", text_string=self.result_list("\n\n")) + writer.add_text(tag="Collective Communication", text_string=self.result_str("\n\n")) def to_file(self, filename: Path): with open(filename, "w") as f: - f.write(self.result_list()) + f.write(self.result_str()) def show(self): - print(self.result_list()) + print(self.result_str()) - def result_list(self, sep: str = "\n"): + def result_str(self, sep: str = "\n"): res = [] def append(s: str = None): @@ -114,6 +114,9 @@ class CommProfiler(BaseProfiler): append("Warnning: there exists multiple communication operations in the same time. As a result, " "the profiling result is not accurate.") + if self.total_cuda_time == 0: + return "No collective communication has been called yet!" 
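+        # NOTE: returning early when nothing was recorded also avoids a division
+        # by zero below -- the bandwidth and percentage fields divide by self.total_cuda_time.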
+
         append("Collective communication profiling result:")
         append("total cuda time: {}".format(_format_time(self.total_cuda_time)))
         append("average bandwidth: {}".format(_format_bandwidth(self.total_comm_vol, self.total_cuda_time)))
diff --git a/colossalai/utils/profiler/pcie_profiler.py b/colossalai/utils/profiler/pcie_profiler.py
index 3a9ec95b4..526222941 100644
--- a/colossalai/utils/profiler/pcie_profiler.py
+++ b/colossalai/utils/profiler/pcie_profiler.py
@@ -105,16 +105,16 @@ class PcieProfiler(BaseProfiler):
         self.profiler = None
 
     def to_tensorboard(self, writer):
-        writer.add_text(tag="Data Transmission", text_string=self.result_list("\n\n"))
+        writer.add_text(tag="Data Transmission", text_string=self.result_str("\n\n"))
 
     def to_file(self, filename: Path):
         with open(filename, "w") as f:
-            f.write(self.result_list())
+            f.write(self.result_str())
 
     def show(self):
-        print(self.result_list())
+        print(self.result_str())
 
-    def result_list(self, sep: str = "\n"):
+    def result_str(self, sep: str = "\n"):
         res = []
 
         def append(s: str = None):

From a9c27be42e1cba258c9aabc0366e7c5cdf4a5342 Mon Sep 17 00:00:00 2001
From: LuGY <74758262+Gy-Lu@users.noreply.github.com>
Date: Mon, 14 Mar 2022 18:01:46 +0800
Subject: [PATCH 10/13] Added tensor detector (#393)

* Added tensor detector

* Added the - states

* Allowed change include_cpu when detect()
---
 colossalai/utils/__init__.py                 |   2 +-
 colossalai/utils/tensor_detector/__init__.py |   1 +
 colossalai/utils/tensor_detector/readme.md   | 128 ++++++++++++
 .../utils/tensor_detector/tensor_detector.py | 190 ++++++++++++++++++
 tests/test_utils/test_tensor_detector.py     |  41 ++++
 5 files changed, 361 insertions(+), 1 deletion(-)
 create mode 100644 colossalai/utils/tensor_detector/__init__.py
 create mode 100644 colossalai/utils/tensor_detector/readme.md
 create mode 100644 colossalai/utils/tensor_detector/tensor_detector.py
 create mode 100644 tests/test_utils/test_tensor_detector.py

diff --git a/colossalai/utils/__init__.py b/colossalai/utils/__init__.py
index b8536a1d5..fbea2c2c0 100644
--- a/colossalai/utils/__init__.py
+++ b/colossalai/utils/__init__.py
@@ -10,7 +10,7 @@ from .data_sampler import DataParallelSampler, get_dataloader
 from .gradient_accumulation import accumulate_gradient
 from .memory import report_memory_usage
 from .timer import MultiTimer, Timer
-#from .tensor_detector import TensorDetector
+from .tensor_detector import TensorDetector
 
 __all__ = [
     'checkpoint', 'free_port', 'print_rank_0', 'sync_model_param', 'is_dp_rank_0', 'is_tp_rank_0',
diff --git a/colossalai/utils/tensor_detector/__init__.py b/colossalai/utils/tensor_detector/__init__.py
new file mode 100644
index 000000000..0d35e6467
--- /dev/null
+++ b/colossalai/utils/tensor_detector/__init__.py
@@ -0,0 +1 @@
+from .tensor_detector import TensorDetector
\ No newline at end of file
diff --git a/colossalai/utils/tensor_detector/readme.md b/colossalai/utils/tensor_detector/readme.md
new file mode 100644
index 000000000..840dc8f4e
--- /dev/null
+++ b/colossalai/utils/tensor_detector/readme.md
@@ -0,0 +1,128 @@
+# Tensor Detector
+
+This tool helps you detect tensors on both CPU and GPU. However, there will always be some unexpected tensors on the CPU, including the RNG state of PyTorch.
+
+## Example
+
+An example is worth a thousand words.
+
+The code below defines a simple MLP module, with which we will show you how to use the tool.
+
+```python
+import torch
+import torch.nn as nn
+
+class MLP(nn.Module):
+    def __init__(self):
+        super().__init__()
+        self.mlp = nn.Sequential(nn.Linear(64, 8),
+                                 nn.ReLU(),
+                                 nn.Linear(8, 32))
+    def forward(self, x):
+        return self.mlp(x)
+```
+
+And here is how to use the tool.
+
+```python
+from colossalai.utils import TensorDetector
+
+# create random data
+data = torch.rand(64, requires_grad=True).cuda()
+data.retain_grad()
+# create the module
+model = MLP().cuda()
+# create the detector
+# by passing the model to the detector, it can distinguish module parameters from common tensors
+detector = TensorDetector(include_cpu=False, module=model)
+detector.detect()
+
+out = model(data)
+
+detector.detect()
+
+loss = out.sum()
+loss.backward()
+
+detector.detect()
+```
+
+I have added comments on the right of the output to aid understanding.
+
+Note that the total `Mem` of all the tensors and parameters does not equal `Total GPU Memory Allocated`. PyTorch's memory management is really complicated, and for models of a large scale it is impossible to account for every byte.
+
+**The print order does not exactly match the order in which the tensors were created, but it is very close.**
+
+```bash
+------------------------------------------------------------------------------------------------------------
+   Tensor                            device        shape    grad           dtype      Mem
+------------------------------------------------------------------------------------------------------------
++  Tensor                            cuda:0        (64,)    True   torch.float32    256 B    # data
++  mlp.0.weight                      cuda:0      (8, 64)    True   torch.float32   2.0 KB
++  mlp.0.bias                        cuda:0         (8,)    True   torch.float32     32 B
++  mlp.2.weight                      cuda:0      (32, 8)    True   torch.float32   1.0 KB
++  mlp.2.bias                        cuda:0        (32,)    True   torch.float32    128 B
+------------------------------------------------------------------------------------------------------------
+Detect Location: "test_tensor_detector.py" line 27
+Total GPU Memory Allocated on cuda:0 is 4.5 KB
+------------------------------------------------------------------------------------------------------------
+
+
+------------------------------------------------------------------------------------------------------------
+   Tensor                            device        shape    grad           dtype      Mem
+------------------------------------------------------------------------------------------------------------
++  Tensor                            cuda:0         (8,)    True   torch.float32     32 B    # activation
++  Tensor                            cuda:0        (32,)    True   torch.float32    128 B    # output
+------------------------------------------------------------------------------------------------------------
+Detect Location: "test_tensor_detector.py" line 30
+Total GPU Memory Allocated on cuda:0 is 5.5 KB
+------------------------------------------------------------------------------------------------------------
+
+
+------------------------------------------------------------------------------------------------------------
+   Tensor                            device        shape    grad           dtype      Mem
+------------------------------------------------------------------------------------------------------------
++  Tensor                            cuda:0           ()    True   torch.float32      4 B    # loss
+------------------------------------------------------------------------------------------------------------
+Detect Location: "test_tensor_detector.py" line 32
+Total GPU Memory Allocated on cuda:0 is 6.0 KB
+------------------------------------------------------------------------------------------------------------
+
+
+------------------------------------------------------------------------------------------------------------
+   Tensor                            device        shape    grad           dtype      Mem
+------------------------------------------------------------------------------------------------------------
++  Tensor (with grad)                cuda:0        (64,)    True   torch.float32    512 B    # data with grad
++  mlp.0.weight (with grad)          cuda:0      (8, 64)    True   torch.float32   4.0 KB    # because we used data.retain_grad()
++  mlp.0.bias (with grad)            cuda:0         (8,)    True   torch.float32     64 B
++  mlp.2.weight (with grad)          cuda:0      (32, 8)    True   torch.float32   2.0 KB
++  mlp.2.bias (with grad)            cuda:0        (32,)    True   torch.float32    256 B
+
+-  mlp.0.weight                      cuda:0      (8, 64)    True   torch.float32   2.0 KB
+-  mlp.0.bias                        cuda:0         (8,)    True   torch.float32     32 B
+-  mlp.2.weight                      cuda:0      (32, 8)    True   torch.float32   1.0 KB
+-  mlp.2.bias                        cuda:0        (32,)    True   torch.float32    128 B
+-  Tensor                            cuda:0        (64,)    True   torch.float32    256 B
+-  Tensor                            cuda:0         (8,)    True   torch.float32     32 B    # deleted activation
+------------------------------------------------------------------------------------------------------------
+Detect Location: "test_tensor_detector.py" line 34
+Total GPU Memory Allocated on cuda:0 is 10.0 KB
+------------------------------------------------------------------------------------------------------------
+
+
+------------------------------------------------------------------------------------------------------------
+   Tensor                            device        shape    grad           dtype      Mem
+------------------------------------------------------------------------------------------------------------
++  Tensor                            cuda:0        (64,)   False   torch.float32    256 B
++  Tensor                            cuda:0      (8, 64)   False   torch.float32   2.0 KB
++  Tensor                            cuda:0         (8,)   False   torch.float32     32 B
++  Tensor                            cuda:0      (32, 8)   False   torch.float32   1.0 KB
++  Tensor                            cuda:0        (32,)   False   torch.float32    128 B
+------------------------------------------------------------------------------------------------------------
+Detect Location: "test_tensor_detector.py" line 36
+Total GPU Memory Allocated on cuda:0 is 14.0 KB
+------------------------------------------------------------------------------------------------------------
+```
+
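+## Logging to a file
+
+A minimal sketch of file-based logging, assuming the same `model` as above; the log name `tensor_trace` is arbitrary. `TensorDetector` appends `.log` to whatever name you pass:
+
+```python
+detector = TensorDetector(show_info=False, log='tensor_trace', include_cpu=False, module=model)
+detector.detect()                  # appends this report to tensor_trace.log instead of printing it
+detector.detect(include_cpu=True)  # whether CPU tensors are included can be changed per call
+detector.close()                   # clear the saved records when you are done
+```
+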
+## Reference
+
+   This tool was inspired by https://github.com/Stonesjtu/pytorch_memlab/blob/master/pytorch_memlab/mem_reporter.py
+   and https://github.com/Oldpan/Pytorch-Memory-Utils
+
diff --git a/colossalai/utils/tensor_detector/tensor_detector.py b/colossalai/utils/tensor_detector/tensor_detector.py
new file mode 100644
index 000000000..56b7e1d7f
--- /dev/null
+++ b/colossalai/utils/tensor_detector/tensor_detector.py
@@ -0,0 +1,190 @@
+import gc
+import inspect
+import torch
+import torch.nn as nn
+from typing import Optional
+from collections import defaultdict
+
+
+LINE_WIDTH = 108
+LINE = '-' * LINE_WIDTH + '\n'
+
+class TensorDetector():
+    def __init__(self,
+                 show_info: bool = True,
+                 log: str = None,
+                 include_cpu: bool = False,
+                 module: Optional[nn.Module] = None
+                 ):
+        """This class detects tensors on different devices.
+
+        :param show_info: whether to print the info on screen, default True
+        :type show_info: bool
+        :param log: the file name to save the log
+        :type log: str
+        :param include_cpu: whether to detect tensors on cpu, default False
+        :type include_cpu: bool
+        :param module: when an `nn.Module` is passed in, the detector can give the detected parameters their real names
+        :type module: Optional[nn.Module]
+
+        """
+        self.show_info = show_info
+        self.log = log
+        self.include_cpu = include_cpu
+        self.tensor_info = defaultdict(list)
+        self.saved_tensor_info = defaultdict(list)
+        self.order = []
+        self.detected = []
+        self.devices = []
+        self.info = ""
+
+        self.module = module
+        if isinstance(module, nn.Module):
+            # if module is an instance of nn.Module, we can name its parameters with their real names
+            for name, param in module.named_parameters():
+                self.tensor_info[id(param)].append(name)
+                self.tensor_info[id(param)].append(param.device)
+                self.tensor_info[id(param)].append(param.shape)
+                self.tensor_info[id(param)].append(param.requires_grad)
+                self.tensor_info[id(param)].append(param.dtype)
+                self.tensor_info[id(param)].append(self.get_tensor_mem(param))
+
+
+    def get_tensor_mem(self, tensor):
+        # calculate the memory occupied by a tensor
+        memory_size = tensor.element_size() * tensor.storage().size()
+        if (tensor.is_leaf or tensor.retains_grad) and tensor.grad is not None:
+            grad_memory_size = tensor.grad.element_size() * tensor.grad.storage().size()
+            memory_size += grad_memory_size
+        return self.mem_format(memory_size)
+
+
+    def mem_format(self, real_memory_size):
+        # format the tensor memory into a reasonable magnitude
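+        # e.g. mem_format(1536) returns '1.5 KB'; mem_format(3 * 2**20) returns '3.0 MB'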
+        if real_memory_size >= 2 ** 30:
+            return str(real_memory_size / (2 ** 30)) + ' GB'
+        if real_memory_size >= 2 ** 20:
+            return str(real_memory_size / (2 ** 20)) + ' MB'
+        if real_memory_size >= 2 ** 10:
+            return str(real_memory_size / (2 ** 10)) + ' KB'
+        return str(real_memory_size) + ' B'
+
+
+    def collect_tensors_state(self):
+        for obj in gc.get_objects():
+            if torch.is_tensor(obj):
+                # skip cpu tensors when include_cpu is False, as well as tensors we have collected before
+                if (not self.include_cpu) and obj.device == torch.device('cpu'):
+                    continue
+                self.detected.append(id(obj))
+                # skip parameters we already added in __init__ when module is an instance of nn.Module (first epoch)
+                if id(obj) not in self.tensor_info:
+
+                    name = type(obj).__name__
+                    # after backward, we update the records to show the change
+                    if isinstance(self.module, nn.Module) and name == 'Parameter':
+                        if obj.grad is not None:
+                            # with grad attached
+                            for par_name, param in self.module.named_parameters():
+                                if param.requires_grad and param.grad.equal(obj.grad):
+                                    name = par_name + ' (with grad)'
+                        else:
+                            # with no grad attached
+                            # no new parameters are created while running,
+                            # so it must be in saved_tensor_info
+                            continue
+                    # we can also mark common tensors as 'Tensor (with grad)'
+                    if name == 'Tensor' and (obj.is_leaf or obj.retains_grad):
+                        if obj.grad is not None:
+                            name = name + ' (with grad)'
+                    # in fact, common tensors have no grad
+                    # unless you call retain_grad()
+                    if id(obj) in self.saved_tensor_info.keys() and name == self.saved_tensor_info[id(obj)][0]:
+                        continue
+
+                    self.tensor_info[id(obj)].append(name)
+                    self.tensor_info[id(obj)].append(obj.device)
+                    self.tensor_info[id(obj)].append(obj.shape)
+                    self.tensor_info[id(obj)].append(obj.requires_grad)
+                    self.tensor_info[id(obj)].append(obj.dtype)
+                    self.tensor_info[id(obj)].append(self.get_tensor_mem(obj))
+                # record the order in which we saw the tensors,
+                # which makes them easier to identify;
+                # every tensor updated this turn is recorded
+                self.order.append(id(obj))
+                # record all distinct devices
+                if obj.device not in self.devices:
+                    self.devices.append(obj.device)
+
+
+    def print_tensors_state(self):
+        template_format = '{:3s}{:<30s}{:>10s}{:>20s}{:>10s}{:>20s}{:>15s}'
+        self.info += LINE
+        self.info += template_format.format(' ', 'Tensor', 'device', 'shape', 'grad', 'dtype', 'Mem')
+        self.info += '\n'
+        self.info += LINE
+
+        # if a tensor was updated this turn and was recorded before,
+        # it should be updated in saved_tensor_info as well
+        outdated = [x for x in self.saved_tensor_info.keys() if x in self.order]
+        minus = [x for x in self.saved_tensor_info.keys() if x not in self.detected]
+        minus = outdated + minus
+        if len(self.order) > 0:
+            for tensor_id in self.order:
+                self.info += template_format.format('+',
+                                                    str(self.tensor_info[tensor_id][0]),
+                                                    str(self.tensor_info[tensor_id][1]),
+                                                    str(tuple(self.tensor_info[tensor_id][2])),
+                                                    str(self.tensor_info[tensor_id][3]),
+                                                    str(self.tensor_info[tensor_id][4]),
+                                                    str(self.tensor_info[tensor_id][5]))
+                self.info += '\n'
+        if len(self.order) > 0 and len(minus) > 0:
+            self.info += '\n'
+        if len(minus) > 0:
+            for tensor_id in minus:
+                self.info += template_format.format('-',
+                                                    str(self.saved_tensor_info[tensor_id][0]),
+                                                    str(self.saved_tensor_info[tensor_id][1]),
+                                                    str(tuple(self.saved_tensor_info[tensor_id][2])),
+                                                    str(self.saved_tensor_info[tensor_id][3]),
+                                                    str(self.saved_tensor_info[tensor_id][4]),
+                                                    str(self.saved_tensor_info[tensor_id][5]))
+                self.info += '\n'
+                # delete the stale record
+                self.saved_tensor_info.pop(tensor_id)
+
+
+        # trace where detect() was called
+        locate_info = inspect.stack()[2]
+        locate_msg = '"' + locate_info.filename + '" line ' + str(locate_info.lineno)
+
+        self.info += LINE
+        self.info += f"Detect Location: {locate_msg}\n"
+        for device in self.devices:
+            if device == torch.device('cpu'):
+                continue
+            gpu_mem_alloc = self.mem_format(torch.cuda.memory_allocated(device))
+            self.info += f"Total GPU Memory Allocated on {device} is {gpu_mem_alloc}\n"
+        self.info += LINE
+        self.info += '\n\n'
+        if self.show_info:
+            print(self.info)
+        if self.log is not None:
+            with open(self.log + '.log', 'a') as f:
+                f.write(self.info)
+
+
+    def detect(self, include_cpu=False):
+        self.include_cpu = include_cpu
+        self.collect_tensors_state()
+        self.print_tensors_state()
+        self.saved_tensor_info.update(self.tensor_info)
+        self.tensor_info.clear()
+        self.order = []
+        self.detected = []
+        self.info = ""
+
+    def close(self):
+        self.saved_tensor_info.clear()
+        self.module = None
\ No newline at end of file
diff --git a/tests/test_utils/test_tensor_detector.py b/tests/test_utils/test_tensor_detector.py
new file mode 100644
index 000000000..06b1c1846
--- /dev/null
+++ b/tests/test_utils/test_tensor_detector.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+
+import torch
+import torch.nn as nn
+
+from colossalai.utils import TensorDetector
+
+class MLP(nn.Module):
+    def __init__(self):
+        super().__init__()
+        self.mlp = nn.Sequential(nn.Linear(64, 8),
+                                 nn.ReLU(),
+                                 nn.Linear(8, 32))
+
+    def forward(self, x):
+        return self.mlp(x)
+
+def test_tensor_detect():
+
+    data = torch.rand(64, requires_grad=True).cuda()
+    data.retain_grad()
+    model = MLP().cuda()
+
+    detector = TensorDetector(log='test', include_cpu=False, module=model)
+
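+    # each detect() call below reports the tensors that appeared (+) or were
+    # released (-) since the previous call, and appends the report to 'test.log'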
+    detector.detect()
+    out = model(data)
+
+    detector.detect()
+    loss = out.sum()
+    detector.detect()
+    loss.backward()
+
+    detector.detect()
+    model = MLP().cuda()
+    detector.detect()
+    detector.close()
+    torch.cuda.empty_cache()
+
+if __name__ == '__main__':
+    test_tensor_detect()
\ No newline at end of file
From 370f567e7d58713de8a8ae0248d5320753c85e80 Mon Sep 17 00:00:00 2001
From: Jiarui Fang
Date: Mon, 14 Mar 2022 20:48:41 +0800
Subject: [PATCH 11/13] [zero] new interface for ShardedOptimv2 (#406)

---
 .../zero/sharded_optim/sharded_optim_v2.py    | 37 ++++++++++++++++---
 tests/components_to_test/bert.py              |  5 +--
 tests/components_to_test/nested_model.py      |  5 +--
 .../repeated_computed_layer.py                |  5 +--
 tests/components_to_test/resnet.py            |  5 +--
 tests/test_engine/test_engine.py              |  6 +--
 .../test_trainer_with_non_pipe_schedule.py    |  4 +-
 .../test_sharded_optim_v2.py                  | 12 +++---
 .../test_sharded_optim_v2_with_cpu_adam.py    |  7 ++--
 9 files changed, 51 insertions(+), 35 deletions(-)

diff --git a/colossalai/zero/sharded_optim/sharded_optim_v2.py b/colossalai/zero/sharded_optim/sharded_optim_v2.py
index b9be80fed..47c0d26b7 100644
--- a/colossalai/zero/sharded_optim/sharded_optim_v2.py
+++ b/colossalai/zero/sharded_optim/sharded_optim_v2.py
@@ -1,5 +1,5 @@
 from enum import Enum
-from typing import Dict, Optional
+from typing import Callable, Dict, Optional, Union
 
 import torch
 import torch.distributed as dist
@@ -15,7 +15,7 @@
 from torch import Tensor
 from torch.distributed import ProcessGroup
 from torch.nn.parameter import Parameter
 from torch.optim import Optimizer
-
+from typing import Type, Any
 from ._utils import has_inf_or_nan
@@ -27,8 +27,8 @@ class OptimState(Enum):
 
 class ShardedOptimizerV2(ColossalaiOptimizer):
 
     def __init__(self,
-                 optimizer: Optimizer,
-                 sharded_model: ShardedModelV2,
+                 sharded_model: ShardedModelV2,
+                 optimizer_class: Type[Optimizer],
                  shard_strategy: BaseShardStrategy,
                  cpu_offload: bool = False,
                  initial_scale: float = 2**32,
                  hysteresis: float = 2,
                  max_scale: int = 2**32,
                  dp_process_group: Optional[ProcessGroup] = None,
-                 mp_process_group: Optional[ProcessGroup] = None) -> None:
+                 mp_process_group: Optional[ProcessGroup] = None,
+                 **defaults: Any) -> None:
+        """
+        :param sharded_model: A sharded model initialized by class ShardedModelV2
+        :type sharded_model: ShardedModelV2
+
+        :param optimizer_class: A type of Optimizer
+        :type optimizer_class: Type[Optimizer]
+
+        :param shard_strategy: The strategy to shard the sharded_model and optimizer model parameters.
+        :type shard_strategy: BaseShardStrategy
+
+        :param cpu_offload: whether to offload the optimizer states to CPU.
+        :type cpu_offload: bool
+
+        :param defaults: any trailing arguments, which are forwarded to the local optimizer.
+        :type defaults: dict
+        """
         assert isinstance(sharded_model, ShardedModelV2), 'model must be wrapped with ShardedModel'
-        super().__init__(optimizer)
+
+        self._optim_defaults = defaults
+        # build the local optimizer over sharded_model.parameters(), so that its
+        # states (e.g. Adam's momentum and variance) are created as zero tensors
+        # matching the (possibly sharded) parameters
+
+        self.optimizer = optimizer_class(sharded_model.parameters(), **self._optim_defaults)
+
+        super().__init__(self.optimizer)
         self.shard_strategy = shard_strategy
         self.model: ShardedModelV2 = sharded_model
         if cpu_offload and not sharded_model.cpu_offload:
@@ -65,7 +90,7 @@ class ShardedOptimizerV2(ColossalaiOptimizer):
 
         # Store fp32 param shards
         self.master_params: Dict[Parameter, Tensor] = {}
 
-        for group in optimizer.param_groups:
+        for group in self.optimizer.param_groups:
             for p in group['params']:
                 assert hasattr(p, 'col_attr'), 'The parameter must be wrapped with ShardedParam'
                 is_param_sharded = p.col_attr.data.is_sharded
diff --git a/tests/components_to_test/bert.py b/tests/components_to_test/bert.py
index cf543d88d..224ae5147 100644
--- a/tests/components_to_test/bert.py
+++ b/tests/components_to_test/bert.py
@@ -74,8 +74,5 @@ def get_training_components():
                                   sequence_length=sequence_length,
                                   is_distrbuted=True)
 
-    def get_optim(model):
-        return torch.optim.Adam(model.parameters(), lr=0.001)
-
     criterion = None
-    return bert_model_builder, trainloader, testloader, get_optim, criterion
+    return bert_model_builder, trainloader, testloader, torch.optim.Adam, criterion
diff --git a/tests/components_to_test/nested_model.py b/tests/components_to_test/nested_model.py
index edf4a1a89..26bfb8ecc 100644
--- a/tests/components_to_test/nested_model.py
+++ b/tests/components_to_test/nested_model.py
@@ -49,8 +49,5 @@ def get_training_components():
     trainloader = DummyDataLoader()
     testloader = DummyDataLoader()
 
-    def optim_builder(model):
-        return torch.optim.Adam(model.parameters(), lr=0.001)
-
     criterion = torch.nn.CrossEntropyLoss()
-    return model_builder, trainloader, testloader, optim_builder, criterion
+    return model_builder, trainloader, testloader, torch.optim.Adam, criterion
diff --git a/tests/components_to_test/repeated_computed_layer.py b/tests/components_to_test/repeated_computed_layer.py
index bc035d4b5..f70910191 100644
--- a/tests/components_to_test/repeated_computed_layer.py
+++ b/tests/components_to_test/repeated_computed_layer.py
@@ -43,8 +43,5 @@ def get_training_components():
     trainloader = DummyDataLoader()
     testloader = DummyDataLoader()
 
-    def optim_builder(model):
-        return torch.optim.Adam(model.parameters(), lr=0.001)
-
     criterion = torch.nn.CrossEntropyLoss()
-    return model_builder, trainloader, testloader, optim_builder, criterion
+    return model_builder, trainloader, testloader, torch.optim.Adam, criterion
diff --git a/tests/components_to_test/resnet.py b/tests/components_to_test/resnet.py
index 20a4be8e2..193832ebc 100644
--- a/tests/components_to_test/resnet.py
+++ b/tests/components_to_test/resnet.py
@@ -29,8 +29,5 @@ def get_resnet_training_components():
     trainloader = get_cifar10_dataloader(train=True)
     testloader = get_cifar10_dataloader(train=False)
 
-    def optim_builder(model):
-        return torch.optim.Adam(model.parameters(), lr=0.001)
-
     criterion = torch.nn.CrossEntropyLoss()
-    return model_builder, trainloader, testloader, optim_builder, criterion
+    return model_builder, trainloader, testloader, torch.optim.Adam, criterion
diff --git a/tests/test_engine/test_engine.py b/tests/test_engine/test_engine.py
index 904c3c4ea..1bcba61f3 100644
--- a/tests/test_engine/test_engine.py
+++ 
b/tests/test_engine/test_engine.py @@ -19,11 +19,11 @@ def run_train(): # FIXME: test bert for model_name in test_models: get_components_func = non_distributed_component_funcs.get_callable(model_name) - model_builder, train_dataloader, _, optimizer_builder, criterion = get_components_func() + model_builder, train_dataloader, _, optimizer_class, criterion = get_components_func() model = model_builder(checkpoint=False) engine, train_dataloader, *args = colossalai.initialize(model=model, - optimizer=optimizer_builder(model), + optimizer=optimizer_class(model.parameters(), lr=1e-3), criterion=criterion, train_dataloader=train_dataloader) @@ -84,7 +84,7 @@ def run_engine(rank, world_size, port): @pytest.mark.dist def test_engine(): - world_size = 4 + world_size = 2 run_func = partial(run_engine, world_size=world_size, port=free_port()) mp.spawn(run_func, nprocs=world_size) diff --git a/tests/test_trainer/test_trainer_with_non_pipe_schedule.py b/tests/test_trainer/test_trainer_with_non_pipe_schedule.py index 9ae21cf77..d226916b5 100644 --- a/tests/test_trainer/test_trainer_with_non_pipe_schedule.py +++ b/tests/test_trainer/test_trainer_with_non_pipe_schedule.py @@ -25,9 +25,9 @@ def run_trainer_no_pipeline(rank, world_size, port): test_models = ['repeated_computed_layers', 'resnet18', 'nested_model'] for name in test_models: get_components_func = non_distributed_component_funcs.get_callable(name) - model_builder, train_dataloader, test_dataloader, optimizer_builder, criterion = get_components_func() + model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func() model = model_builder() - optimizer = optimizer_builder(model) + optimizer = optimizer_class(model.parameters(), lr=1e-3) engine, train_dataloader, *_ = colossalai.initialize(model=model, optimizer=optimizer, criterion=criterion, diff --git a/tests/test_zero_data_parallel/test_sharded_optim_v2.py b/tests/test_zero_data_parallel/test_sharded_optim_v2.py index 5ecfba71a..9371cf66a 100644 --- a/tests/test_zero_data_parallel/test_sharded_optim_v2.py +++ b/tests/test_zero_data_parallel/test_sharded_optim_v2.py @@ -44,19 +44,21 @@ def run_dist(rank, world_size, port, cpu_offload, shard_strategy): shard_strategy = shard_strategy() for model_name in test_models: get_components_func = non_distributed_component_funcs.get_callable(model_name) - model, train_dataloader, test_dataloader, optimizer, criterion = get_components_func() + model, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func() model = model(checkpoint=True).cuda() zero_model = ShardedModelV2(copy.deepcopy(model), shard_strategy, offload_config=dict(device='cpu') if cpu_offload else None) if dist.get_world_size() > 1: model = DDP(model) - optim = Adam(model.parameters(), lr=1e-3) - sharded_optim = ShardedOptimizerV2(Adam(zero_model.parameters(), lr=1e-3), - zero_model, + lr = 1e-3 + optim = optimizer_class(model.parameters(), lr=lr) + sharded_optim = ShardedOptimizerV2(zero_model, + optimizer_class, shard_strategy, cpu_offload=cpu_offload, - initial_scale=2**5) + initial_scale=2**5, + lr=lr) for i, (data, label) in enumerate(train_dataloader): if i > 2: break diff --git a/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py b/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py index d5daaafcc..942b46723 100644 --- a/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py +++ b/tests/test_zero_data_parallel/test_sharded_optim_v2_with_cpu_adam.py @@ -59,11 +59,12 @@ def 
run_dist(rank, world_size, port, shard_strategy): if dist.get_world_size() > 1: model = DDP(model) optim = Adam(model.parameters(), lr=1e-3) - sharded_optim = ShardedOptimizerV2(CPUAdam(zero_model.parameters(), lr=1e-3), - zero_model, + sharded_optim = ShardedOptimizerV2(zero_model, + CPUAdam, shard_strategy, initial_scale=2**5, - cpu_offload=True) + cpu_offload=True, + lr=1e-3) for i, (data, label) in enumerate(train_dataloader): if i > 2: break From a37bf1bc4239f78498d2229a0fd852c1ea5c3a57 Mon Sep 17 00:00:00 2001 From: Jiarui Fang Date: Mon, 14 Mar 2022 21:39:48 +0800 Subject: [PATCH 12/13] [hotfix] rm test_tensor_detector.py (#413) --- tests/test_utils/test_tensor_detector.py | 41 ------------------------ 1 file changed, 41 deletions(-) delete mode 100644 tests/test_utils/test_tensor_detector.py diff --git a/tests/test_utils/test_tensor_detector.py b/tests/test_utils/test_tensor_detector.py deleted file mode 100644 index 06b1c1846..000000000 --- a/tests/test_utils/test_tensor_detector.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -import torch -import torch.nn as nn - -from colossalai.utils import TensorDetector - -class MLP(nn.Module): - def __init__(self): - super().__init__() - self.mlp = nn.Sequential(nn.Linear(64, 8), - nn.ReLU(), - nn.Linear(8, 32)) - - def forward(self, x): - return self.mlp(x) - -def test_tensor_detect(): - - data = torch.rand(64, requires_grad=True).cuda() - data.retain_grad() - model = MLP().cuda() - - detector = TensorDetector(log='test', include_cpu=False, module=model) - - detector.detect() - out = model(data) - - detector.detect() - loss = out.sum() - detector.detect() - loss.backward() - detector.detect() - model = MLP().cuda() - detector.detect() - detector.close() - torch.cuda.empty_cache() - -if __name__ == '__main__': - test_tensor_detect() \ No newline at end of file From 21dc54e019d4636a5024e8e41e2a69567cac37dc Mon Sep 17 00:00:00 2001 From: Jiarui Fang Date: Mon, 14 Mar 2022 22:05:30 +0800 Subject: [PATCH 13/13] [zero] memtracer to record cuda memory usage of model data and overall system (#395) --- colossalai/engine/ophooks/zero_hook.py | 16 +++- colossalai/utils/memory_tracer/allocator.py | 61 +++----------- .../utils/memory_tracer/async_memtracer.py | 4 +- colossalai/utils/memory_tracer/commons.py | 11 +++ .../utils/memory_tracer/memstats_collector.py | 81 +++++++++++++++++++ .../memory_tracer/model_data_memtracer.py | 34 ++++++++ .../memory_tracer/test_memstats_collector.py | 43 ++++++++++ colossalai/zero/init_ctx/init_context.py | 9 ++- .../zero/sharded_model/sharded_model_v2.py | 29 ++++++- .../zero/sharded_optim/sharded_optim_v2.py | 2 +- .../test_activation_checkpointing.py | 1 + .../test_init_context.py | 13 ++- .../test_shard_model_v2.py | 8 +- .../test_sharded_optim_v2.py | 5 +- 14 files changed, 240 insertions(+), 77 deletions(-) create mode 100644 colossalai/utils/memory_tracer/commons.py create mode 100644 colossalai/utils/memory_tracer/memstats_collector.py create mode 100644 colossalai/utils/memory_tracer/model_data_memtracer.py create mode 100644 colossalai/utils/memory_tracer/test_memstats_collector.py diff --git a/colossalai/engine/ophooks/zero_hook.py b/colossalai/engine/ophooks/zero_hook.py index 938826b55..f078bbfa6 100644 --- a/colossalai/engine/ophooks/zero_hook.py +++ b/colossalai/engine/ophooks/zero_hook.py @@ -4,6 +4,9 @@ from colossalai.utils import get_current_device from colossalai.zero.shard_utils import BaseShardStrategy from ._base_ophook import BaseOpHook +from 
colossalai.utils.memory_tracer.memstats_collector import MemStatsCollector
+from colossalai.utils.memory_tracer.model_data_memtracer import ModelDataTracer
+from typing import Optional
 
 
 @OPHOOKS.register_module
@@ -12,14 +15,17 @@ class ZeroHook(BaseOpHook):
     A hook to process sharded param for ZeRO method.
     """
 
-    def __init__(self, shard_strategy: BaseShardStrategy):
+    def __init__(self, shard_strategy: BaseShardStrategy, memstats_collector: Optional[MemStatsCollector]):
         super().__init__()
         self.shard_strategy = shard_strategy
         # NOTE(jiaruifang) Now the computing device of FWD and BWD is always on GPU
         self.computing_device = torch.device(f'cuda:{get_current_device()}')
 
+        self._memstats_collector = memstats_collector
+
     def pre_fwd_exec(self, module: torch.nn.Module, *args):
         tensor_list = []
+        global_model_data_tracer = ModelDataTracer()
         for param in module.parameters():
             assert hasattr(param, 'col_attr')
             tensor_list.append(param.col_attr.data)
@@ -27,8 +33,12 @@ class ZeroHook(BaseOpHook):
         for param in module.parameters():
             if param.col_attr.data.device != self.computing_device:
                 param.col_attr.data.to(self.computing_device)
+                global_model_data_tracer.add_tensor(param.col_attr.data.payload)
             param.data = param.col_attr.data.payload
 
+        if self._memstats_collector:
+            self._memstats_collector.sample_memstats()
+
     def post_fwd_exec(self, module: torch.nn.Module, *args):
         tensor_list = []
         for param in module.parameters():
@@ -40,6 +50,7 @@ class ZeroHook(BaseOpHook):
 
     def pre_bwd_exec(self, module: torch.nn.Module, input, output):
         tensor_list = []
+        global_model_data_tracer = ModelDataTracer()
         for param in module.parameters():
             assert hasattr(param, 'col_attr')
             tensor_list.append(param.col_attr.data)
@@ -47,6 +58,7 @@ class ZeroHook(BaseOpHook):
         for param in module.parameters():
             if param.col_attr.data.device != self.computing_device:
                 param.col_attr.data.to(self.computing_device)
+                global_model_data_tracer.add_tensor(param.col_attr.data.payload)
             param.data = param.col_attr.data.payload
             # Store local accumulated grad shard
             if param.grad is not None:
@@ -60,6 +72,8 @@ class ZeroHook(BaseOpHook):
                 # The grad here must be locally computed full grad in this backward pass
                 assert param.grad.shape == param.col_attr.data.origin_shape
                 param.col_attr.bwd_count += 1
+        if self._memstats_collector:
+            self._memstats_collector.sample_memstats()
 
     def post_bwd_exec(self, module: torch.nn.Module, input):
         tensor_list = []
diff --git a/colossalai/utils/memory_tracer/allocator.py b/colossalai/utils/memory_tracer/allocator.py
index 368aae2da..26c36ef79 100644
--- a/colossalai/utils/memory_tracer/allocator.py
+++ b/colossalai/utils/memory_tracer/allocator.py
@@ -1,60 +1,19 @@
 import torch
 
-from colossalai.utils.commons.singleton_meta import SingletonMeta
-from colossalai.zero.sharded_param import ShardedTensor
-
-from typing import Union
+from colossalai.utils.memory_tracer.model_data_memtracer import ModelDataTracer
 
 
+def col_move_to_cpu(t: torch.Tensor):
+    assert isinstance(t, torch.Tensor)
+    if t.device.type == 'cpu':
+        return
+
+    ModelDataTracer().delete_tensor(t)
+    t.data = t.data.cpu()
 
 
-class ModelDataTracer(metaclass=SingletonMeta):
-    """
-    A singleton to trace model data usage during runtime.
- """ - - def __init__(self) -> None: - self._cpu_usage = 0 - self._cuda_usage = 0 - - def trace_tensor(self, t: torch.Tensor): - mem_use = col_tensor_mem_usage(t) - if t.device.type == 'cpu': - self._cpu_usage += mem_use - elif t.device.type == 'cuda': - self._cuda_usage += mem_use - else: - raise RuntimeError - - def detach_tensor(self, t: torch.Tensor): - mem_use = col_tensor_mem_usage(t) - if t.device.type == 'cpu': - self._cpu_usage -= mem_use - elif t.device.type == 'cuda': - self._cuda_usage -= mem_use - else: - raise RuntimeError - - @property - def cpu_usage(self): - return self._cpu_usage - - @property - def cuda_usage(self): - return self._cuda_usage - - -GLOBAL_MODEL_DATA_TRACER = ModelDataTracer() - - -def col_allocate_payload(device: torch.device) -> torch.Tensor: +def col_modeldata_allocate(device: torch.device) -> torch.Tensor: pass -def col_release_payload(t: torch.Tensor): +def col_modeldata_release(t: torch.Tensor): pass diff --git a/colossalai/utils/memory_tracer/async_memtracer.py b/colossalai/utils/memory_tracer/async_memtracer.py index 8f968acfb..fe65651ae 100644 --- a/colossalai/utils/memory_tracer/async_memtracer.py +++ b/colossalai/utils/memory_tracer/async_memtracer.py @@ -6,7 +6,7 @@ from colossalai.utils import get_current_device import torch -def _get_cuda_memory_used(device: torch.device) -> int: +def get_cuda_memory_used(device: torch.device) -> int: """ Get the free memory info of device. :param device: device id @@ -87,7 +87,7 @@ class AsyncMemoryMonitor: while self.keep_measuring: max_usage = max( max_usage, - _get_cuda_memory_used(torch.device(f'cuda:{get_current_device()}')), + get_cuda_memory_used(torch.device(f'cuda:{get_current_device()}')), ) sleep(self.interval) return max_usage diff --git a/colossalai/utils/memory_tracer/commons.py b/colossalai/utils/memory_tracer/commons.py new file mode 100644 index 000000000..28fc2abd3 --- /dev/null +++ b/colossalai/utils/memory_tracer/commons.py @@ -0,0 +1,11 @@ +from colossalai.zero.sharded_param import ShardedTensor +from typing import Union +import torch + + +def col_tensor_mem_usage(t: Union[torch.Tensor, ShardedTensor]) -> int: + if isinstance(t, ShardedTensor): + target = t.payload + else: + target = t + return target.numel() * target.element_size() diff --git a/colossalai/utils/memory_tracer/memstats_collector.py b/colossalai/utils/memory_tracer/memstats_collector.py new file mode 100644 index 000000000..6da89f6ba --- /dev/null +++ b/colossalai/utils/memory_tracer/memstats_collector.py @@ -0,0 +1,81 @@ +from colossalai.utils.memory_tracer.model_data_memtracer import ModelDataTracer +from .async_memtracer import get_cuda_memory_used +from colossalai.utils import get_current_device + +import torch + + +class SamplingCounter: + + def __init__(self) -> None: + self._samplint_cnt = 0 + + def advance(self): + self._samplint_cnt += 1 + + @property + def sampling_cnt(self): + return self._samplint_cnt + + def reset(self): + self._samplint_cnt = 0 + + +class MemStatsCollector: + + def __init__(self) -> None: + """ + Collecting Memory Statistics. + It has two phases. + 1. Collection Phase: collect memory usage statistics + 2. Runtime Phase: do not collect statistics. 
+ """ + self._sampling_cnter = SamplingCounter() + self._model_data_cuda = [] + self._overall_cuda = [] + + # TODO(jiaruifang) Now no cpu mem stats collecting + self._model_data_cpu = [] + self._overall_cpu = [] + + self._start_flag = False + + def start_collection(self): + self._start_flag = True + + def finish_collection(self): + self._start_flag = False + + def sample_memstats(self) -> None: + """ + Sampling memory statistics. + Record the current model data CUDA memory usage as well as system CUDA memory usage. + """ + if self._start_flag: + sampling_cnt = self._sampling_cnter.sampling_cnt + assert sampling_cnt == len(self._overall_cuda) + self._model_data_cuda.append(ModelDataTracer().cuda_usage) + self._overall_cuda.append(get_cuda_memory_used(torch.device(f'cuda:{get_current_device()}'))) + self._sampling_cnter.advance() + + def fetch_memstats(self) -> (int, int): + """ + returns cuda usage of model data and overall cuda usage. + """ + sampling_cnt = self._sampling_cnter.sampling_cnt + if len(self._model_data_cuda) < sampling_cnt: + raise RuntimeError + return (self._model_data_cuda[sampling_cnt], self._overall_cuda[sampling_cnt]) + + def reset_sampling_cnter(self) -> None: + self._sampling_cnter.reset() + + def clear(self) -> None: + self._model_data_cuda = [] + self._overall_cuda = [] + + self._model_data_cpu = [] + self._overall_cpu = [] + + self._start_flag = False + self._sampling_cnter.reset() diff --git a/colossalai/utils/memory_tracer/model_data_memtracer.py b/colossalai/utils/memory_tracer/model_data_memtracer.py new file mode 100644 index 000000000..4a3062bb3 --- /dev/null +++ b/colossalai/utils/memory_tracer/model_data_memtracer.py @@ -0,0 +1,34 @@ +from colossalai.utils.commons.singleton_meta import SingletonMeta +from colossalai.utils.memory_tracer.commons import col_tensor_mem_usage +import torch + + +class ModelDataTracer(metaclass=SingletonMeta): + """ + A singleton to trace model data usage during runtime. + We have to trigger our API (trace_tensor, detach_tensor) when do model-data memory operation, + including allocation, releasing and moving. 
+
+    NOTE: currently this class only traces CUDA memory usage.
+    """
+
+    def __init__(self) -> None:
+        self._cuda_usage = 0
+        # CPU usage is not traced yet (see the NOTE above); the counter is kept
+        # at zero so that the cpu_usage property below stays valid
+        self._cpu_usage = 0
+
+    def add_tensor(self, t: torch.Tensor):
+        assert isinstance(t, torch.Tensor), "ModelDataTracer add_tensor() should accept a torch.Tensor"
+        mem_use = col_tensor_mem_usage(t)
+        self._cuda_usage += mem_use
+
+    def delete_tensor(self, t: torch.Tensor):
+        assert isinstance(t, torch.Tensor), "ModelDataTracer delete_tensor() should accept a torch.Tensor"
+        mem_use = col_tensor_mem_usage(t)
+        self._cuda_usage -= mem_use
+
+    @property
+    def cpu_usage(self):
+        return self._cpu_usage
+
+    @property
+    def cuda_usage(self):
+        return self._cuda_usage
diff --git a/colossalai/utils/memory_tracer/test_memstats_collector.py b/colossalai/utils/memory_tracer/test_memstats_collector.py
new file mode 100644
index 000000000..9c93600b7
--- /dev/null
+++ b/colossalai/utils/memory_tracer/test_memstats_collector.py
@@ -0,0 +1,43 @@
+from colossalai.utils.memory_tracer.memstats_collector import MemStatsCollector
+from colossalai.utils.memory_tracer.model_data_memtracer import ModelDataTracer
+import torch
+
+
+def test_mem_collector():
+    collector = MemStatsCollector()
+
+    collector.start_collection()
+
+    a = torch.randn(10).cuda()
+
+    # sampling at time 0
+    collector.sample_memstats()
+
+    m_a = torch.randn(10).cuda()
+    ModelDataTracer().add_tensor(m_a)
+    b = torch.randn(10).cuda()
+
+    # sampling at time 1
+    collector.sample_memstats()
+
+    a = b
+
+    # sampling at time 2
+    collector.sample_memstats()
+
+    collector.finish_collection()
+    collector.reset_sampling_cnter()
+
+    # after collection, sampling records nothing and only advances the counter
+    collector.sample_memstats()
+    collector.sample_memstats()
+
+    cuda_use, overall_use = collector.fetch_memstats()
+    print(cuda_use, overall_use)
+
+    print(collector._model_data_cuda)
+    print(collector._overall_cuda)
+
+
+if __name__ == '__main__':
+    test_mem_collector()
diff --git a/colossalai/zero/init_ctx/init_context.py b/colossalai/zero/init_ctx/init_context.py
index 17e89cbf7..64b14c644 100644
--- a/colossalai/zero/init_ctx/init_context.py
+++ b/colossalai/zero/init_ctx/init_context.py
@@ -3,10 +3,11 @@ import functools
 import torch
 from colossalai.zero.shard_utils import BaseShardStrategy
 from colossalai.zero.sharded_param import ShardedParamV2
-from colossalai.utils.memory_tracer.allocator import GLOBAL_MODEL_DATA_TRACER
-
+from colossalai.utils.memory_tracer.model_data_memtracer import ModelDataTracer
 # Inserts _post_init_method at the end of init method
+
+
 # for all sub classes of torch.nn.Module
 class InsertPostInitMethodToModuleSubClasses(object):
@@ -152,7 +153,7 @@ class ZeroInitContext(InsertPostInitMethodToModuleSubClasses):
 
         if self.shard_param:
             self.shard_strategy.shard(tensor_list=[param.col_attr._data_sharded_tensor])
-            GLOBAL_MODEL_DATA_TRACER.trace_tensor(param.col_attr._data_sharded_tensor.payload)
+            ModelDataTracer().add_tensor(param.col_attr._data_sharded_tensor.payload)
         if param.col_attr.grad and self.shard_grad:
             self.shard_strategy.shard(tensor_list=[param.col_attr._grad_sharded_tensor])
-            GLOBAL_MODEL_DATA_TRACER.trace_tensor(param.col_attr._grad_sharded_tensor.payload)
+            ModelDataTracer().add_tensor(param.col_attr._grad_sharded_tensor.payload)
diff --git a/colossalai/zero/sharded_model/sharded_model_v2.py b/colossalai/zero/sharded_model/sharded_model_v2.py
index 7510cb68e..87ddb9c63 100644
--- a/colossalai/zero/sharded_model/sharded_model_v2.py
+++ b/colossalai/zero/sharded_model/sharded_model_v2.py
@@ -17,7 +17,8 @@ from 
colossalai.zero.sharded_model.reduce_scatter import ReduceScatterBucketer
 from colossalai.zero.sharded_param import ShardedParamV2
 from torch.distributed import ProcessGroup
 from torch.nn.parameter import Parameter
-
+from colossalai.utils.memory_tracer.memstats_collector import MemStatsCollector
+from colossalai.utils.memory_tracer.allocator import col_move_to_cpu
 from ._zero3_utils import (cast_float_arguments, cast_tensor_to_fp16, cast_tensor_to_fp32, chunk_and_pad,
                            get_gradient_predivide_factor)
@@ -33,7 +34,8 @@ class ShardedModelV2(nn.Module):
                  fp32_reduce_scatter: bool = False,
                  offload_config: Optional[dict] = None,
                  gradient_predivide_factor: Optional[float] = 1.0,
-                 shard_param: bool = True):
+                 shard_param: bool = True,
+                 use_memory_tracer: bool = False):
         r"""
         A demo to reconfigure zero1 shared_model.
         Currently do not consider the Optimizer States.
@@ -59,8 +61,16 @@ class ShardedModelV2(nn.Module):
             if self.shard_param:
                 self.shard_strategy.shard([param.col_attr.data])
 
+        # Init Memory Statistics Collector
+        self._use_memory_tracer = use_memory_tracer
+        if self._use_memory_tracer:
+            self._memstats_collector = MemStatsCollector()
+        else:
+            self._memstats_collector = None
+        self._iter_cnter = 0
+
         # Register hooks
-        register_ophooks_recursively(self.module, [ZeroHook(self.shard_strategy)])
+        register_ophooks_recursively(self.module, [ZeroHook(self.shard_strategy, self._memstats_collector)])
         self.param_hook_mgr = BaseParamHookMgr(list(self.module.parameters()))
         self.param_hook_mgr.register_backward_hooks(self._grad_post_backward_hook)
@@ -84,6 +94,9 @@ class ShardedModelV2(nn.Module):
         return self._cpu_offload
 
     def forward(self, *args: Any, **kwargs: Any) -> torch.Tensor:
+        if self._iter_cnter == 0 and self._memstats_collector:
+            # this operation affects the collection flag checked inside ZeroHook's sampling
+            self._memstats_collector.start_collection()
         args, kwargs = cast_float_arguments(cast_tensor_to_fp16, *args, **kwargs)
         outputs = self.module(*args, **kwargs)
         return outputs
@@ -98,6 +111,12 @@ class ShardedModelV2(nn.Module):
 
     @torch.no_grad()
     def _final_backward_hook(self) -> None:
+        if self._iter_cnter == 0 and self._memstats_collector:
+            self._memstats_collector.finish_collection()
+        if self._memstats_collector:
+            self._memstats_collector.reset_sampling_cnter()
+        self._iter_cnter += 1
+
         if self._require_backward_grad_sync:
             # Flush any unreduced buckets in the post_backward stream.
with torch.cuda.stream(self.comm_stream): @@ -185,8 +204,10 @@ class ShardedModelV2(nn.Module): reduced_grad.data = cast_tensor_to_fp32(reduced_grad.data) # Maybe offload + # TODO() optimize GPU->CPU bandwidth utilization if self._cpu_offload: - reduced_grad.data = reduced_grad.data.cpu() + col_move_to_cpu(reduced_grad) + # reduced_grad.data = reduced_grad.data.cpu() if param.col_attr.grad is None: param.col_attr.grad = reduced_grad.data diff --git a/colossalai/zero/sharded_optim/sharded_optim_v2.py b/colossalai/zero/sharded_optim/sharded_optim_v2.py index 47c0d26b7..d78ac3ecc 100644 --- a/colossalai/zero/sharded_optim/sharded_optim_v2.py +++ b/colossalai/zero/sharded_optim/sharded_optim_v2.py @@ -143,7 +143,7 @@ class ShardedOptimizerV2(ColossalaiOptimizer): # We have to use `copy_payload` instead of `reset_payload` # Since p.data is fp32 and p.col_attr.data is fp16 - # TODO() optimize this line + # TODO() optimize this line CPU (fp32) -> GPU (fp16) p.col_attr.data.copy_payload(p.data) if not is_param_sharded: diff --git a/tests/test_utils/test_activation_checkpointing.py b/tests/test_utils/test_activation_checkpointing.py index 619ab4bdc..237b77f06 100644 --- a/tests/test_utils/test_activation_checkpointing.py +++ b/tests/test_utils/test_activation_checkpointing.py @@ -56,6 +56,7 @@ def test_activation_checkpointing(cpu_offload): assert torch.all(data.grad == data_.grad), 'Gradient of the input does not match' torch.cuda.empty_cache() + # as seed manager is singleton # if we don't reset seeds here, # other tests will fail if running together with this test diff --git a/tests/test_zero_data_parallel/test_init_context.py b/tests/test_zero_data_parallel/test_init_context.py index a74e6959d..2e6fede05 100644 --- a/tests/test_zero_data_parallel/test_init_context.py +++ b/tests/test_zero_data_parallel/test_init_context.py @@ -9,12 +9,12 @@ import torch import torch.multiprocessing as mp from colossalai.utils import free_port from colossalai.utils.cuda import get_current_device -from colossalai.utils.memory_tracer.allocator import GLOBAL_MODEL_DATA_TRACER from colossalai.zero.init_ctx import ZeroInitContext from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy) from tests.components_to_test.registry import non_distributed_component_funcs from common import CONFIG +from colossalai.utils.memory_tracer.model_data_memtracer import ModelDataTracer def run_dist(rank, world_size, port, init_device, shard_strategy): @@ -37,13 +37,10 @@ def run_dist(rank, world_size, port, init_device, shard_strategy): assert param.col_attr.data.payload.device.type == init_device.type, \ f'{param.col_attr.data.payload.device.type} vs. 
{init_device.type}'
 
-    print(f'cpu usgae {GLOBAL_MODEL_DATA_TRACER.cpu_usage}')
-    print(f'cuda usgae {GLOBAL_MODEL_DATA_TRACER.cuda_usage}')
+    print(f'cuda usage {ModelDataTracer().cuda_usage}')
     print(f'numel {model_numel_tensor}')
     if init_device.type == 'cuda':
-        assert (GLOBAL_MODEL_DATA_TRACER.cuda_usage > 0)
-    elif init_device.type == 'cpu':
-        assert (GLOBAL_MODEL_DATA_TRACER.cpu_usage > 0)
+        assert (ModelDataTracer().cuda_usage > 0)
 
 
 @pytest.mark.dist
@@ -60,5 +57,5 @@ def test_zero_init_context(world_size, init_device, shard_strategy):
 
 
 if __name__ == '__main__':
-    test_zero_init_context(2, torch.device('cpu'), TensorShardStrategy)
-    test_zero_init_context(2, torch.device(f'cuda:{get_current_device()}'), TensorShardStrategy)
+    # test_zero_init_context(2, torch.device('cpu'), TensorShardStrategy)
+    test_zero_init_context(4, torch.device('cpu'), BucketTensorShardStrategy)
diff --git a/tests/test_zero_data_parallel/test_shard_model_v2.py b/tests/test_zero_data_parallel/test_shard_model_v2.py
index 54ca5ad3c..a2cae3ee5 100644
--- a/tests/test_zero_data_parallel/test_shard_model_v2.py
+++ b/tests/test_zero_data_parallel/test_shard_model_v2.py
@@ -18,6 +18,7 @@ from tests.components_to_test.registry import non_distributed_component_funcs
 from torch.nn.parallel import DistributedDataParallel as DDP
 
 from common import CONFIG, check_grads_padding, run_fwd_bwd
+from colossalai.zero.sharded_model.utils import col_model_deepcopy
 
 
 def run_dist(rank, world_size, port, use_zero_init_ctx, enable_autocast, shard_strategy):
@@ -33,12 +34,12 @@ def run_dist(rank, world_size, port, use_zero_init_ctx, enable_autocast, shard_s
 
     if use_zero_init_ctx:
         with ZeroInitContext(convert_fp16=True,
-                             target_device=torch.device('cpu'),
+                             target_device=torch.device('cpu:0'),
                              shard_strategy=shard_strategy,
                              shard_param=True,
                              rm_torch_payload_on_the_fly=rm_torch_payload_on_the_fly):
             zero_model = model_builder(checkpoint=True)
-        zero_model = ShardedModelV2(zero_model, shard_strategy)
+        zero_model = ShardedModelV2(zero_model, shard_strategy, use_memory_tracer=True)
 
         model = model_builder(checkpoint=True).half()
         col_model_deepcopy(zero_model, model)
@@ -59,6 +60,9 @@ def run_dist(rank, world_size, port, use_zero_init_ctx, enable_autocast, shard_s
 
     check_grads_padding(model, zero_model, loose=True)
 
+    print('overall cuda ', zero_model._memstats_collector._overall_cuda)
+    print('model cuda ', zero_model._memstats_collector._model_data_cuda)
+
 
 @pytest.mark.dist
 @pytest.mark.parametrize("world_size", [1, 2])
diff --git a/tests/test_zero_data_parallel/test_sharded_optim_v2.py b/tests/test_zero_data_parallel/test_sharded_optim_v2.py
index 9371cf66a..622df5693 100644
--- a/tests/test_zero_data_parallel/test_sharded_optim_v2.py
+++ b/tests/test_zero_data_parallel/test_sharded_optim_v2.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-# -*- encoding: utf-8 -*-
-
 import copy
 from functools import partial
@@ -82,4 +79,4 @@ def test_sharded_optim_v2(world_size, cpu_offload, shard_strategy):
 
 if __name__ == '__main__':
-    test_sharded_optim_v2(world_size=2, cpu_offload=True, shard_strategy=TensorShardStrategy)
+    test_sharded_optim_v2(world_size=2, cpu_offload=True, shard_strategy=TensorShardStrategy)
\ No newline at end of file
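
A minimal usage sketch of the interface introduced by PATCH 11 and PATCH 13 above,
rearranging calls that already appear in the patched tests; model construction,
colossalai.initialize() and the training loop are elided, and the import paths are
assumed from the file layout shown in the diffs:

    import torch
    from colossalai.zero.shard_utils import TensorShardStrategy
    from colossalai.zero.sharded_model.sharded_model_v2 import ShardedModelV2
    from colossalai.zero.sharded_optim.sharded_optim_v2 import ShardedOptimizerV2

    shard_strategy = TensorShardStrategy()
    # wrap the (already built) model; use_memory_tracer enables the MemStatsCollector
    zero_model = ShardedModelV2(model, shard_strategy, use_memory_tracer=True)
    # the optimizer instance is now created inside ShardedOptimizerV2;
    # trailing keyword arguments such as lr are forwarded to optimizer_class
    sharded_optim = ShardedOptimizerV2(zero_model,
                                       torch.optim.Adam,
                                       shard_strategy,
                                       cpu_offload=False,
                                       initial_scale=2**5,
                                       lr=1e-3)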