mirror of
https://github.com/hpcaitech/ColossalAI.git
synced 2025-09-10 21:40:02 +00:00
[autochunk] refactor chunk memory estimation (#2762)
* refact memory code * dont log free var memory * add memory align * update chunk target * update setting for new memory * finish test * update tracer * update typo * update test
This commit is contained in:
@@ -2,11 +2,11 @@ import copy
|
||||
from typing import Any, Callable, Dict, Iterable, List, Tuple
|
||||
|
||||
import torch
|
||||
from torch.fx.node import Node, map_arg
|
||||
from torch.fx.node import Node
|
||||
|
||||
from colossalai.fx.profiler import activation_size, parameter_size
|
||||
|
||||
from .utils import NodeMgr, delete_free_var_from_last_use, get_node_shape, is_non_memory_node
|
||||
from .utils import NodeMgr, get_node_shape, is_non_memory_node
|
||||
|
||||
|
||||
class EstimateMemory(object):
|
||||
@@ -14,102 +14,85 @@ class EstimateMemory(object):
|
||||
Estimate memory with chunk
|
||||
"""
|
||||
|
||||
def __init__(self, node_mgr: NodeMgr) -> None:
|
||||
self.node_mgr = node_mgr
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def _get_meta_node_size(self, x):
|
||||
def _get_node_size(self, x: Node) -> float:
|
||||
"""
|
||||
return node size in MB
|
||||
"""
|
||||
x = x.meta["tensor_meta"]
|
||||
x = x.numel * torch.tensor([], dtype=x.dtype).element_size()
|
||||
return x
|
||||
if not hasattr(x, "numel"):
|
||||
out = sum([i.numel * torch.tensor([], dtype=i.dtype).element_size() for i in x])
|
||||
else:
|
||||
out = x.numel * torch.tensor([], dtype=x.dtype).element_size()
|
||||
out = float(out) / 1024**2
|
||||
return out
|
||||
|
||||
def _get_output_node(self, n):
|
||||
out_size = activation_size(n.meta["fwd_out"])
|
||||
out_node = [n.name] if out_size > 0 else []
|
||||
return out_size, out_node
|
||||
def _add_active_node(self, n: Node, active_nodes: Dict, chunk_ratio: float) -> None:
|
||||
"""
|
||||
add an active node and its shape to active node dict
|
||||
"""
|
||||
if get_node_shape(n) is None:
|
||||
return
|
||||
if n.op == "placeholder":
|
||||
return
|
||||
if n not in active_nodes:
|
||||
node_size = self._get_node_size(n) * chunk_ratio
|
||||
active_nodes[n] = node_size
|
||||
|
||||
def _get_output_node_size(self, n):
|
||||
return self._get_output_node(n)[0]
|
||||
def _build_delete_node_dict(self, node_mgr: NodeMgr) -> Dict:
|
||||
"""
|
||||
build delete node dict, means node should be deleted at what time
|
||||
"""
|
||||
delete_node_dict = {}
|
||||
for idx, node in enumerate(node_mgr.get_node_list()):
|
||||
# skip non shape node
|
||||
if get_node_shape(node) is None:
|
||||
continue
|
||||
# dont remove free nodes
|
||||
elif node.op == "placeholder":
|
||||
delete_node_dict[node] = len(node_mgr.get_node_list())
|
||||
# node no user
|
||||
elif len(node.users) == 0:
|
||||
delete_node_dict[node] = idx
|
||||
# log max use
|
||||
else:
|
||||
node_user_idx = [node_mgr.find_node_idx(i) for i in node.users.keys()]
|
||||
delete_node_dict[node] = max(node_user_idx)
|
||||
return delete_node_dict
|
||||
|
||||
def _add_active_node(self, n, active_list):
|
||||
new_active = self._get_output_node(n)[1]
|
||||
if n.op == "placeholder" and get_node_shape(n) is not None:
|
||||
new_active.append(n.name)
|
||||
for i in new_active:
|
||||
if i not in active_list and get_node_shape(n) is not None:
|
||||
active_list.append(i)
|
||||
def _remove_deactive_node(self,
|
||||
user_idx: int,
|
||||
user: Node,
|
||||
active_nodes: List,
|
||||
delete_node_dict: List,
|
||||
kept_nodes: List = None) -> None:
|
||||
"""
|
||||
remove deactivate nodes from active nodes
|
||||
"""
|
||||
if kept_nodes is None:
|
||||
kept_nodes = []
|
||||
if user.op in ("output",):
|
||||
return
|
||||
|
||||
def _get_delete_node(self, user, user_to_last_uses, to_keep=None):
|
||||
delete_size = 0
|
||||
delete_node = []
|
||||
if user.op not in ("output",):
|
||||
nodes_to_delete = user_to_last_uses.get(user, [])
|
||||
if len(user.users) == 0:
|
||||
nodes_to_delete.append(user)
|
||||
if to_keep is not None:
|
||||
keep_list = []
|
||||
for n in nodes_to_delete:
|
||||
if n.name in to_keep:
|
||||
keep_list.append(n)
|
||||
for n in keep_list:
|
||||
if n in nodes_to_delete:
|
||||
nodes_to_delete.remove(n)
|
||||
if len(nodes_to_delete):
|
||||
out_node = [self._get_output_node(i) for i in nodes_to_delete]
|
||||
delete_size = sum([i[0] for i in out_node])
|
||||
for i in range(len(out_node)):
|
||||
if out_node[i][0] > 0:
|
||||
delete_node.append(out_node[i][1][0])
|
||||
elif nodes_to_delete[i].op == "placeholder":
|
||||
delete_node.append(nodes_to_delete[i].name)
|
||||
# elif any(j in nodes_to_delete[i].name for j in ['transpose', 'permute', 'view']):
|
||||
# delete_node.append(nodes_to_delete[i].name)
|
||||
return delete_size, delete_node
|
||||
for node in list(active_nodes.keys()):
|
||||
# dont delete kept nodes
|
||||
if node in kept_nodes:
|
||||
continue
|
||||
# should be deleted
|
||||
if delete_node_dict[node] <= user_idx:
|
||||
active_nodes.pop(node)
|
||||
|
||||
def _get_delete_node_size(self, user, user_to_last_uses, to_keep):
|
||||
return self._get_delete_node(user, user_to_last_uses, to_keep)[0]
|
||||
|
||||
def _remove_deactive_node(self, user, user_to_last_uses, active_list):
|
||||
delete_node = self._get_delete_node(user, user_to_last_uses)[1]
|
||||
for i in delete_node:
|
||||
if i in active_list:
|
||||
active_list.remove(i)
|
||||
|
||||
def _get_chunk_inputs_size(self, chunk_inputs, chunk_inputs_non_chunk, node_list, chunk_end_idx):
|
||||
nodes_to_delete = []
|
||||
for chunk_input in chunk_inputs + chunk_inputs_non_chunk:
|
||||
chunk_input_users = chunk_input.users.keys()
|
||||
chunk_input_users_idx = [self.node_mgr.find_node_idx(i) for i in chunk_input_users]
|
||||
if all(i <= chunk_end_idx for i in chunk_input_users_idx):
|
||||
if chunk_input not in nodes_to_delete:
|
||||
nodes_to_delete.append(chunk_input)
|
||||
out_node = [self._get_output_node(i) for i in nodes_to_delete]
|
||||
delete_size = sum([i[0] for i in out_node])
|
||||
return delete_size
|
||||
|
||||
def _get_last_usr(self, nodes):
|
||||
node_to_last_use: Dict[Node, Node] = {}
|
||||
user_to_last_uses: Dict[Node, List[Node]] = {}
|
||||
|
||||
def register_last_uses(n: Node, user: Node):
|
||||
if n not in node_to_last_use:
|
||||
node_to_last_use[n] = user
|
||||
user_to_last_uses.setdefault(user, []).append(n)
|
||||
|
||||
for node in reversed(nodes):
|
||||
map_arg(node.args, lambda n: register_last_uses(n, node))
|
||||
map_arg(node.kwargs, lambda n: register_last_uses(n, node))
|
||||
return user_to_last_uses
|
||||
|
||||
def _get_contiguous_memory(self, node, not_contiguous_list, delete=False):
|
||||
def _get_tmp_memory(self, node, not_contiguous_list, delete=False):
|
||||
mem = 0
|
||||
not_contiguous_ops = ["permute"]
|
||||
inherit_contiguous_ops = ["transpose", "view"]
|
||||
|
||||
if node.op == "call_function" and any(n in node.name for n in ["matmul", "reshape"]):
|
||||
for n in node.args:
|
||||
if n in not_contiguous_list:
|
||||
# matmul won't change origin tensor, but create a tmp copy
|
||||
mem += self._get_output_node_size(n)
|
||||
mem += self._get_node_size(n)
|
||||
elif node.op == "call_module":
|
||||
for n in node.args:
|
||||
if n in not_contiguous_list:
|
||||
@@ -129,31 +112,7 @@ class EstimateMemory(object):
|
||||
if chunk_dim is None:
|
||||
return 1.0
|
||||
else:
|
||||
return float(chunk_size) / node_shape[chunk_dim]
|
||||
|
||||
def _get_chunk_delete_node_size(self, user, user_to_last_uses, chunk_ratio, chunk_inputs_names):
|
||||
# if any(j in user.name for j in ['transpose', 'permute', 'view']):
|
||||
# return 0
|
||||
if user.op in ("placeholder", "output"):
|
||||
return 0
|
||||
nodes_to_delete = user_to_last_uses.get(user, [])
|
||||
if len(user.users) == 0:
|
||||
nodes_to_delete.append(user)
|
||||
delete_size = 0
|
||||
for n in nodes_to_delete:
|
||||
if n.name in chunk_inputs_names:
|
||||
continue
|
||||
delete_size += self._get_output_node_size(n) * chunk_ratio
|
||||
return delete_size
|
||||
|
||||
def _print_mem_log(self, log, nodes, title=None):
|
||||
if title:
|
||||
print(title)
|
||||
for idx, (l, n) in enumerate(zip(log, nodes)):
|
||||
print("%s:%.2f \t" % (n.name, l), end="")
|
||||
if (idx + 1) % 3 == 0:
|
||||
print("")
|
||||
print("\n")
|
||||
return chunk_size / float(node_shape[chunk_dim])
|
||||
|
||||
def _print_compute_op_mem_log(self, log, nodes, title=None):
|
||||
if title:
|
||||
@@ -168,12 +127,22 @@ class EstimateMemory(object):
|
||||
print("")
|
||||
print("\n")
|
||||
|
||||
def estimate_chunk_inference_mem(
|
||||
self,
|
||||
node_list: List,
|
||||
chunk_infos=None,
|
||||
print_mem=False,
|
||||
):
|
||||
def _add_active_nodes_from_list(self, active_nodes: List, nodes: List) -> List:
|
||||
"""
|
||||
add active nodes from nodes
|
||||
"""
|
||||
for n in nodes:
|
||||
self._add_active_node(n, active_nodes, 1)
|
||||
|
||||
def _get_memory_from_active_nodes(self, active_nodes: Dict) -> float:
|
||||
"""
|
||||
sum all memory of active nodes
|
||||
"""
|
||||
out = [i for i in active_nodes.values()]
|
||||
out = sum(out)
|
||||
return out
|
||||
|
||||
def estimate_chunk_inference_mem(self, node_list: List, chunk_infos: Dict = None, print_mem: bool = False):
|
||||
"""
|
||||
Estimate inference memory with chunk
|
||||
|
||||
@@ -191,18 +160,17 @@ class EstimateMemory(object):
|
||||
act_memory = 0.0
|
||||
act_memory_peak_log = []
|
||||
act_memory_after_node_log = []
|
||||
active_node_list = []
|
||||
active_node_list_log = []
|
||||
active_nodes = {}
|
||||
active_nodes_log = []
|
||||
not_contiguous_list = []
|
||||
user_to_last_uses = self._get_last_usr(node_list)
|
||||
user_to_last_uses_no_free_var = self._get_last_usr(node_list)
|
||||
delete_free_var_from_last_use(user_to_last_uses_no_free_var)
|
||||
node_mgr = NodeMgr(node_list)
|
||||
delete_node_dict = self._build_delete_node_dict(node_mgr)
|
||||
|
||||
use_chunk = True if chunk_infos is not None else False
|
||||
chunk_within = False
|
||||
chunk_region_idx = None
|
||||
chunk_ratio = 1 # use it to estimate chunk mem
|
||||
chunk_inputs_names = []
|
||||
chunk_inputs_all = []
|
||||
|
||||
if use_chunk:
|
||||
chunk_regions = [i["region"] for i in chunk_infos]
|
||||
@@ -210,30 +178,30 @@ class EstimateMemory(object):
|
||||
chunk_ends = [i[1] for i in chunk_regions]
|
||||
chunk_inputs = [i["inputs"] for i in chunk_infos]
|
||||
chunk_inputs_non_chunk = [i["inputs_non_chunk"] for i in chunk_infos]
|
||||
chunk_inputs_names = [j.name for i in chunk_inputs for j in i
|
||||
] + [j.name for i in chunk_inputs_non_chunk for j in i]
|
||||
chunk_inputs_all = [j for i in chunk_inputs for j in i] + [j for i in chunk_inputs_non_chunk for j in i]
|
||||
chunk_outputs = [i["outputs"] for i in chunk_infos]
|
||||
chunk_node_dim = [i["node_chunk_dim"] for i in chunk_infos]
|
||||
chunk_sizes = [i["chunk_size"] if "chunk_size" in i else 1 for i in chunk_infos]
|
||||
|
||||
for idx, node in enumerate(node_list):
|
||||
for idx, node in enumerate(node_mgr.get_node_list()):
|
||||
|
||||
# if node in chunk start nodes, change chunk ratio and add chunk_tensor
|
||||
if use_chunk and idx in chunk_starts:
|
||||
chunk_within = True
|
||||
chunk_region_idx = chunk_starts.index(idx)
|
||||
act_memory += sum(self._get_output_node_size(i) for i in chunk_outputs[chunk_region_idx]) / (1024**2)
|
||||
self._add_active_nodes_from_list(active_nodes, chunk_outputs[chunk_region_idx])
|
||||
|
||||
# determine chunk ratio for current node
|
||||
if chunk_within:
|
||||
chunk_ratio = self._get_chunk_ratio(
|
||||
node,
|
||||
chunk_node_dim[chunk_region_idx],
|
||||
chunk_sizes[chunk_region_idx],
|
||||
)
|
||||
chunk_ratio = self._get_chunk_ratio(node, chunk_node_dim[chunk_region_idx],
|
||||
chunk_sizes[chunk_region_idx])
|
||||
|
||||
# add current node as active node
|
||||
self._add_active_node(node, active_nodes, chunk_ratio)
|
||||
act_memory = self._get_memory_from_active_nodes(active_nodes)
|
||||
|
||||
# if node is placeholder, just add the size of the node
|
||||
if node.op == "placeholder":
|
||||
act_memory += self._get_meta_node_size(node) * chunk_ratio / (1024**2)
|
||||
act_memory_peak_log.append(act_memory)
|
||||
# skip output
|
||||
elif node.op == "output":
|
||||
@@ -241,83 +209,32 @@ class EstimateMemory(object):
|
||||
# no change for non compute node
|
||||
elif is_non_memory_node(node):
|
||||
act_memory_peak_log.append(act_memory)
|
||||
# node is a compute op
|
||||
# calculate tmp, output node and delete node memory
|
||||
# node is a compute op, calculate tmp
|
||||
else:
|
||||
# forward memory
|
||||
# TODO: contiguous_memory still not accurate for matmul, view, reshape and transpose
|
||||
act_memory += (self._get_contiguous_memory(node, not_contiguous_list) * chunk_ratio / (1024**2))
|
||||
act_memory += (self._get_output_node_size(node) * chunk_ratio / (1024**2))
|
||||
tmp_memory = self._get_tmp_memory(node, not_contiguous_list, delete=True) * chunk_ratio
|
||||
# record max act memory
|
||||
act_memory_peak_log.append(act_memory)
|
||||
# delete useless memory
|
||||
act_memory -= (self._get_contiguous_memory(node, not_contiguous_list, delete=True) * chunk_ratio /
|
||||
(1024**2))
|
||||
# delete unused vars not in chunk_input_list
|
||||
# we can't delete input nodes until chunk ends
|
||||
if chunk_within:
|
||||
act_memory -= self._get_chunk_delete_node_size(
|
||||
node,
|
||||
user_to_last_uses_no_free_var,
|
||||
chunk_ratio,
|
||||
chunk_inputs_names,
|
||||
) / (1024**2)
|
||||
else:
|
||||
act_memory -= self._get_delete_node_size(node, user_to_last_uses_no_free_var,
|
||||
chunk_inputs_names) / (1024**2)
|
||||
act_memory_peak_log.append(act_memory + tmp_memory)
|
||||
|
||||
# log active node, only effective without chunk
|
||||
self._add_active_node(node, active_node_list)
|
||||
self._remove_deactive_node(node, user_to_last_uses, active_node_list)
|
||||
# remove_deactive_node
|
||||
self._remove_deactive_node(idx, node, active_nodes, delete_node_dict, kept_nodes=chunk_inputs_all)
|
||||
|
||||
# if node in chunk end nodes, restore chunk settings
|
||||
if use_chunk and idx in chunk_ends:
|
||||
act_memory -= (self._get_output_node_size(node) * chunk_ratio / (1024**2))
|
||||
act_memory -= self._get_chunk_inputs_size(
|
||||
chunk_inputs[chunk_region_idx],
|
||||
chunk_inputs_non_chunk[chunk_region_idx],
|
||||
node_list,
|
||||
chunk_regions[chunk_region_idx][1],
|
||||
) / (1024**2)
|
||||
self._remove_deactive_node(idx, node, active_nodes, delete_node_dict) # dont provide kept nodes now
|
||||
chunk_within = False
|
||||
chunk_ratio = 1
|
||||
chunk_region_idx = None
|
||||
|
||||
act_memory = self._get_memory_from_active_nodes(active_nodes)
|
||||
act_memory_after_node_log.append(act_memory)
|
||||
active_node_list_log.append(copy.deepcopy(active_node_list))
|
||||
active_nodes_log.append(active_nodes.copy())
|
||||
|
||||
if print_mem:
|
||||
print("with chunk" if use_chunk else "without chunk")
|
||||
# self._print_mem_log(act_memory_peak_log, node_list, "peak")
|
||||
# self._print_mem_log(act_memory_after_node_log, node_list, "after")
|
||||
self._print_compute_op_mem_log(act_memory_peak_log, node_list, "peak")
|
||||
# self._print_compute_op_mem_log(
|
||||
# act_memory_after_node_log, node_list, "after"
|
||||
# )
|
||||
self._print_compute_op_mem_log(act_memory_peak_log, node_mgr.get_node_list(), "peak")
|
||||
|
||||
# param_memory = parameter_size(gm)
|
||||
# all_memory = act_memory + param_memory
|
||||
return act_memory_peak_log, act_memory_after_node_log, active_node_list_log
|
||||
|
||||
def get_active_nodes(self, node_list: List) -> List:
|
||||
"""
|
||||
Get active nodes for every node
|
||||
|
||||
Args:
|
||||
node_list (List): _description_
|
||||
|
||||
Returns:
|
||||
active_node_list_log (List): active nodes of every node. active nodes refer to
|
||||
nodes generated but not deleted.
|
||||
"""
|
||||
active_node_list = []
|
||||
active_node_list_log = []
|
||||
user_to_last_uses = self._get_last_usr(node_list)
|
||||
user_to_last_uses_no_free_var = self._get_last_usr(node_list)
|
||||
delete_free_var_from_last_use(user_to_last_uses_no_free_var)
|
||||
for _, node in enumerate(node_list):
|
||||
# log active node, only effective without chunk
|
||||
self._add_active_node(node, active_node_list)
|
||||
self._remove_deactive_node(node, user_to_last_uses, active_node_list)
|
||||
active_node_list_log.append(copy.deepcopy(active_node_list))
|
||||
return active_node_list_log
|
||||
return act_memory_peak_log, act_memory_after_node_log, active_nodes_log
|
||||
|
Reference in New Issue
Block a user