From 8d52441f6da3eca3230b28567924b8b9bd595a3d Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Thu, 29 May 2025 10:16:55 +0000
Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 .../coati/distributed/consumer.py      | 49 ++++++++++++-------
 .../coati/distributed/grpo_consumer.py | 41 ++++++++--------
 2 files changed, 51 insertions(+), 39 deletions(-)

diff --git a/applications/ColossalChat/coati/distributed/consumer.py b/applications/ColossalChat/coati/distributed/consumer.py
index 593e0f4ec..72f3ecf67 100644
--- a/applications/ColossalChat/coati/distributed/consumer.py
+++ b/applications/ColossalChat/coati/distributed/consumer.py
@@ -117,36 +117,47 @@ class BaseConsumer:
                         # receive data from producers
                         for r in range(self.num_producers):
                             print(f"[T{dist.get_rank()}] Recv data episode {episode} step {step} from {r}")
-                            raw_batch = ray_broadcast_tensor_dict(None, src=0, device=self.device, group_name=f"sync_data_{r}")
+                            raw_batch = ray_broadcast_tensor_dict(
+                                None, src=0, device=self.device, group_name=f"sync_data_{r}"
+                            )
                             # calculate group reward et al. filtering. As only the filtered group will be used for training (which is incomplete),
                             # we need to calculate the metrics before filtering here for logging
                             # [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
-                            raw_batch_with_reward = self.calculate_reward({k:v.view(-1, v.size(-1)) if k!='temperature' else v for k, v in raw_batch.items()})
-                            raw_batch_with_reward = {k: v.view(-1, self.num_generations, v.size(-1)) if k!='temperature' else v for k, v in raw_batch_with_reward.items()}
-                            # [batch_size, num_generations] -> [batch_size]
-                            group_reward_mean = raw_batch_with_reward["reward"][:,:,0].mean(dim=-1)
-                            group_format_acc_mean = raw_batch_with_reward["format_acc"][:,:,0].mean(dim=-1)
-                            group_ans_acc_mean = raw_batch_with_reward["ans_acc"][:,:,0].mean(dim=-1)
-                            group_response_len = (
-                                (raw_batch_with_reward["response_idx"][:, :, 1] - raw_batch_with_reward["response_idx"][:, :, 0] + 1)
-                                .type(torch.float32)
-                                .mean(dim=-1)
+                            raw_batch_with_reward = self.calculate_reward(
+                                {k: v.view(-1, v.size(-1)) if k != "temperature" else v for k, v in raw_batch.items()}
                             )
+                            raw_batch_with_reward = {
+                                k: v.view(-1, self.num_generations, v.size(-1)) if k != "temperature" else v
+                                for k, v in raw_batch_with_reward.items()
+                            }
+                            # [batch_size, num_generations] -> [batch_size]
+                            reward = raw_batch_with_reward["reward"][:, :, 0]
+                            format_acc = raw_batch_with_reward["format_acc"][:, :, 0]
+                            ans_acc = raw_batch_with_reward["ans_acc"][:, :, 0]
+                            response_len = (
+                                raw_batch_with_reward["response_idx"][:, :, 1]
+                                - raw_batch_with_reward["response_idx"][:, :, 0]
+                                + 1
+                            ).type(torch.float32)
                             effective_group_mask = None
                             if self.filter_range is not None and self.grpo_config.get("dynamic_batching", True):
                                 # filter the group based on the reward and accuracy
                                 effective_group_mask = torch.logical_and(
                                     group_ans_acc_mean > self.filter_range[0], group_ans_acc_mean < self.filter_range[1]
                                 )
-                            raw_batch_with_reward = unbind_batch(raw_batch_with_reward) # List[Dict[str, torch.Tensor]]
+                            raw_batch_with_reward = unbind_batch(raw_batch_with_reward)  # List[Dict[str, torch.Tensor]]
                             for group_idx, group_with_reward in enumerate(raw_batch_with_reward):
                                 self.buffer.append(
                                     [
-                                        group_with_reward if effective_group_mask is None or effective_group_mask[group_idx] else None,
-                                        group_reward_mean[group_idx],
-                                        group_format_acc_mean[group_idx],
-                                        group_ans_acc_mean[group_idx],
-                                        group_response_len[group_idx],
+                                        (
+                                            group_with_reward
+                                            if effective_group_mask is None or effective_group_mask[group_idx]
+                                            else None
+                                        ),
+                                        reward[group_idx],
+                                        format_acc[group_idx],
+                                        ans_acc[group_idx],
+                                        response_len[group_idx],
                                     ]
                                 )
                             if effective_group_mask is not None:
@@ -160,7 +171,9 @@ class BaseConsumer:
                                 effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
                                     buffer_idx
                                 )
-                        pbar.set_postfix({"Collect Effective Prompt": f"{len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"})
+                        print(
+                            f"[T{dist.get_rank()}] Collect Effective Prompt: {len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"
+                        )
 
                     while len(effective_group_to_raw_group_mapping) >= self.dp_size * self.minibatch_size:
                         # on each dp_rank, we use minibatch_size effective samples to form a batch
diff --git a/applications/ColossalChat/coati/distributed/grpo_consumer.py b/applications/ColossalChat/coati/distributed/grpo_consumer.py
index 3458a216f..245d55af6 100644
--- a/applications/ColossalChat/coati/distributed/grpo_consumer.py
+++ b/applications/ColossalChat/coati/distributed/grpo_consumer.py
@@ -84,12 +84,9 @@ class GRPOConsumer(BaseConsumer):
         self.project_name = project_name
         self.effective_sample_count = 0
         self.effective_prompt_count = 0
-<<<<<<< HEAD
-=======
         self.total_sample_count = 0
         self.overlength_samples = 0
         self.total_overlength_samples = 0
->>>>>>> c8b368c2 (add overlength sample count (#6332))
         self.project_name = project_name
         self.run_name = run_name
         self.wandb_group_name = wandb_group_name
@@ -218,10 +215,18 @@ class GRPOConsumer(BaseConsumer):
                 loss_mask,
                 action_mask[:, -1] == False,
             )
-
-            self.overlength_samples = (old_loss_mask & ~loss_mask).sum().item()
-            self.overlength_samples = all_reduce_sum(
-                torch.tensor(self.overlength_samples, device=loss_mask.device), self.plugin
+            if self.filter_range is not None and self.grpo_config.get("dynamic_batching", False) == False:
+                # filter out samples with reward outside the range
+                # if dynamic batching is enabled, we filter out out of range groups before training
+                group_ans_acc_mean = (
+                    ans_acc.view(-1, self.num_generations).mean(dim=1).repeat_interleave(self.num_generations, dim=-1)
+                )
+                loss_mask = torch.logical_and(
+                    loss_mask,
+                    torch.logical_and(
+                        group_ans_acc_mean > self.filter_range[0],
+                        group_ans_acc_mean < self.filter_range[1],
+                    ),
             )
 
             self.total_overlength_samples += self.overlength_samples.item()
@@ -448,18 +453,12 @@ class GRPOConsumer(BaseConsumer):
             self.optimizer.step()
             self.optimizer.zero_grad()
             self.global_step += 1
-<<<<<<< HEAD
-            sample_utilization = self.effective_sample_count / len(self.raw_train_batch_reward) / self.num_generations
-            self.effective_prompt_count = 0
-            self.effective_sample_count = 0
-=======
             sample_utilization = self.effective_sample_count / self.total_sample_count
             overlength_samples_percentage = self.total_overlength_samples / self.total_sample_count
             self.effective_prompt_count = 0
             self.effective_sample_count = 0
             self.total_sample_count = 0
             self.total_overlength_samples = 0
->>>>>>> c8b368c2 (add overlength sample count (#6332))
             loss_scalar = self.accum_loss.item()
             if not self.plugin.pp_size > 1 or (
                 self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage() and self.tp_rank == 0
@@ -467,14 +466,14 @@ class GRPOConsumer(BaseConsumer):
                 if (not self.plugin.pp_size > 1 and self.rank == 0) or (
                     self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage() and self.tp_rank == 0
                 ):
-                    raw_batch_reward_mean = sum(self.raw_train_batch_reward) / len(self.raw_train_batch_reward)
-                    raw_batch_format_acc_mean = sum(self.raw_train_batch_format_acc) / len(
-                        self.raw_train_batch_format_acc
-                    )
-                    raw_batch_ans_acc_mean = sum(self.raw_train_batch_ans_acc) / len(self.raw_train_batch_ans_acc)
-                    raw_batch_response_len_mean = sum(self.raw_train_batch_response_len) / len(
-                        self.raw_train_batch_response_len
-                    )
+                    raw_batch_reward_mean = torch.cat(self.raw_train_batch_reward, dim=0).mean().cpu().item()
+                    raw_batch_format_acc_mean = torch.cat(self.raw_train_batch_format_acc, dim=0).mean().cpu().item()
+                    raw_batch_ans_acc_mean = torch.cat(self.raw_train_batch_ans_acc, dim=0).mean().cpu().item()
+                    raw_batch_response_len = torch.cat(self.raw_train_batch_response_len, dim=0)
+                    raw_batch_response_len_mean = raw_batch_response_len.mean().cpu().item()
+                    overlength_samples_ratio = (
+                        (raw_batch_response_len >= action_mask.size(-1)).to(float).mean().cpu().item()
+                    )  # not an exact figure, but a close estimate
                     self.raw_train_batch_reward = []
                     self.raw_train_batch_format_acc = []
                     self.raw_train_batch_ans_acc = []
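
Illustration (not part of the patch): the consumer-side hunk keeps per-group metrics as [batch_size, num_generations] tensors and drops groups whose mean answer accuracy falls outside filter_range before they reach the training buffer. A minimal standalone sketch of that masking step, assuming hypothetical shapes and a made-up filter_range rather than the real BaseConsumer attributes:

    import torch

    # Hypothetical setup: 4 prompts, 8 generations per prompt.
    batch_size, num_generations = 4, 8
    filter_range = (0.01, 0.99)  # assumed bounds standing in for self.filter_range

    # ans_acc[i, j] is the answer accuracy of generation j for prompt i.
    ans_acc = torch.randint(0, 2, (batch_size, num_generations)).float()

    # Per-group mean accuracy, one value per prompt.
    group_ans_acc_mean = ans_acc.mean(dim=-1)  # shape [batch_size]

    # Keep only groups that are neither all-wrong nor all-right.
    effective_group_mask = torch.logical_and(
        group_ans_acc_mean > filter_range[0], group_ans_acc_mean < filter_range[1]
    )
    print(effective_group_mask)  # True for groups that survive the filter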
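Similarly, the grpo_consumer.py hunks replace the all-reduced overlength counter with a ratio estimated from raw response lengths: any response that used the full generation budget is counted as overlength. A rough standalone illustration with invented lengths and a hypothetical max_response_len standing in for action_mask.size(-1):

    import torch

    # Invented response lengths (in tokens) and an assumed generation cap.
    response_len = torch.tensor([512.0, 1024.0, 300.0, 1024.0])
    max_response_len = 1024  # stands in for action_mask.size(-1)

    # Responses that hit the cap were (almost certainly) truncated, so their
    # fraction approximates the overlength ratio, as the patch comment notes.
    overlength_samples_ratio = (response_len >= max_response_len).float().mean().item()
    print(overlength_samples_ratio)  # 0.5 for this toy batch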