mirror of
https://github.com/hpcaitech/ColossalAI.git
synced 2025-09-01 17:17:05 +00:00
[autoparallel] add sum handler (#2101)
This commit is contained in:
@@ -15,6 +15,7 @@ from .output_handler import OuputHandler
|
||||
from .placeholder_handler import PlacehodlerHandler
|
||||
from .registry import operator_registry
|
||||
from .reshape_handler import ReshapeHandler
|
||||
from .sum_handler import SumHandler
|
||||
from .tensor_constructor_handler import TensorConstructorHandler
|
||||
from .unary_elementwise_handler import UnaryElementwiseHandler
|
||||
from .where_handler import WhereHandler
|
||||
@@ -25,5 +26,5 @@ __all__ = [
|
||||
'UnaryElementwiseHandler', 'ReshapeHandler', 'PlacehodlerHandler', 'OuputHandler', 'WhereHandler',
|
||||
'NormPoolingHandler', 'BinaryElementwiseHandler', 'MatMulHandler', 'operator_registry', 'ADDMMFunctionHandler',
|
||||
'GetItemHandler', 'GetattrHandler', 'ViewHandler', 'PermuteHandler', 'TensorConstructorHandler',
|
||||
'EmbeddingModuleHandler', 'EmbeddingFunctionHandler'
|
||||
'EmbeddingModuleHandler', 'EmbeddingFunctionHandler', 'SumHandler'
|
||||
]
|
||||
|
@@ -16,6 +16,7 @@ from .output_generator import OutputGenerator
|
||||
from .placeholder_generator import PlaceholderGenerator
|
||||
from .reshape_generator import ReshapeGenerator
|
||||
from .strategy_generator import StrategyGenerator
|
||||
from .sum_generator import SumGenerator
|
||||
from .tensor_constructor_generator import TensorConstructorGenerator
|
||||
from .unary_elementwise_generator import UnaryElementwiseGenerator
|
||||
from .where_generator import WhereGenerator
|
||||
@@ -26,5 +27,5 @@ __all__ = [
|
||||
'BatchNormStrategyGenerator', 'GetItemStrategyGenerator', 'TensorStrategyGenerator', 'TensorTupleStrategyGenerator',
|
||||
'LayerNormGenerator', 'ReshapeGenerator', 'PlaceholderGenerator', 'OutputGenerator', 'WhereGenerator',
|
||||
'ReshapeGenerator', 'NormalPoolStrategyGenerator', 'BinaryElementwiseStrategyGenerator', 'GetattrGenerator',
|
||||
'TensorConstructorGenerator', 'EmbeddingStrategyGenerator'
|
||||
'TensorConstructorGenerator', 'EmbeddingStrategyGenerator', 'SumGenerator'
|
||||
]
|
||||
|
@@ -0,0 +1,113 @@
|
||||
import copy
|
||||
import operator
|
||||
from functools import reduce
|
||||
from typing import List
|
||||
|
||||
from colossalai.auto_parallel.tensor_shard.node_handler.strategy.strategy_generator import FollowingStrategyGenerator
|
||||
from colossalai.auto_parallel.tensor_shard.sharding_strategy import (
|
||||
CommAction,
|
||||
CommType,
|
||||
MemoryCost,
|
||||
ShardingStrategy,
|
||||
TrainCycleItem,
|
||||
)
|
||||
from colossalai.auto_parallel.tensor_shard.utils import (
|
||||
check_keep_sharding_status,
|
||||
detect_reshape_mapping,
|
||||
infer_output_dim_partition_dict,
|
||||
)
|
||||
from colossalai.tensor.shape_consistency import CollectiveCommPattern
|
||||
from colossalai.tensor.sharding_spec import ShardingSpec
|
||||
|
||||
__all__ = ['SumGenerator']
|
||||
|
||||
|
||||
class SumGenerator(FollowingStrategyGenerator):
|
||||
"""
|
||||
SumGenerator deals with the sharding strategies of torch.sum op.
|
||||
"""
|
||||
|
||||
def validate(self) -> bool:
|
||||
return super().validate()
|
||||
|
||||
def update_compute_cost(self, strategy: ShardingStrategy):
|
||||
sharded_input_shape = strategy.sharding_specs[self.op_data['input']].get_sharded_shape_per_device()
|
||||
sharded_output_shape = strategy.sharding_specs[self.op_data['output']].get_sharded_shape_per_device()
|
||||
input_size_product = reduce(operator.mul, sharded_input_shape)
|
||||
output_size_product = reduce(operator.mul, sharded_output_shape)
|
||||
|
||||
compute_cost = TrainCycleItem(fwd=input_size_product,
|
||||
bwd=output_size_product,
|
||||
total=input_size_product + output_size_product)
|
||||
|
||||
strategy.compute_cost = compute_cost
|
||||
|
||||
def update_memory_cost(self, strategy: ShardingStrategy):
|
||||
'''
|
||||
Compute the memory cost per device with this specific strategy.
|
||||
'''
|
||||
forward_size_mapping = {
|
||||
'input': self._compute_size_in_bytes(strategy, "input"),
|
||||
'output': self._compute_size_in_bytes(strategy, "output")
|
||||
}
|
||||
|
||||
backward_size_mapping = copy.deepcopy(forward_size_mapping)
|
||||
backward_size_mapping.pop("output")
|
||||
# compute fwd cost incurred
|
||||
# fwd_cost = input + output
|
||||
fwd_activation_cost = sum([v for k, v in forward_size_mapping.items() if not self.is_param(k)])
|
||||
fwd_parameter_cost = sum([v for k, v in forward_size_mapping.items() if self.is_param(k)])
|
||||
fwd_mem_cost = MemoryCost(activation=fwd_activation_cost, parameter=fwd_parameter_cost)
|
||||
|
||||
# compute bwd cost incurred
|
||||
# bwd_cost = input_grad
|
||||
bwd_activation_cost = sum([v for k, v in backward_size_mapping.items() if not self.is_param(k)])
|
||||
bwd_parameter_cost = sum([v for k, v in backward_size_mapping.items() if self.is_param(k)])
|
||||
bwd_mem_cost = MemoryCost(activation=bwd_activation_cost, parameter=bwd_parameter_cost)
|
||||
|
||||
# compute total cost
|
||||
total_mem_cost = MemoryCost(activation=fwd_activation_cost + bwd_activation_cost,
|
||||
parameter=fwd_parameter_cost + bwd_parameter_cost)
|
||||
memory_cost = TrainCycleItem(fwd=fwd_mem_cost, bwd=bwd_mem_cost, total=total_mem_cost)
|
||||
strategy.memory_cost = memory_cost
|
||||
|
||||
def collate_strategies(self) -> List[ShardingStrategy]:
|
||||
strategy_list = []
|
||||
for index, strategy in enumerate(self.predecessor_node.strategies_vector):
|
||||
dim_partition_dict_mapping = {}
|
||||
communication_action_mapping = {}
|
||||
input_sharding_spec = strategy.output_sharding_specs[self.op_data["input"]]
|
||||
dim_partition_dict_for_input = copy.deepcopy(input_sharding_spec.dim_partition_dict)
|
||||
sum_dims, sum_mapping_dict = self.op_data['sum_info'].data
|
||||
|
||||
# TODO: a better way to handle the distributed sum is sum all the data on chip and then do all reduce
|
||||
# among all the shard groups
|
||||
recover_dims = []
|
||||
dim_partition_dict_for_output = {}
|
||||
for dim in dim_partition_dict_for_input:
|
||||
if dim in sum_dims:
|
||||
recover_dims.append(dim)
|
||||
elif dim in sum_mapping_dict:
|
||||
dim_partition_dict_for_output[sum_mapping_dict[dim]] = dim_partition_dict_for_input[dim]
|
||||
else:
|
||||
raise RuntimeError(f'dim {dim} is not in sum_mapping_dict or sum_dims')
|
||||
|
||||
for dim in recover_dims:
|
||||
dim_partition_dict_for_input.pop(dim)
|
||||
|
||||
dim_partition_dict_mapping = {
|
||||
"input": dim_partition_dict_for_input,
|
||||
"output": dim_partition_dict_for_output,
|
||||
}
|
||||
sharding_spec_mapping = self.to_sharding_spec_mapping(dim_partition_dict_mapping)
|
||||
# add index into name to pass the duplicated check
|
||||
# we keep same strategies with different name for node merging, and it will not increase the searching space,
|
||||
# because in solver, this node will be merged into other nodes, and solver will not create a new variable for this node.
|
||||
name = f'{sharding_spec_mapping["input"].sharding_sequence} -> {sharding_spec_mapping["output"].sharding_sequence}_{index}'
|
||||
|
||||
strategy = self.get_sharding_strategy(name=name,
|
||||
sharding_spec_mapping=sharding_spec_mapping,
|
||||
communication_action_mapping=communication_action_mapping)
|
||||
strategy_list.append(strategy)
|
||||
|
||||
return strategy_list
|
@@ -0,0 +1,81 @@
|
||||
from typing import Dict, List
|
||||
|
||||
import torch
|
||||
|
||||
from ..sharding_strategy import OperationData, OperationDataType
|
||||
from .node_handler import NodeHandler
|
||||
from .registry import operator_registry
|
||||
from .strategy import StrategyGenerator, SumGenerator
|
||||
|
||||
__all__ = ['SumHandler']
|
||||
|
||||
|
||||
@operator_registry.register(torch.Tensor.sum)
|
||||
@operator_registry.register(torch.sum)
|
||||
class SumHandler(NodeHandler):
|
||||
"""
|
||||
A SumHandler which deals with the sharding strategies for torch.sum or torch.Tensor.sum.
|
||||
"""
|
||||
|
||||
def get_strategy_generator(self) -> List[StrategyGenerator]:
|
||||
op_data_mapping = self.get_operation_data_mapping()
|
||||
generators = []
|
||||
generators.append(SumGenerator(op_data_mapping, self.device_mesh, self.node.args[0]))
|
||||
return generators
|
||||
|
||||
def get_operation_data_mapping(self) -> Dict[str, OperationData]:
|
||||
# check if the input operand is a parameter
|
||||
if isinstance(self.node.args[0]._meta_data, torch.nn.parameter.Parameter):
|
||||
data_type = OperationDataType.PARAM
|
||||
else:
|
||||
data_type = OperationDataType.ARG
|
||||
|
||||
input_data = self.node.args[0]._meta_data
|
||||
physical_input_operand = OperationData(name=str(self.node.args[0]), type=data_type, data=input_data)
|
||||
|
||||
if len(self.node.args) > 1:
|
||||
sum_dims = self.node.args[1]
|
||||
else:
|
||||
sum_dims = tuple(range(self.node.args[0]._meta_data.dim()))
|
||||
|
||||
if isinstance(sum_dims, int):
|
||||
sum_dims = (sum_dims,)
|
||||
|
||||
# recover negative value to positive
|
||||
num_dims = self.node.args[0]._meta_data.dim()
|
||||
for i in range(len(sum_dims)):
|
||||
if sum_dims[i] < 0:
|
||||
sum_dims[i] += num_dims
|
||||
|
||||
# mapping the input dims to output dims
|
||||
# For examples:
|
||||
# input: torch.rand(2, 3, 4, 5)
|
||||
# output: torch.sum(input, (0, 2))
|
||||
# sum_mapping_dict = {1: 0, 3: 1}
|
||||
# sum_mapping_dict[1] = 0 means the 0th dim of output is the 1st dim of input
|
||||
# sum_mapping_dict[3] = 1 means the 1st dim of output is the 3rd dim of input
|
||||
sum_mapping_dict = {}
|
||||
if 'keepdim' in self.node.kwargs and self.node.kwargs['keepdim']:
|
||||
for i in range(num_dims):
|
||||
sum_mapping_dict.update({i: i})
|
||||
else:
|
||||
output_index = 0
|
||||
for i in range(num_dims):
|
||||
if i not in sum_dims:
|
||||
sum_mapping_dict.update({i: output_index})
|
||||
output_index += 1
|
||||
assert output_index == self.node._meta_data.dim()
|
||||
|
||||
sum_info = (sum_dims, sum_mapping_dict)
|
||||
physical_shape_operand = OperationData(name='sum_info', type=OperationDataType.ARG, data=sum_info)
|
||||
|
||||
output_data = self.node._meta_data
|
||||
physical_output_operand = OperationData(name=str(self.node), type=OperationDataType.OUTPUT, data=output_data)
|
||||
|
||||
mapping = {
|
||||
"input": physical_input_operand,
|
||||
"sum_info": physical_shape_operand,
|
||||
"output": physical_output_operand
|
||||
}
|
||||
|
||||
return mapping
|
Reference in New Issue
Block a user