Mirror of https://github.com/hpcaitech/ColossalAI.git (synced 2025-06-28 16:28:10 +00:00)
[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
This commit is contained in:
parent 7b921acc8a
commit 0d008110e7
@@ -117,20 +117,28 @@ class BaseConsumer:
                     # receive data from producers
                     for r in range(self.num_producers):
                         print(f"[T{dist.get_rank()}] Recv data episode {episode} step {step} from {r}")
-                        raw_batch = ray_broadcast_tensor_dict(None, src=0, device=self.device, group_name=f"sync_data_{r}")
+                        raw_batch = ray_broadcast_tensor_dict(
+                            None, src=0, device=self.device, group_name=f"sync_data_{r}"
+                        )
                         # calculate group reward et al. filtering. As only the filtered group will be used for training (which is incomplete),
                         # we need to calculate the metrics before filtering here for logging
                         # [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
-                        raw_batch_with_reward = self.calculate_reward({k:v.view(-1, v.size(-1)) if k!='temperature' else v for k, v in raw_batch.items()})
-                        raw_batch_with_reward = {k: v.view(-1, self.num_generations, v.size(-1)) if k!='temperature' else v for k, v in raw_batch_with_reward.items()}
+                        raw_batch_with_reward = self.calculate_reward(
+                            {k: v.view(-1, v.size(-1)) if k != "temperature" else v for k, v in raw_batch.items()}
+                        )
+                        raw_batch_with_reward = {
+                            k: v.view(-1, self.num_generations, v.size(-1)) if k != "temperature" else v
+                            for k, v in raw_batch_with_reward.items()
+                        }
                         # [batch_size, num_generations] -> [batch_size]
                         reward = raw_batch_with_reward["reward"][:, :, 0]
                         format_acc = raw_batch_with_reward["format_acc"][:, :, 0]
                         ans_acc = raw_batch_with_reward["ans_acc"][:, :, 0]
                         response_len = (
-                            (raw_batch_with_reward["response_idx"][:, :, 1] - raw_batch_with_reward["response_idx"][:, :, 0] + 1)
-                            .type(torch.float32)
-                        )
+                            raw_batch_with_reward["response_idx"][:, :, 1]
+                            - raw_batch_with_reward["response_idx"][:, :, 0]
+                            + 1
+                        ).type(torch.float32)
                         effective_group_mask = None
                         if self.filter_range is not None and self.grpo_config.get("dynamic_batching", True):
                             # filter the group based on the reward and accuracy
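The reformatted block above flattens the per-prompt groups for reward computation ([batch_size, num_generations, ...] -> [batch_size * num_generations, ...]) and then restores the grouped shape so per-group metrics can be read off. A minimal standalone sketch of that reshaping and of the response-length computation; it assumes PyTorch, and all names, shapes, and values below are made up for illustration rather than taken from the commit:

import torch

batch_size, num_generations = 2, 4

# Suppose the reward function returns one scalar per generated sample, with a trailing dim of 1.
flat_reward = torch.rand(batch_size * num_generations, 1)

# Regroup generations by prompt: [batch_size * num_generations, 1] -> [batch_size, num_generations, 1]
grouped_reward = flat_reward.view(-1, num_generations, flat_reward.size(-1))

# Drop the trailing dim to get one value per generation: [batch_size, num_generations]
reward = grouped_reward[:, :, 0]

# response_idx holds (start, end) token positions, so the response length is end - start + 1.
response_idx = torch.tensor([[[3, 10], [3, 7], [3, 12], [3, 9]],
                             [[5, 20], [5, 15], [5, 18], [5, 30]]])
response_len = (response_idx[:, :, 1] - response_idx[:, :, 0] + 1).type(torch.float32)
print(reward.shape, response_len.shape)  # torch.Size([2, 4]) torch.Size([2, 4])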
@@ -142,7 +150,11 @@ class BaseConsumer:
                             for group_idx, group_with_reward in enumerate(raw_batch_with_reward):
                                 self.buffer.append(
                                     [
-                                        group_with_reward if effective_group_mask is None or effective_group_mask[group_idx] else None,
+                                        (
+                                            group_with_reward
+                                            if effective_group_mask is None or effective_group_mask[group_idx]
+                                            else None
+                                        ),
                                         reward[group_idx],
                                         format_acc[group_idx],
                                         ans_acc[group_idx],
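For context, the parenthesized conditional above keeps filtered-out groups in the buffer as None placeholders rather than dropping them. A small sketch of that behaviour, assuming a boolean effective_group_mask; the stand-in values are illustrative, not from the commit:

import torch

groups = ["group_0", "group_1", "group_2"]        # stand-ins for the per-prompt sample dicts
reward = torch.tensor([0.9, 0.1, 0.5])
effective_group_mask = torch.tensor([True, False, True])

buffer = []
for group_idx, group_with_reward in enumerate(groups):
    buffer.append(
        [
            # keep the group only if it passed the filter; otherwise store a None placeholder
            group_with_reward if effective_group_mask is None or effective_group_mask[group_idx] else None,
            reward[group_idx],
        ]
    )
print([entry[0] for entry in buffer])  # ['group_0', None, 'group_2']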
@@ -160,7 +172,9 @@ class BaseConsumer:
                                 effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
                                     buffer_idx
                                 )
-                    print(f"[T{dist.get_rank()}] Collect Effective Prompt: {len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}")
+                    print(
+                        f"[T{dist.get_rank()}] Collect Effective Prompt: {len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"
+                    )
 
                     while len(effective_group_to_raw_group_mapping) >= self.dp_size * self.minibatch_size:
                         # on each dp_rank, we use minibatch_size effective samples to form a batch
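The wrapped print above reports how many effective (unfiltered) prompts have been collected against the dp_size * minibatch_size threshold that triggers a training step. A rough sketch of that bookkeeping, with made-up buffer contents and config values (not from the commit):

buffer = ["g0", None, "g2", "g3", None, "g5", "g6", "g7"]   # None = filtered-out group
dp_size, minibatch_size = 2, 3

# Map each effective (non-None) entry back to its raw buffer index.
effective_group_to_raw_group_mapping = {}
for buffer_idx, entry in enumerate(buffer):
    if entry is not None:
        effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = buffer_idx

print(effective_group_to_raw_group_mapping)  # {0: 0, 1: 2, 2: 3, 3: 5, 4: 6, 5: 7}
if len(effective_group_to_raw_group_mapping) >= dp_size * minibatch_size:
    print("enough effective prompts collected, form a minibatch")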
@@ -214,7 +214,9 @@ class GRPOConsumer(BaseConsumer):
         if self.filter_range is not None and self.grpo_config.get("dynamic_batching", False) == False:
             # filter out samples with reward outside the range
             # if dynamic batching is enabled, we filter out out of range groups before training
-            group_ans_acc_mean = ans_acc.view(-1, self.num_generations).mean(dim=1).repeat_interleave(self.num_generations, dim=-1)
+            group_ans_acc_mean = (
+                ans_acc.view(-1, self.num_generations).mean(dim=1).repeat_interleave(self.num_generations, dim=-1)
+            )
             loss_mask = torch.logical_and(
                 loss_mask,
                 torch.logical_and(
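The new parenthesized assignment averages answer accuracy per prompt group and broadcasts it back to one value per sample via repeat_interleave, so whole groups can be masked out of the loss. A sketch with illustrative numbers; the exact filter_range comparison below is an assumption for illustration, not copied from the commit:

import torch

num_generations = 4
ans_acc = torch.tensor([1.0, 1.0, 1.0, 1.0,    # group 0: all correct  -> mean 1.0
                        0.0, 1.0, 0.0, 1.0])   # group 1: half correct -> mean 0.5
filter_range = [0.01, 0.99]

group_ans_acc_mean = ans_acc.view(-1, num_generations).mean(dim=1).repeat_interleave(num_generations, dim=-1)
print(group_ans_acc_mean)  # tensor([1.0, 1.0, 1.0, 1.0, 0.5, 0.5, 0.5, 0.5])

loss_mask = torch.ones_like(ans_acc, dtype=torch.bool)
loss_mask = torch.logical_and(
    loss_mask,
    torch.logical_and(group_ans_acc_mean > filter_range[0], group_ans_acc_mean < filter_range[1]),
)
print(loss_mask)  # group 0 is masked out (trivially solved), group 1 is kept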
@@ -454,7 +456,9 @@ class GRPOConsumer(BaseConsumer):
                 raw_batch_ans_acc_mean = torch.cat(self.raw_train_batch_ans_acc, dim=0).mean().cpu().item()
                 raw_batch_response_len = torch.cat(self.raw_train_batch_response_len, dim=0)
                 raw_batch_response_len_mean = raw_batch_response_len.mean().cpu().item()
-                overlength_samples_ratio = (raw_batch_response_len >= action_mask.size(-1)).to(float).mean().cpu().item()  # not an exact figure, but a close estimate
+                overlength_samples_ratio = (
+                    (raw_batch_response_len >= action_mask.size(-1)).to(float).mean().cpu().item()
+                )  # not an exact figure, but a close estimate
                 self.raw_train_batch_reward = []
                 self.raw_train_batch_format_acc = []
                 self.raw_train_batch_ans_acc = []
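The wrapped expression above estimates the fraction of overlength responses by counting responses that fill the whole action window (action_mask.size(-1) tokens), which is why the comment calls it an estimate rather than an exact figure. A toy sketch of the same computation, with a made-up max_response_len standing in for action_mask.size(-1):

import torch

max_response_len = 6                                   # stand-in for action_mask.size(-1)
raw_batch_response_len = torch.tensor([3.0, 6.0, 6.0, 5.0])

# Responses that reach the window boundary are counted as (probably truncated) overlength samples.
overlength_samples_ratio = (raw_batch_response_len >= max_response_len).to(float).mean().cpu().item()
print(overlength_samples_ratio)  # 0.5 -- not an exact figure, but a close estimate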