From 6be9862aafb4230ead0bcb95c65b5568043e2c13 Mon Sep 17 00:00:00 2001 From: wangbluo <2538539015@qq.com> Date: Tue, 15 Oct 2024 11:56:49 +0800 Subject: [PATCH] fix --- .../booster/plugin/hybrid_parallel_plugin.py | 1 + colossalai/shardformer/layer/attn.py | 14 +++++++++----- colossalai/shardformer/modeling/gpt2.py | 1 + colossalai/shardformer/modeling/llama.py | 1 + colossalai/shardformer/shard/shard_config.py | 1 + .../test_shardformer/test_layer/test_ring_attn.py | 11 ++++++++++- 6 files changed, 23 insertions(+), 6 deletions(-) diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index d15a9f397..de0cd242f 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -1181,6 +1181,7 @@ class HybridParallelPlugin(PipelinePluginBase): fp8_communication=fp8_communication, inner_ring_size=inner_ring_size, pg_mesh=self.pg_mesh, + sp_axis=self.sp_axis, ) self.amp_config = dict( diff --git a/colossalai/shardformer/layer/attn.py b/colossalai/shardformer/layer/attn.py index 6a972f075..1d8a89ce0 100644 --- a/colossalai/shardformer/layer/attn.py +++ b/colossalai/shardformer/layer/attn.py @@ -431,7 +431,7 @@ class RingAttention(torch.autograd.Function): INTER_RING_GROUP_COPY: dist.ProcessGroup = None @staticmethod - def get_double_ring_groups(sp_group, pg_mesh, inner_ring_size=None): + def get_double_ring_groups(sp_axis, pg_mesh, inner_ring_size=None): """ Get 2D ring groups for the given process group. Generally, to avoid congestion, the inner ring size shouldn't be larger than the number of NICs on each node. @@ -441,6 +441,9 @@ class RingAttention(torch.autograd.Function): Returns: Tuple[dist.ProcessGroup, dist.ProcessGroup]: Inner-ring process group and inter-ring process group. """ + assert pg_mesh is not None, f"Error: The pg mesh is None! please check the process group initialization." + + sp_group = pg_mesh.get_group_along_axis(sp_axis) sp_size = dist.get_world_size(sp_group) sp_rank = dist.get_rank(sp_group) @@ -496,6 +499,7 @@ class RingAttention(torch.autograd.Function): return_softmax=False, inner_ring_size=None, pg_mesh=None, + sp_axis=None, **kwargs, ): """ @@ -506,7 +510,7 @@ class RingAttention(torch.autograd.Function): q (torch.Tensor): Query tensor. Shape should be [B, nHeads, Sq, D] k (torch.Tensor): Key tensor. Shape should be [B, nHeads, Sq, Sq, D] v (torch.Tensor): Value tensor. Shape should be [B, nHeads, Sq, Sq, D] - sp_group (Optional[dist.ProcessGroup]): Process group for sequence parallelism + sp_axis (Optional[int]): Sp axis for the global pg mesh. sp_tream (torch.cuda.Stream): An different stream for output correction. cu_seqlens (Optional[torch.Tensor], optional): The cumulative sequence lengths of the sequences in the batch, used to index into q. @@ -539,13 +543,13 @@ class RingAttention(torch.autograd.Function): attention_mask_type in RingAttention.SUPPORTED_MASK_TYPES ), f"Mask type {attention_mask_type} is not supported yet." + assert pg_mesh is not None, f"Error: The pg mesh is None! please check the process group initialization." + clone_pg = lambda pg: dist.new_group(dist.get_process_group_ranks(pg)) if inner_ring_size != None: RingAttention.SP_GROUP = sp_group - inner_ring_group, inter_ring_group = RingAttention.get_double_ring_groups( - sp_group, pg_mesh, inner_ring_size - ) + inner_ring_group, inter_ring_group = RingAttention.get_double_ring_groups(sp_axis, pg_mesh, inner_ring_size) RingAttention.INNER_RING_GROUP = inner_ring_group RingAttention.INTER_RING_GROUP = inter_ring_group else: diff --git a/colossalai/shardformer/modeling/gpt2.py b/colossalai/shardformer/modeling/gpt2.py index d75e9b14a..1eaed167c 100644 --- a/colossalai/shardformer/modeling/gpt2.py +++ b/colossalai/shardformer/modeling/gpt2.py @@ -869,6 +869,7 @@ def get_gpt2_flash_attention_forward(shard_config: Optional[ShardConfig] = None) scale=scale, inner_ring_size=shard_config.inner_ring_size, pg_mesh=shard_config.pg_mesh, + sp_axis=shard_config.sp_axis, ) else: attn_output = ColoAttention.attention(query, key, value, **attention_mask, dropout_p=dropout_p, scale=scale) diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index ff7390643..ca0751f04 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -572,6 +572,7 @@ def get_llama_flash_attention_forward(shard_config: ShardConfig, sp_mode=None, s **attention_mask, inner_ring_size=shard_config.inner_ring_size, pg_mesh=shard_config.pg_mesh, + sp_axis=shard_config.sp_axis, ) elif shard_config.enable_flash_attention: diff --git a/colossalai/shardformer/shard/shard_config.py b/colossalai/shardformer/shard/shard_config.py index 14963f7a5..7b31e6928 100644 --- a/colossalai/shardformer/shard/shard_config.py +++ b/colossalai/shardformer/shard/shard_config.py @@ -51,6 +51,7 @@ class ShardConfig: extra_kwargs: Dict[str, Any] = field(default_factory=dict) # For ring attention + sp_axis: Optional[int] = None pg_mesh: Optional[int] = None inner_ring_size: Optional[int] = None # for moe related diff --git a/tests/test_shardformer/test_layer/test_ring_attn.py b/tests/test_shardformer/test_layer/test_ring_attn.py index 48f25dea3..23e8e5b78 100644 --- a/tests/test_shardformer/test_layer/test_ring_attn.py +++ b/tests/test_shardformer/test_layer/test_ring_attn.py @@ -24,6 +24,7 @@ def check_ring_attn(seq_len, bs, nheads, d, dtype, inner_ring_size): sp_group = dist.group.WORLD dp_size, pp_size, tp_size = 1, 1, 1 sp_size = dist.get_world_size() + sp_axis = 2 pg_mesh = ProcessGroupMesh(dp_size, pp_size, sp_size, tp_size) # Some outliers may seem large, but our errors are still lower than # than Megatron-LM context parallel's @@ -40,7 +41,15 @@ def check_ring_attn(seq_len, bs, nheads, d, dtype, inner_ring_size): # Ring attention vs single GPU ring_out, ring_lse = RingAttention.attention( - q, k, v, sp_group, AttnMaskType.CAUSAL, return_softmax=True, inner_ring_size=inner_ring_size, pg_mesh=pg_mesh + q, + k, + v, + sp_group, + AttnMaskType.CAUSAL, + return_softmax=True, + inner_ring_size=inner_ring_size, + pg_mesh=pg_mesh, + sp_axis=sp_axis, ) ring_out = ring_out.transpose(1, 2) out, lse, _ = flash_attn_qkvpacked_func(