address conversation

YeAnbang
2025-05-29 10:25:59 +08:00
parent 58f8c9bb43
commit ee939d9aa5
3 changed files with 36 additions and 68 deletions


@@ -118,48 +118,49 @@ class BaseConsumer:
                         for r in range(self.num_producers):
                             print(f"[T{dist.get_rank()}] Recv data episode {episode} step {step} from {r}")
                             raw_batch = ray_broadcast_tensor_dict(None, src=0, device=self.device, group_name=f"sync_data_{r}")
-                            recv_effective_count = 0
                             # calculate group reward et al. filtering. As only the filtered group will be used for training (which is incomplete),
                             # we need to calculate the metrics before filtering here for logging
-                            raw_batch_with_reward = unbind_batch(self.calculate_reward(raw_batch))
-                            for group_with_reward in raw_batch_with_reward:
-                                group_reward_mean = group_with_reward["reward"].mean().cpu().item()
-                                group_format_acc_mean = group_with_reward["format_acc"].mean().cpu().item()
-                                group_ans_acc_mean = group_with_reward["ans_acc"].mean().cpu().item()
-                                group_response_len = (
-                                    (
-                                        group_with_reward["response_idx"][:, 1]
-                                        - group_with_reward["response_idx"][:, 0]
-                                        + 1
-                                    )
-                                    .type(torch.float32)
-                                    .mean()
-                                    .cpu()
-                                    .item()
-                                )
-                                if self.grpo_config.get("dynamic_batching", True):
-                                    filtered_group = self.prompt_level_filtering(group_with_reward)
-                                    recv_effective_count += 1 if filtered_group is not None else 0
+                            # [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
+                            raw_batch_with_reward = self.calculate_reward({k:v.view(-1, v.size(-1)) if k!='temperature' else v for k, v in raw_batch.items()})
+                            raw_batch_with_reward = {k: v.view(-1, self.num_generations, v.size(-1)) if k!='temperature' else v for k, v in raw_batch_with_reward.items()}
+                            # [batch_size, num_generations] -> [batch_size]
+                            group_reward_mean = raw_batch_with_reward["reward"][:,:,0].mean(dim=-1)
+                            group_format_acc_mean = raw_batch_with_reward["format_acc"][:,:,0].mean(dim=-1)
+                            group_ans_acc_mean = raw_batch_with_reward["ans_acc"][:,:,0].mean(dim=-1)
+                            group_response_len = (
+                                (raw_batch_with_reward["response_idx"][:, :, 1] - raw_batch_with_reward["response_idx"][:, :, 0] + 1)
+                                .type(torch.float32)
+                                .mean(dim=-1)
+                            )
+                            effective_group_mask = None
+                            if self.filter_range is not None and self.grpo_config.get("dynamic_batching", True):
+                                # filter the group based on the reward and accuracy
+                                effective_group_mask = torch.logical_and(
+                                    group_ans_acc_mean > self.filter_range[0], group_ans_acc_mean < self.filter_range[1]
+                                )
+                            raw_batch_with_reward = unbind_batch(raw_batch_with_reward)  # List[Dict[str, torch.Tensor]]
+                            for group_idx, group_with_reward in enumerate(raw_batch_with_reward):
                                 self.buffer.append(
                                     [
-                                        filtered_group,
-                                        group_reward_mean,
-                                        group_format_acc_mean,
-                                        group_ans_acc_mean,
-                                        group_response_len,
+                                        group_with_reward if effective_group_mask is None or effective_group_mask[group_idx] else None,
+                                        group_reward_mean[group_idx],
+                                        group_format_acc_mean[group_idx],
+                                        group_ans_acc_mean[group_idx],
+                                        group_response_len[group_idx],
                                     ]
                                 )
-                            if self.filter_range is not None:
+                            if effective_group_mask is not None:
                                 print(
-                                    f"[T{dist.get_rank()}] Filter recv data: {len(raw_batch)} -> {recv_effective_count}"
+                                    f"[T{dist.get_rank()}] Filter recv data: {len(raw_batch)} -> {torch.sum(effective_group_mask).cpu().item()} effective groups"
                                 )
-                            # mapping the effective group to the raw group for indexing
-                            effective_group_to_raw_group_mapping = {}
-                            for buffer_idx in range(len(self.buffer)):
-                                if self.buffer[buffer_idx][0] is not None:
-                                    effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
-                                        buffer_idx
-                                    )
+
+                        # mapping the effective group to the raw group for indexing
+                        effective_group_to_raw_group_mapping = {}
+                        for buffer_idx in range(len(self.buffer)):
+                            if self.buffer[buffer_idx][0] is not None:
+                                effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
+                                    buffer_idx
+                                )
                         pbar.set_postfix({"Collect Effective Prompt": f"{len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"})
                         while len(effective_group_to_raw_group_mapping) >= self.dp_size * self.minibatch_size:
                             # on each dp_rank, we use minibatch_size effective samples to form a batch
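
A note for readers of the hunk above: the new code flattens the batch before calling `calculate_reward` so the reward function only sees independent samples, then folds the result back into `[batch_size, num_generations, ...]` groups. Below is a minimal, self-contained sketch of that round-trip with a stand-in `calculate_reward` and made-up shapes; the real reward function and tensor layout live elsewhere in the repo.

```python
import torch

def calculate_reward(batch: dict) -> dict:
    # Stand-in for the repo's reward function: consumes flattened [N, ...]
    # tensors and attaches a per-sample reward column of shape [N, 1].
    n = batch["input_ids"].size(0)
    return {**batch, "reward": torch.rand(n, 1)}

batch_size, num_generations, seq_len = 2, 4, 16
raw_batch = {
    "input_ids": torch.zeros(batch_size, num_generations, seq_len, dtype=torch.long),
    "temperature": torch.tensor(1.0),  # scalar, so it is excluded from the reshapes
}

# [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
flat = {k: v.view(-1, v.size(-1)) if k != "temperature" else v for k, v in raw_batch.items()}

# the reward function never needs to know about groups
with_reward = calculate_reward(flat)

# ... and back to [batch_size, num_generations, ...] for group-level statistics
with_reward = {
    k: v.view(-1, num_generations, v.size(-1)) if k != "temperature" else v
    for k, v in with_reward.items()
}
assert with_reward["reward"].shape == (batch_size, num_generations, 1)
```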
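The filtering step then reduces each group's per-sample accuracy to one mean and masks out groups that fall outside `filter_range`; the usual motivation is that all-wrong and all-right groups carry little learning signal for group-relative methods. A sketch under the same layout assumptions; the `filter_range` bounds here are illustrative, not the repo's defaults.

```python
import torch

batch_size, num_generations = 4, 8

# per-sample answer accuracy with a trailing singleton dim, as in the diff:
# [batch_size, num_generations, 1]
ans_acc = torch.randint(0, 2, (batch_size, num_generations, 1)).float()

# [batch_size, num_generations] -> [batch_size]: one mean per prompt group
group_ans_acc_mean = ans_acc[:, :, 0].mean(dim=-1)

# keep only groups whose mean accuracy lies strictly inside (low, high)
filter_range = (0.01, 0.99)  # illustrative bounds
effective_group_mask = torch.logical_and(
    group_ans_acc_mean > filter_range[0], group_ans_acc_mean < filter_range[1]
)
print(f"effective groups: {effective_group_mask.sum().item()}/{batch_size}")
```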
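Finally, filtered-out groups are buffered as `None` rather than dropped, and `effective_group_to_raw_group_mapping` assigns dense indices to the surviving ones so minibatch sampling can skip the holes. A toy sketch of that bookkeeping; the buffer entries and the `dp_size`/`minibatch_size` values are placeholders.

```python
from typing import Any, List, Optional

# each buffer entry: [group_or_None, reward_mean, format_acc_mean, ans_acc_mean, response_len]
buffer: List[List[Optional[Any]]] = [
    [{"group": 0}, 0.5, 1.0, 0.5, 120.0],
    [None, 0.0, 1.0, 0.0, 300.0],   # filtered out by the accuracy mask
    [{"group": 2}, 0.7, 1.0, 0.7, 98.0],
]

# contiguous effective index -> raw buffer position; None entries stay in
# place so their logging metrics remain addressable
effective_group_to_raw_group_mapping = {}
for buffer_idx in range(len(buffer)):
    if buffer[buffer_idx][0] is not None:
        effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = buffer_idx

print(effective_group_to_raw_group_mapping)  # {0: 0, 1: 2}

# training can start once a full dp-wide minibatch of effective groups exists
dp_size, minibatch_size = 1, 2  # placeholder values
if len(effective_group_to_raw_group_mapping) >= dp_size * minibatch_size:
    print("enough effective prompts to form a batch")
```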