mirror of
https://github.com/hpcaitech/ColossalAI.git
synced 2025-09-12 12:47:21 +00:00
[gemini] init genimi individual directory (#754)
This commit is contained in:
@@ -22,8 +22,8 @@ from colossalai.zero.sharded_model.reduce_scatter import ReduceScatterBucketer
|
||||
from colossalai.zero.sharded_param.tensorful_state import TensorState
|
||||
from torch.distributed import ProcessGroup
|
||||
from torch.nn.parameter import Parameter
|
||||
from colossalai.zero.utils.stateful_tensor_mgr import StatefulTensorMgr
|
||||
from colossalai.zero.utils.tensor_placement_policy import TensorPlacementPolicyFactory, TensorPlacementPolicy
|
||||
from colossalai.gemini.stateful_tensor_mgr import StatefulTensorMgr
|
||||
from colossalai.gemini.tensor_placement_policy import TensorPlacementPolicyFactory, TensorPlacementPolicy
|
||||
|
||||
from ._utils import (cast_float_arguments, cast_tensor_to_fp16, cast_tensor_to_fp32, chunk_and_pad, free_storage,
|
||||
get_gradient_predivide_factor)
|
||||
|
@@ -21,7 +21,7 @@ from torch import Tensor
|
||||
from torch.distributed import ProcessGroup
|
||||
from torch.nn.parameter import Parameter
|
||||
from torch.optim import Optimizer
|
||||
from colossalai.zero.utils.tensor_placement_policy import AutoTensorPlacementPolicy
|
||||
from colossalai.gemini.tensor_placement_policy import AutoTensorPlacementPolicy
|
||||
|
||||
|
||||
class OptimState(Enum):
|
||||
|
@@ -1,5 +1,3 @@
|
||||
from .stateful_tensor_mgr import StatefulTensorMgr
|
||||
from .tensor_placement_policy import TensorPlacementPolicyFactory
|
||||
from .zero_hook import ZeroHook
|
||||
|
||||
__all__ = ['StatefulTensorMgr', 'ZeroHook', 'TensorPlacementPolicyFactory']
|
||||
__all__ = ['ZeroHook']
|
@@ -1,79 +0,0 @@
|
||||
import functools
|
||||
import torch
|
||||
import types
|
||||
from colossalai.utils.cuda import get_current_device
|
||||
from colossalai.zero.sharded_param.sharded_param import ShardedParamV2
|
||||
from colossalai.zero.sharded_param.tensorful_state import StatefulTensor, TensorState
|
||||
from colossalai.zero.sharded_param.tensor_utils import colo_model_data_tensor_move_inline, colo_tensor_mem_usage
|
||||
from colossalai.zero.utils.tensor_placement_policy import TensorPlacementPolicy
|
||||
from typing import List
|
||||
from colossalai.logging import get_dist_logger
|
||||
|
||||
|
||||
class StatefulTensorMgr(object):
|
||||
"""
|
||||
Stateful Tensor Manager, inspired from PatrickStar
|
||||
|
||||
PatrickStar: Parallel Training of Pre-trained Models via Chunk-based Memory Management
|
||||
https://arxiv.org/abs/2108.05818
|
||||
"""
|
||||
|
||||
def __init__(self, tensor_placement_policy: TensorPlacementPolicy) -> None:
|
||||
self._tensor_placement_policy: TensorPlacementPolicy = tensor_placement_policy
|
||||
self._stateful_tensor_list: List[StatefulTensor] = []
|
||||
self._logger = get_dist_logger("StatefulTensorMgr")
|
||||
|
||||
self._warmup = True
|
||||
|
||||
self._compute_list: List[StatefulTensor] = []
|
||||
self._compute_idx: int = -1
|
||||
|
||||
def register_stateful_param(self, param: ShardedParamV2) -> None:
|
||||
assert isinstance(param, ShardedParamV2)
|
||||
for t in param.get_payload_tensors():
|
||||
assert isinstance(t, StatefulTensor)
|
||||
self._stateful_tensor_list.append(t)
|
||||
t.trans_state = types.MethodType(functools.partial(self._trans_state, t.trans_state), t)
|
||||
|
||||
def adjust_layout(self) -> None:
|
||||
""" Adjust the layout of statefuil tensor according to the information provided
|
||||
by mem_stats_collector, which should belongs to a Sharded Model.
|
||||
"""
|
||||
# find stateful tensor in state COMPUTE
|
||||
cuda_demand = 0
|
||||
move_to_cuda_tensor_list = []
|
||||
hold_cuda_tensor_list = []
|
||||
for tensor in self._stateful_tensor_list:
|
||||
if tensor.state == TensorState.FREE:
|
||||
continue
|
||||
|
||||
if tensor.device.type == 'cuda':
|
||||
if tensor.state in [TensorState.HOLD, TensorState.HOLD_AFTER_BWD, TensorState.HOLD_AFTER_FWD]:
|
||||
hold_cuda_tensor_list.append(tensor)
|
||||
elif tensor.device.type == 'cpu':
|
||||
if tensor.state == TensorState.COMPUTE:
|
||||
move_to_cuda_tensor_list.append(tensor)
|
||||
cuda_demand += colo_tensor_mem_usage(tensor.payload)[1]
|
||||
else:
|
||||
raise RuntimeError
|
||||
self._tensor_placement_policy.evict_tensors(hold_cuda_tensor_list,
|
||||
cuda_demand=cuda_demand,
|
||||
warmup=self._warmup,
|
||||
compute_list=self._compute_list,
|
||||
compute_idx=self._compute_idx)
|
||||
# move COMPUTE tensors to CUDA
|
||||
for t in move_to_cuda_tensor_list:
|
||||
colo_model_data_tensor_move_inline(t, get_current_device())
|
||||
|
||||
def reset(self):
|
||||
"""This function must be called when each iteration finishes
|
||||
"""
|
||||
self._warmup = False
|
||||
self._compute_idx = -1
|
||||
|
||||
def _trans_state(self, trans_state_func, stateful_tensor, state):
|
||||
trans_state_func(state)
|
||||
if state == TensorState.COMPUTE:
|
||||
self._compute_idx += 1
|
||||
if self._warmup:
|
||||
self._compute_list.append(stateful_tensor)
|
@@ -1,103 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional
|
||||
import torch
|
||||
from colossalai.utils import get_current_device
|
||||
from colossalai.zero.sharded_param.tensor_utils import colo_model_data_tensor_move_inline, colo_tensor_mem_usage
|
||||
from colossalai.utils.memory import colo_device_memory_capacity
|
||||
from colossalai.zero.sharded_param.tensorful_state import StatefulTensor
|
||||
from colossalai.utils.memory_tracer import MemStatsCollector
|
||||
from colossalai.utils.memory_tracer.model_data_memtracer import GLOBAL_MODEL_DATA_TRACER
|
||||
from typing import Type
|
||||
|
||||
|
||||
class TensorPlacementPolicy(ABC):
|
||||
|
||||
def __init__(self, device: Optional[torch.device], mem_stats_collector: Optional[MemStatsCollector] = None) -> None:
|
||||
self.device: Optional[torch.device] = device
|
||||
self.mem_stats_collector: Optional[MemStatsCollector] = mem_stats_collector
|
||||
|
||||
@abstractmethod
|
||||
def evict_tensors(self, hold_cuda_tensor_list: List[StatefulTensor], **kwargs) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class CPUTensorPlacementPolicy(TensorPlacementPolicy):
|
||||
|
||||
def __init__(self, mem_stats_collector: Optional[MemStatsCollector] = None) -> None:
|
||||
super().__init__(torch.device('cpu'), mem_stats_collector=mem_stats_collector)
|
||||
|
||||
def evict_tensors(self, hold_cuda_tensor_list: List[StatefulTensor], **kwargs) -> None:
|
||||
for t in hold_cuda_tensor_list:
|
||||
colo_model_data_tensor_move_inline(t, self.device)
|
||||
|
||||
|
||||
class CUDATensorPlacementPolicy(TensorPlacementPolicy):
|
||||
|
||||
def __init__(self, mem_stats_collector: Optional[MemStatsCollector] = None) -> None:
|
||||
assert torch.cuda.is_available(), 'Cannot use CUDATensorPlacementPolicy when CUDA is not available'
|
||||
super().__init__(get_current_device(), mem_stats_collector=mem_stats_collector)
|
||||
|
||||
def evict_tensors(self, hold_cuda_tensor_list: List[StatefulTensor], **kwargs) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class AutoTensorPlacementPolicy(TensorPlacementPolicy):
|
||||
|
||||
def __init__(self, mem_stats_collector: Optional[MemStatsCollector] = None) -> None:
|
||||
super().__init__(None, mem_stats_collector=mem_stats_collector)
|
||||
# model data will use 1-self._warmup_non_model_data_ratio CUDA memory in warmup phase
|
||||
self._warmup_non_model_data_ratio: float = 0.8
|
||||
|
||||
def evict_tensors(self,
|
||||
hold_cuda_tensor_list: List[StatefulTensor],
|
||||
cuda_demand: int = 0,
|
||||
warmup: bool = True,
|
||||
compute_list: List[StatefulTensor] = [],
|
||||
compute_idx: int = 0,
|
||||
**kwargs) -> None:
|
||||
cuda_capacity = colo_device_memory_capacity(get_current_device())
|
||||
used_cuda_model_data = GLOBAL_MODEL_DATA_TRACER.cuda_usage
|
||||
if warmup:
|
||||
# We designate a part of CUDA memory for model data in warmup iterations.
|
||||
max_cuda_non_model_data_per_period = cuda_capacity * self._warmup_non_model_data_ratio
|
||||
else:
|
||||
# max non-model-data cuda memory consumption of this sampling moment and the next sampling moment.
|
||||
max_cuda_non_model_data_per_period = self.mem_stats_collector.next_period_non_model_data_usage('cuda')
|
||||
total_cuda_model_data = cuda_capacity - max_cuda_non_model_data_per_period
|
||||
avail_cuda_model_data = total_cuda_model_data - used_cuda_model_data
|
||||
if avail_cuda_model_data < cuda_demand:
|
||||
# Move cuda_demand - avail_cuda_model_data volume of tensors
|
||||
# to_free_cuda_model_data = cuda_demand - avail_cuda_model_data
|
||||
to_free_cuda_model_data = cuda_demand - avail_cuda_model_data
|
||||
freed_cuda_model_data = 0
|
||||
to_free_tensor_list = hold_cuda_tensor_list
|
||||
if not warmup:
|
||||
next_compute_idx = {t: len(compute_list) for t in hold_cuda_tensor_list}
|
||||
for i in range(len(compute_list) - 1, compute_idx, -1):
|
||||
if compute_list[i] in next_compute_idx:
|
||||
next_compute_idx[compute_list[i]] = i
|
||||
next_compute_idx = sorted(next_compute_idx.items(), key=lambda pair: pair[1], reverse=True)
|
||||
to_free_tensor_list = [t for (t, idx) in next_compute_idx]
|
||||
for t in to_free_tensor_list:
|
||||
if freed_cuda_model_data >= to_free_cuda_model_data:
|
||||
break
|
||||
freed_cuda_model_data += colo_tensor_mem_usage(t)[0]
|
||||
colo_model_data_tensor_move_inline(t, torch.device('cpu'))
|
||||
if freed_cuda_model_data < to_free_cuda_model_data:
|
||||
raise RuntimeError(
|
||||
f"Adjust layout failed! No enough CUDA memory! Need {to_free_cuda_model_data}, freed {freed_cuda_model_data}"
|
||||
)
|
||||
|
||||
|
||||
class TensorPlacementPolicyFactory:
|
||||
|
||||
@staticmethod
|
||||
def create(policy_name: str) -> Type[TensorPlacementPolicy]:
|
||||
if policy_name == 'cpu':
|
||||
return CPUTensorPlacementPolicy
|
||||
elif policy_name == 'cuda':
|
||||
return CUDATensorPlacementPolicy
|
||||
elif policy_name == 'auto':
|
||||
return AutoTensorPlacementPolicy
|
||||
else:
|
||||
raise TypeError(f"Unknown tensor placement policy {policy_name}")
|
@@ -9,8 +9,7 @@ from colossalai.utils.memory_tracer.memstats_collector import MemStatsCollector
|
||||
|
||||
from colossalai.zero.shard_utils import BaseShardStrategy
|
||||
from colossalai.zero.sharded_param.tensorful_state import TensorState
|
||||
from colossalai.zero.utils.stateful_tensor_mgr import StatefulTensorMgr
|
||||
from colossalai.zero.sharded_param.tensor_utils import colo_model_data_tensor_move_inline
|
||||
from colossalai.gemini.stateful_tensor_mgr import StatefulTensorMgr
|
||||
|
||||
from colossalai.engine.ophooks import BaseOpHook
|
||||
|
||||
|
Reference in New Issue
Block a user