[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
pre-commit-ci[bot] authored 2025-05-29 10:16:55 +00:00, committed by YeAnbang
parent a246bf25c3
commit 8d52441f6d
2 changed files with 51 additions and 39 deletions

View File

@@ -117,36 +117,47 @@ class BaseConsumer:
 # receive data from producers
 for r in range(self.num_producers):
     print(f"[T{dist.get_rank()}] Recv data episode {episode} step {step} from {r}")
-    raw_batch = ray_broadcast_tensor_dict(None, src=0, device=self.device, group_name=f"sync_data_{r}")
+    raw_batch = ray_broadcast_tensor_dict(
+        None, src=0, device=self.device, group_name=f"sync_data_{r}"
+    )
     # calculate group reward et al. filtering. As only the filtered group will be used for training (which is incomplete),
     # we need to calculate the metrics before filtering here for logging
     # [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
-    raw_batch_with_reward = self.calculate_reward({k:v.view(-1, v.size(-1)) if k!='temperature' else v for k, v in raw_batch.items()})
-    raw_batch_with_reward = {k: v.view(-1, self.num_generations, v.size(-1)) if k!='temperature' else v for k, v in raw_batch_with_reward.items()}
-    # [batch_size, num_generations] -> [batch_size]
-    group_reward_mean = raw_batch_with_reward["reward"][:,:,0].mean(dim=-1)
-    group_format_acc_mean = raw_batch_with_reward["format_acc"][:,:,0].mean(dim=-1)
-    group_ans_acc_mean = raw_batch_with_reward["ans_acc"][:,:,0].mean(dim=-1)
-    group_response_len = (
-        (raw_batch_with_reward["response_idx"][:, :, 1] - raw_batch_with_reward["response_idx"][:, :, 0] + 1)
-        .type(torch.float32)
-        .mean(dim=-1)
-    )
+    raw_batch_with_reward = self.calculate_reward(
+        {k: v.view(-1, v.size(-1)) if k != "temperature" else v for k, v in raw_batch.items()}
+    )
+    raw_batch_with_reward = {
+        k: v.view(-1, self.num_generations, v.size(-1)) if k != "temperature" else v
+        for k, v in raw_batch_with_reward.items()
+    }
+    # [batch_size, num_generations] -> [batch_size]
+    reward = raw_batch_with_reward["reward"][:, :, 0]
+    format_acc = raw_batch_with_reward["format_acc"][:, :, 0]
+    ans_acc = raw_batch_with_reward["ans_acc"][:, :, 0]
+    response_len = (
+        raw_batch_with_reward["response_idx"][:, :, 1]
+        - raw_batch_with_reward["response_idx"][:, :, 0]
+        + 1
+    ).type(torch.float32)
     effective_group_mask = None
     if self.filter_range is not None and self.grpo_config.get("dynamic_batching", True):
         # filter the group based on the reward and accuracy
         effective_group_mask = torch.logical_and(
             group_ans_acc_mean > self.filter_range[0], group_ans_acc_mean < self.filter_range[1]
         )
     raw_batch_with_reward = unbind_batch(raw_batch_with_reward)  # List[Dict[str, torch.Tensor]]
     for group_idx, group_with_reward in enumerate(raw_batch_with_reward):
         self.buffer.append(
             [
-                group_with_reward if effective_group_mask is None or effective_group_mask[group_idx] else None,
-                group_reward_mean[group_idx],
-                group_format_acc_mean[group_idx],
-                group_ans_acc_mean[group_idx],
-                group_response_len[group_idx],
+                (
+                    group_with_reward
+                    if effective_group_mask is None or effective_group_mask[group_idx]
+                    else None
+                ),
+                reward[group_idx],
+                format_acc[group_idx],
+                ans_acc[group_idx],
+                response_len[group_idx],
             ]
         )
     if effective_group_mask is not None:
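
Note (not part of the commit): the hunk above receives a flat [batch_size * num_generations, ...] batch, regroups it per prompt, and, when dynamic batching is enabled, keeps only groups whose mean answer accuracy lies strictly inside filter_range. A minimal standalone sketch of that grouping and masking, with made-up shapes and a hypothetical (0.1, 0.9) range:

    import torch

    batch_size, num_generations = 4, 8
    filter_range = (0.1, 0.9)  # hypothetical bounds

    # per-sample answer accuracy, flattened to [batch_size * num_generations, 1]
    ans_acc = torch.randint(0, 2, (batch_size * num_generations, 1)).float()

    # regroup to [batch_size, num_generations, 1] and average within each group
    group_ans_acc_mean = ans_acc.view(-1, num_generations, 1)[:, :, 0].mean(dim=-1)  # [batch_size]

    # keep only groups with mixed outcomes: all-wrong or all-correct groups
    # provide no useful advantage signal
    effective_group_mask = torch.logical_and(
        group_ans_acc_mean > filter_range[0], group_ans_acc_mean < filter_range[1]
    )
    print(group_ans_acc_mean, effective_group_mask)
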
@@ -160,7 +171,9 @@ class BaseConsumer:
 effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
     buffer_idx
 )
-pbar.set_postfix({"Collect Effective Prompt": f"{len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"})
+print(
+    f"[T{dist.get_rank()}] Collect Effective Prompt: {len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"
+)
 while len(effective_group_to_raw_group_mapping) >= self.dp_size * self.minibatch_size:
     # on each dp_rank, we use minibatch_size effective samples to form a batch
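
Note (not part of the commit): the pbar.set_postfix call is replaced by a plain print, but the surrounding bookkeeping is unchanged: non-None buffer entries are mapped to their raw indices until every DP rank can be given minibatch_size effective prompts. A rough standalone sketch of that logic, with hypothetical dp_size and minibatch_size values:

    # buffer entries are groups that survived filtering; None marks a filtered-out group
    buffer = ["group0", None, "group2", "group3", "group4", "group5", "group6", None, "group8", "group9"]
    dp_size, minibatch_size = 2, 4  # hypothetical values

    effective_group_to_raw_group_mapping = {}
    for buffer_idx, entry in enumerate(buffer):
        if entry is not None:
            effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = buffer_idx

    print(f"Collect Effective Prompt: {len(effective_group_to_raw_group_mapping)}/{dp_size * minibatch_size}")
    if len(effective_group_to_raw_group_mapping) >= dp_size * minibatch_size:
        print("enough effective prompts collected; form a training batch")
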

View File

@@ -84,12 +84,9 @@ class GRPOConsumer(BaseConsumer):
 self.project_name = project_name
 self.effective_sample_count = 0
 self.effective_prompt_count = 0
-<<<<<<< HEAD
-=======
 self.total_sample_count = 0
 self.overlength_samples = 0
 self.total_overlength_samples = 0
->>>>>>> c8b368c2 (add overlength sample count (#6332))
 self.project_name = project_name
 self.run_name = run_name
 self.wandb_group_name = wandb_group_name
@@ -218,10 +215,18 @@ class GRPOConsumer(BaseConsumer):
     loss_mask,
     action_mask[:, -1] == False,
 )
 if self.filter_range is not None and self.grpo_config.get("dynamic_batching", False) == False:
-    self.overlength_samples = (old_loss_mask & ~loss_mask).sum().item()
-    self.overlength_samples = all_reduce_sum(
-        torch.tensor(self.overlength_samples, device=loss_mask.device), self.plugin
-    )
+    # filter out samples with reward outside the range
+    # if dynamic batching is enabled, we filter out out of range groups before training
+    group_ans_acc_mean = (
+        ans_acc.view(-1, self.num_generations).mean(dim=1).repeat_interleave(self.num_generations, dim=-1)
+    )
+    loss_mask = torch.logical_and(
+        loss_mask,
+        torch.logical_and(
+            group_ans_acc_mean > self.filter_range[0],
+            group_ans_acc_mean < self.filter_range[1],
+        ),
+    )
     self.total_overlength_samples += self.overlength_samples.item()
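
Note (not part of the commit): when dynamic batching is off, the same accuracy-range filter is applied at training time instead: per-group mean accuracy is broadcast back to per-sample granularity with repeat_interleave, and out-of-range groups are zeroed out of the loss mask. A small standalone sketch, with invented tensors and an assumed (0.1, 0.9) range:

    import torch

    num_generations = 4
    filter_range = (0.1, 0.9)  # assumed bounds

    ans_acc = torch.tensor([1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0])  # 2 groups of 4 samples
    loss_mask = torch.ones_like(ans_acc, dtype=torch.bool)

    # [num_samples] -> per-group mean -> repeated back to [num_samples]
    group_ans_acc_mean = (
        ans_acc.view(-1, num_generations).mean(dim=1).repeat_interleave(num_generations, dim=-1)
    )
    loss_mask = torch.logical_and(
        loss_mask,
        torch.logical_and(
            group_ans_acc_mean > filter_range[0],
            group_ans_acc_mean < filter_range[1],
        ),
    )
    print(loss_mask)  # the all-correct first group is masked out; the mixed second group is kept
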
@@ -448,18 +453,12 @@ class GRPOConsumer(BaseConsumer):
 self.optimizer.step()
 self.optimizer.zero_grad()
 self.global_step += 1
-<<<<<<< HEAD
-sample_utilization = self.effective_sample_count / len(self.raw_train_batch_reward) / self.num_generations
-self.effective_prompt_count = 0
-self.effective_sample_count = 0
-=======
 sample_utilization = self.effective_sample_count / self.total_sample_count
 overlength_samples_percentage = self.total_overlength_samples / self.total_sample_count
 self.effective_prompt_count = 0
 self.effective_sample_count = 0
 self.total_sample_count = 0
 self.total_overlength_samples = 0
->>>>>>> c8b368c2 (add overlength sample count (#6332))
 loss_scalar = self.accum_loss.item()
 if not self.plugin.pp_size > 1 or (
     self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage() and self.tp_rank == 0
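
Note (not part of the commit): after resolving the conflict, sample utilization is the fraction of received samples that actually enter training, and the overlength percentage is tracked against the same total; both counters are reset after every optimizer step. A toy illustration with invented numbers:

    effective_sample_count = 96      # samples that survived filtering
    total_sample_count = 128         # all samples received since the last optimizer step
    total_overlength_samples = 8     # responses cut off at the generation limit

    sample_utilization = effective_sample_count / total_sample_count               # 0.75
    overlength_samples_percentage = total_overlength_samples / total_sample_count  # 0.0625

    # counters are zeroed so the next optimizer step starts a fresh window
    effective_sample_count = total_sample_count = total_overlength_samples = 0
    print(sample_utilization, overlength_samples_percentage)
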
@@ -467,14 +466,14 @@ class GRPOConsumer(BaseConsumer):
 if (not self.plugin.pp_size > 1 and self.rank == 0) or (
     self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage() and self.tp_rank == 0
 ):
-    raw_batch_reward_mean = sum(self.raw_train_batch_reward) / len(self.raw_train_batch_reward)
-    raw_batch_format_acc_mean = sum(self.raw_train_batch_format_acc) / len(
-        self.raw_train_batch_format_acc
-    )
-    raw_batch_ans_acc_mean = sum(self.raw_train_batch_ans_acc) / len(self.raw_train_batch_ans_acc)
-    raw_batch_response_len_mean = sum(self.raw_train_batch_response_len) / len(
-        self.raw_train_batch_response_len
-    )
+    raw_batch_reward_mean = torch.cat(self.raw_train_batch_reward, dim=0).mean().cpu().item()
+    raw_batch_format_acc_mean = torch.cat(self.raw_train_batch_format_acc, dim=0).mean().cpu().item()
+    raw_batch_ans_acc_mean = torch.cat(self.raw_train_batch_ans_acc, dim=0).mean().cpu().item()
+    raw_batch_response_len = torch.cat(self.raw_train_batch_response_len, dim=0)
+    raw_batch_response_len_mean = raw_batch_response_len.mean().cpu().item()
+    overlength_samples_ratio = (
+        (raw_batch_response_len >= action_mask.size(-1)).to(float).mean().cpu().item()
+    )  # not an exact figure, but a close estimate
     self.raw_train_batch_reward = []
     self.raw_train_batch_format_acc = []
     self.raw_train_batch_ans_acc = []
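
Note (not part of the commit): the new logging concatenates the raw per-sample tensors and takes one mean, which weights every sample equally even when the accumulated minibatches differ in size, whereas averaging a list of per-batch means would not; the overlength ratio is then estimated as the share of responses whose length reached the cap (action_mask.size(-1)), since those were almost certainly truncated. A standalone sketch of both points, with invented tensors and a hypothetical max_len:

    import torch

    # two accumulated minibatches of different sizes (invented values)
    raw_train_batch_reward = [torch.tensor([1.0, 0.0]), torch.tensor([1.0, 1.0, 1.0, 1.0])]

    # per-sample mean, as in the new code: 5/6 ~= 0.833
    reward_mean = torch.cat(raw_train_batch_reward, dim=0).mean().item()

    # mean of per-batch means would give (0.5 + 1.0) / 2 = 0.75 instead
    mean_of_means = sum(t.mean().item() for t in raw_train_batch_reward) / len(raw_train_batch_reward)

    # overlength estimate: responses that filled the whole budget were likely truncated
    max_len = 4  # hypothetical stand-in for action_mask.size(-1)
    response_len = torch.tensor([2.0, 4.0, 3.0, 4.0])
    overlength_samples_ratio = (response_len >= max_len).float().mean().item()  # 0.5

    print(reward_mean, mean_of_means, overlength_samples_ratio)
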