mirror of
https://github.com/hpcaitech/ColossalAI.git
synced 2025-09-08 20:40:34 +00:00
[autoparallel] add output handler and placeholder handler (#1694)
* [autoparallel] add output handler and placeholder handler * Delete test_solver_with_resnet.py * fix test bugs
This commit is contained in:
@@ -5,6 +5,7 @@ from colossalai.tensor.shape_consistency import ShapeConsistencyManager
|
||||
from typing import Dict, List, Union
|
||||
from ..sharding_strategy import ShardingStrategy_V2, StrategiesVector, OperationData, TrainCycleItem
|
||||
from ..strategy import StrategyGenerator_V2
|
||||
from .._utils import generate_resharding_costs
|
||||
|
||||
|
||||
class NodeHandler(ABC):
|
||||
@@ -52,19 +53,22 @@ class NodeHandler(ABC):
|
||||
|
||||
# create data structrure to store costs
|
||||
if op_data not in resharding_costs:
|
||||
resharding_costs[op_data] = {}
|
||||
resharding_costs[node] = []
|
||||
|
||||
# for each sharding spec generated by the predecessor's node handler
|
||||
# compute the resharding cost to switch to the sharding spec generated
|
||||
# by the current node handler
|
||||
for prev_sharding_spec in prev_sharding_specs:
|
||||
fwd_cost = shape_consistency_manager.shape_consistency(prev_sharding_spec, current_sharding_spec)
|
||||
bwd_cost = shape_consistency_manager.shape_consistency(current_sharding_spec, prev_sharding_spec)
|
||||
resharding_cost = TrainCycleItem(fwd=fwd_cost, bwd=bwd_cost, total=fwd_cost + bwd_cost)
|
||||
resharding_costs[op_data][prev_sharding_spec] = resharding_cost
|
||||
_, _, resharding_cost = shape_consistency_manager.shape_consistency(prev_sharding_spec,
|
||||
current_sharding_spec)
|
||||
resharding_cost = TrainCycleItem(fwd=resharding_cost["forward"],
|
||||
bwd=resharding_cost["backward"],
|
||||
total=resharding_cost["total"])
|
||||
resharding_costs[node].append(resharding_cost)
|
||||
strategy.resharding_costs = resharding_costs
|
||||
return strategy
|
||||
|
||||
def register_strategy(self, compute_resharding_cost: bool = False) -> StrategiesVector:
|
||||
def register_strategy(self, compute_resharding_cost: bool = True) -> StrategiesVector:
|
||||
"""
|
||||
Register different sharding strategies for the current node.
|
||||
"""
|
||||
@@ -86,7 +90,8 @@ class NodeHandler(ABC):
|
||||
# compute the resharding costs based on the previous node
|
||||
# strategies if specified
|
||||
if compute_resharding_cost:
|
||||
post_processed_strategies = list(map(self.update_resharding_cost, post_processed_strategies))
|
||||
updated_strategies = map(self.update_resharding_cost, strategies)
|
||||
strategies = list(updated_strategies)
|
||||
|
||||
self.strategies_vector.extend(post_processed_strategies)
|
||||
|
||||
|
39
colossalai/auto_parallel/solver/op_handler/output_handler.py
Normal file
39
colossalai/auto_parallel/solver/op_handler/output_handler.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import torch
|
||||
from .node_handler import NodeHandler
|
||||
from ..sharding_strategy import ShardingStrategy_V2, OperationDataType, OperationData, StrategiesVector
|
||||
from colossalai.auto_parallel.solver.strategy import StrategyGenerator_V2
|
||||
from colossalai.auto_parallel.solver.strategy.output_generator import OutputGenerator
|
||||
from typing import List, Dict
|
||||
from .registry import operator_registry
|
||||
|
||||
__all__ = ['OuputHandler']
|
||||
|
||||
|
||||
class OuputHandler(NodeHandler):
|
||||
"""
|
||||
A OuputHandler which deals with the sharding strategies for Output Node.
|
||||
"""
|
||||
|
||||
def get_strategy_generator(self) -> List[StrategyGenerator_V2]:
|
||||
op_data_mapping = self.get_operation_data_mapping()
|
||||
generators = []
|
||||
generators.append(OutputGenerator(op_data_mapping, self.device_mesh, self.predecessor_node))
|
||||
return generators
|
||||
|
||||
def get_operation_data_mapping(self) -> Dict[str, OperationData]:
|
||||
# use transposed shape for strategies
|
||||
# the strategies will be transformed back to its original shape in self.post_process
|
||||
dummy_output = torch.empty(1,).to("meta")
|
||||
physical_output = OperationData(name=str(self.node), type=OperationDataType.OUTPUT, data=dummy_output)
|
||||
|
||||
mapping = {"output": physical_output}
|
||||
for index, input_node in enumerate(self.predecessor_node):
|
||||
if not hasattr(input_node, "_meta_data"):
|
||||
print(input_node.name)
|
||||
physical_inputs = OperationData(name=str(input_node),
|
||||
type=OperationDataType.ARG,
|
||||
data=input_node._meta_data)
|
||||
name_key = f'input_{index}'
|
||||
mapping[name_key] = physical_inputs
|
||||
|
||||
return mapping
|
@@ -0,0 +1,30 @@
|
||||
import torch
|
||||
from .node_handler import NodeHandler
|
||||
from ..sharding_strategy import ShardingStrategy_V2, OperationDataType, OperationData
|
||||
from colossalai.auto_parallel.solver.strategy import StrategyGenerator_V2
|
||||
from colossalai.auto_parallel.solver.strategy.placeholder_generator import PlaceholderGenerator
|
||||
from typing import List, Dict
|
||||
from .registry import operator_registry
|
||||
|
||||
__all__ = ['PlacehodlerHandler']
|
||||
|
||||
|
||||
class PlacehodlerHandler(NodeHandler):
|
||||
"""
|
||||
A PlacehodlerHandler which deals with the sharding strategies for Placeholder Node.
|
||||
"""
|
||||
|
||||
def get_strategy_generator(self) -> List[StrategyGenerator_V2]:
|
||||
op_data_mapping = self.get_operation_data_mapping()
|
||||
generators = []
|
||||
generators.append(PlaceholderGenerator(op_data_mapping, self.device_mesh))
|
||||
return generators
|
||||
|
||||
def get_operation_data_mapping(self) -> Dict[str, OperationData]:
|
||||
# use transposed shape for strategies
|
||||
# the strategies will be transformed back to its original shape in self.post_process
|
||||
physical_output = OperationData(name=str(self.node), type=OperationDataType.OUTPUT, data=self.node._meta_data)
|
||||
|
||||
mapping = {"output": physical_output}
|
||||
|
||||
return mapping
|
@@ -129,7 +129,7 @@ class ShardingStrategy_V2:
|
||||
communication_cost: TrainCycleItem = None
|
||||
memory_cost: TrainCycleItem = None
|
||||
communication_actions: Dict[OperationData, CommSpec] = None
|
||||
resharding_costs: Dict[OperationData, Dict[ShardingSpec, TrainCycleItem]] = None
|
||||
resharding_costs: Dict[Node, List[TrainCycleItem]] = None
|
||||
|
||||
@property
|
||||
def input_sharding_specs(self) -> Dict[OperationData, ShardingSpec]:
|
||||
|
59
colossalai/auto_parallel/solver/strategy/output_generator.py
Normal file
59
colossalai/auto_parallel/solver/strategy/output_generator.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import operator
|
||||
from functools import reduce
|
||||
from ..sharding_strategy import ShardingStrategy_V2, TrainCycleItem, MemoryCost
|
||||
from colossalai.tensor.shape_consistency import CollectiveCommPattern
|
||||
from .strategy_generator import OutputStrategyGenerator
|
||||
from typing import List
|
||||
from .._utils import exception_handler
|
||||
import copy
|
||||
|
||||
__all__ = ['OutputGenerator']
|
||||
|
||||
|
||||
class OutputGenerator(OutputStrategyGenerator):
|
||||
"""
|
||||
OutputGenerator is a generic class to generate strategies for Output Node.
|
||||
"""
|
||||
|
||||
def validate(self) -> bool:
|
||||
return super().validate()
|
||||
|
||||
def update_compute_cost(self, strategy: ShardingStrategy_V2):
|
||||
compute_cost = TrainCycleItem(fwd=10, bwd=10, total=20)
|
||||
strategy.compute_cost = compute_cost
|
||||
|
||||
def update_memory_cost(self, strategy: ShardingStrategy_V2):
|
||||
'''
|
||||
Compute the memory cost per device with this specific strategy.
|
||||
'''
|
||||
fwd_mem_cost = MemoryCost(activation=0, parameter=0)
|
||||
|
||||
bwd_mem_cost = MemoryCost(activation=0, parameter=0)
|
||||
|
||||
# compute total cost
|
||||
total_mem_cost = MemoryCost(activation=0, parameter=0)
|
||||
memory_cost = TrainCycleItem(fwd=fwd_mem_cost, bwd=bwd_mem_cost, total=total_mem_cost)
|
||||
strategy.memory_cost = memory_cost
|
||||
|
||||
def generate(self):
|
||||
dim_partition_dict_mapping = {
|
||||
"output": {},
|
||||
}
|
||||
for index, _ in enumerate(self.predecessor_nodes):
|
||||
mapping_name = f"input_{index}"
|
||||
dim_partition_dict_mapping[mapping_name] = {}
|
||||
|
||||
communication_action_mapping = {}
|
||||
sharding_spec_mapping = self.to_sharding_spec_mapping(dim_partition_dict_mapping)
|
||||
|
||||
name = f'Replica Output'
|
||||
|
||||
strategy = self.get_sharding_strategy(name=name,
|
||||
sharding_spec_mapping=sharding_spec_mapping,
|
||||
communication_action_mapping=communication_action_mapping)
|
||||
|
||||
self.update_communication_cost(strategy)
|
||||
self.update_compute_cost(strategy)
|
||||
self.update_memory_cost(strategy)
|
||||
|
||||
return [strategy]
|
@@ -0,0 +1,60 @@
|
||||
import operator
|
||||
from functools import reduce
|
||||
from ..sharding_strategy import ShardingStrategy_V2, TrainCycleItem, MemoryCost
|
||||
from colossalai.tensor.shape_consistency import CollectiveCommPattern
|
||||
from .strategy_generator import StrategyGenerator_V2
|
||||
from typing import List
|
||||
from .._utils import exception_handler
|
||||
import copy
|
||||
|
||||
__all__ = ['PlaceholderGenerator']
|
||||
|
||||
|
||||
class PlaceholderGenerator(StrategyGenerator_V2):
|
||||
"""
|
||||
PlaceholderGenerator is a generic class to generate strategies for placeholder node.
|
||||
"""
|
||||
|
||||
def validate(self) -> bool:
|
||||
return super().validate()
|
||||
|
||||
def update_compute_cost(self, strategy: ShardingStrategy_V2):
|
||||
compute_cost = TrainCycleItem(fwd=10, bwd=10, total=20)
|
||||
strategy.compute_cost = compute_cost
|
||||
|
||||
def update_memory_cost(self, strategy: ShardingStrategy_V2):
|
||||
'''
|
||||
Compute the memory cost per device with this specific strategy.
|
||||
'''
|
||||
forward_size_mapping = {'output': self._compute_size_in_bytes(strategy, "output")}
|
||||
|
||||
# compute fwd cost incurred
|
||||
# fwd_cost = output
|
||||
fwd_activation_cost = sum([v for k, v in forward_size_mapping.items()])
|
||||
fwd_mem_cost = MemoryCost(activation=fwd_activation_cost, parameter=0)
|
||||
|
||||
bwd_mem_cost = MemoryCost(activation=0, parameter=0)
|
||||
|
||||
# compute total cost
|
||||
total_mem_cost = MemoryCost(activation=fwd_activation_cost, parameter=0)
|
||||
memory_cost = TrainCycleItem(fwd=fwd_mem_cost, bwd=bwd_mem_cost, total=total_mem_cost)
|
||||
strategy.memory_cost = memory_cost
|
||||
|
||||
def generate(self):
|
||||
dim_partition_dict_mapping = {
|
||||
"output": {},
|
||||
}
|
||||
communication_action_mapping = {}
|
||||
sharding_spec_mapping = self.to_sharding_spec_mapping(dim_partition_dict_mapping)
|
||||
|
||||
name = f'Replica Placeholder'
|
||||
|
||||
strategy = self.get_sharding_strategy(name=name,
|
||||
sharding_spec_mapping=sharding_spec_mapping,
|
||||
communication_action_mapping=communication_action_mapping)
|
||||
|
||||
self.update_communication_cost(strategy)
|
||||
self.update_compute_cost(strategy)
|
||||
self.update_memory_cost(strategy)
|
||||
|
||||
return [strategy]
|
@@ -169,3 +169,15 @@ class FollowingStrategyGenerator(StrategyGenerator_V2):
|
||||
self.op_data = operation_data_mapping
|
||||
self.device_mesh = device_mesh
|
||||
self.predecessor_node = predecessor_node
|
||||
|
||||
|
||||
class OutputStrategyGenerator(StrategyGenerator_V2):
|
||||
"""
|
||||
OutputStrategyGenerator is used to generate the sharding strategies for Output Node.
|
||||
"""
|
||||
|
||||
def __init__(self, operation_data_mapping: Dict[str, OperationData], device_mesh: DeviceMesh,
|
||||
predecessor_nodes: List[Node]):
|
||||
self.op_data = operation_data_mapping
|
||||
self.device_mesh = device_mesh
|
||||
self.predecessor_nodes = predecessor_nodes
|
||||
|
Reference in New Issue
Block a user