Merge pull request #6341 from hpcaitech/grpo-code

[feat] Support Code Generation RFT, Move Reward Calculation to Producer
YeAnbang 2025-06-09 17:02:13 +08:00 committed by GitHub
commit c308b42f38
13 changed files with 1029 additions and 130 deletions
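At a high level, the PR moves reward calculation out of GRPOConsumer and into BaseProducer: the producer builds a VerifiableReward from grpo_config["reward_fn_type"] (think_answer_tags, boxed, or the new code type), scores every generation right after rollout, and sends reward/format_acc/ans_acc tensors of shape [batch_size, num_generations, 1] to the consumer. The snippet below is a minimal, self-contained sketch of just that shape handling; dummy_reward_fn and the toy sizes are illustrative assumptions, not code from this PR.

import torch

def dummy_reward_fn(sequence: torch.Tensor, **kwargs) -> torch.Tensor:
    # Stand-in for the math/boxed/code reward fns: returns (reward, format_acc, ans_acc).
    return torch.tensor([10.0, 1.0, 1.0])

bs, num_gen, seq_len = 2, 4, 16  # toy sizes
input_ids = torch.randint(0, 100, (bs, num_gen, seq_len))

# Score each generation, then reshape so the consumer receives
# [batch_size, num_generations, 1] tensors, mirroring producer.py below.
flat = input_ids.view(-1, seq_len)
scores = torch.stack([dummy_reward_fn(flat[i]) for i in range(flat.size(0))])
reward = scores[:, 0].view(bs, num_gen, 1)
format_acc = scores[:, 1].view(bs, num_gen, 1)
ans_acc = scores[:, 2].view(bs, num_gen, 1)
print(reward.shape, format_acc.shape, ans_acc.shape)  # three tensors of shape [2, 4, 1]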

.gitignore vendored
View File

@@ -167,3 +167,7 @@ applications/ColossalChat/wandb
applications/ColossalChat/model
applications/ColossalChat/eval
applications/ColossalChat/rollouts
applications/ColossalChat/*.txt
applications/ColossalChat/*.db
applications/ColossalChat/stdin
applications/ColossalChat/*.zip

View File

@@ -367,9 +367,9 @@ def apply_chat_template_and_mask(
}
# Format for RL.
gt_answer = None
if "messages" in chat and "gt_answer" in chat:
gt_answer = chat["gt_answer"]
if "messages" in chat:
gt_answer = chat.get("gt_answer", None)
test_cases = chat.get("test_cases", None)
chat = [chat["messages"]]
tokens = []
@@ -402,12 +402,14 @@ def apply_chat_template_and_mask(
labels[~torch.tensor(assistant_mask, dtype=torch.bool)] = ignore_idx
if gt_answer is not None:
gt_answer = tokenizer.encode(
gt_answer, padding="max_length", truncation=True, max_length=128, return_tensors="pt"
)
gt_answer = gt_answer.squeeze(1)
return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels, "gt_answer": gt_answer}
elif test_cases is not None:
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels,
"test_cases": test_cases,
}
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
@@ -440,3 +442,20 @@ class RawConversationDataset(Dataset):
tokens = apply_chat_template_and_mask(self.tokenizer, message, self.max_length, self.system_prompt)
self.tokenized_texts[index] = dict(tokens)
return self.tokenized_texts[index]
def collate_fn_grpo(batch):
input_ids = [item["input_ids"] for item in batch]
attention_mask = [item["attention_mask"] for item in batch]
labels = [item["labels"] for item in batch]
# Assume input_ids, attention_mask, labels are already of the same length,
# otherwise use pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id)
input_ids = torch.stack(input_ids)
attention_mask = torch.stack(attention_mask)
labels = torch.stack(labels)
ret = {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}
if "test_cases" in batch[0]:
ret["test_cases"] = [item["test_cases"] for item in batch]
if "gt_answer" in batch[0]:
ret["gt_answer"] = [item["gt_answer"] for item in batch]
return ret
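
For reference, a hedged usage sketch of the new collate_fn_grpo (not part of the diff); it assumes the coati package is importable and uses made-up tensors and a toy test-case string.

import torch
from coati.dataset.loader import collate_fn_grpo  # added by this PR

batch = [
    {
        "input_ids": torch.ones(8, dtype=torch.long),
        "attention_mask": torch.ones(8, dtype=torch.long),
        "labels": torch.ones(8, dtype=torch.long),
        "test_cases": '{"inputs": ["1 2\\n"], "outputs": ["3\\n"]}',  # made-up payload
    }
    for _ in range(2)
]
out = collate_fn_grpo(batch)
# Tensor fields are stacked; string fields are kept as plain Python lists.
print(out["input_ids"].shape)  # torch.Size([2, 8])
print(len(out["test_cases"]))  # 2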

View File

@@ -123,21 +123,16 @@ class BaseConsumer:
# calculate group reward and apply filtering. As only the filtered groups will be used for training (an incomplete set),
# we need to calculate the metrics here, before filtering, for logging
# [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
raw_batch_with_reward = self.calculate_reward(
{k: v.view(-1, v.size(-1)) if k != "temperature" else v for k, v in raw_batch.items()}
)
raw_batch_with_reward = {
raw_batch = {
k: v.view(-1, self.num_generations, v.size(-1)) if k != "temperature" else v
for k, v in raw_batch_with_reward.items()
for k, v in raw_batch.items()
}
# [batch_size, num_generations] -> [batch_size]
reward = raw_batch_with_reward["reward"][:, :, 0]
format_acc = raw_batch_with_reward["format_acc"][:, :, 0]
ans_acc = raw_batch_with_reward["ans_acc"][:, :, 0]
reward = raw_batch["reward"][:, :, 0]
format_acc = raw_batch["format_acc"][:, :, 0]
ans_acc = raw_batch["ans_acc"][:, :, 0]
response_len = (
raw_batch_with_reward["response_idx"][:, :, 1]
- raw_batch_with_reward["response_idx"][:, :, 0]
+ 1
raw_batch["response_idx"][:, :, 1] - raw_batch["response_idx"][:, :, 0] + 1
).type(torch.float32)
effective_group_mask = None
if self.filter_range is not None and self.grpo_config.get("dynamic_batching", True):
@@ -146,8 +141,8 @@ class BaseConsumer:
effective_group_mask = torch.logical_and(
group_ans_acc_mean > self.filter_range[0], group_ans_acc_mean < self.filter_range[1]
)
raw_batch_with_reward = unbind_batch(raw_batch_with_reward) # List[Dict[str, torch.Tensor]]
for group_idx, group_with_reward in enumerate(raw_batch_with_reward):
raw_batch = unbind_batch(raw_batch) # List[Dict[str, torch.Tensor]]
for group_idx, group_with_reward in enumerate(raw_batch):
self.buffer.append(
[
(
@@ -163,7 +158,7 @@ class BaseConsumer:
)
if effective_group_mask is not None:
print(
f"[T{dist.get_rank()}] Filter recv data: {len(raw_batch_with_reward)} -> {torch.sum(effective_group_mask).cpu().item()} effective groups"
f"[T{dist.get_rank()}] Filter recv data: {len(raw_batch)} -> {torch.sum(effective_group_mask).cpu().item()} effective groups"
)
# mapping the effective group to the raw group for indexing
effective_group_to_raw_group_mapping = {}

View File

@@ -1,13 +1,11 @@
from contextlib import nullcontext
from typing import Any, Dict, Optional
from typing import Any, Optional
import ray
import torch
import wandb
from coati.distributed.consumer import BaseConsumer
from coati.distributed.loss import PolicyLoss
from coati.distributed.reward.reward_fn import boxed_math_reward_fn, math_reward_fn
from coati.distributed.reward.verifiable_reward import VerifiableReward
from coati.distributed.utils import calc_action_log_probs
from coati.trainer.utils import all_reduce_mean, all_reduce_sum
from transformers import AutoModelForCausalLM, AutoTokenizer
@@ -119,20 +117,7 @@ class GRPOConsumer(BaseConsumer):
"either max_tokens (vllm) or max_new_tokens (transformers) must be set in generate_config."
)
# Initialize verifiable reward.
response_format_tags = grpo_config.get("response_format_tags", None)
reward_model_kwargs = {
k: v
for k, v in grpo_config.items()
if k in ["soft_over_length_punishment", "max_new_tokens", "cache_length"]
}
self.reward_model = VerifiableReward(
reward_fns=[
math_reward_fn if grpo_config.get("reward_fn_type") == "think_answer_tags" else boxed_math_reward_fn
],
tokenizer=self.tokenizer,
tags=response_format_tags,
**reward_model_kwargs,
)
grpo_config.get("response_format_tags", None)
self.global_step = 0
self.lr_scheduler = CosineAnnealingWarmupLR(
@@ -498,40 +483,6 @@ class GRPOConsumer(BaseConsumer):
else:
return None
def calculate_reward(self, rollout: Dict[str, Any]) -> Dict[str, Any]:
"""
Calculate the group reward for the given rollout group.
Args:
rollout_group (Dict[str, Any]):
a group of samples generated by the model from the same prompt
contain the following keys:
"input_ids": torch.Tensor, [num_of_generation, prompt_length + response_length]
"attention_mask": torch.Tensor, [num_of_generation, prompt_length + response_length]
"action_mask": torch.Tensor, [num_of_generation, response_length]
"action_log_probs": torch.Tensor, [num_of_generation, response_length]
"response_idx": int, torch.Tensor, [num_of_generation, 2]
"gt_answer": torch.Tensor, [num_of_generation, 128]
"temperature": torch.Tensor, [] (scalar)
Returns:
Dict[str, Any]: The new group data with calculated reward.
"""
reward_model_output = self.reward_model(
rollout["input_ids"],
gt_answer=rollout["gt_answer"],
response_idx=rollout["response_idx"],
)
# [num_of_generation]
reward = torch.tensor([value[0] for value in reward_model_output]).to(rollout["input_ids"].device)
format_acc = torch.tensor([value[1] for value in reward_model_output]).to(rollout["input_ids"].device)
ans_acc = torch.tensor([value[2] for value in reward_model_output]).to(rollout["input_ids"].device)
rollout["reward"] = reward.view((-1, 1))
rollout["format_acc"] = format_acc.view((-1, 1))
rollout["ans_acc"] = ans_acc.view((-1, 1))
return rollout
def state_dict(self):
self.policy_model._force_wait_all_gather()
model = self.policy_model.unwrap()

View File

@@ -74,9 +74,8 @@ class TransformersInferenceBackend(BaseInferenceBackend):
micro_batch_size = input_ids.size(0)
input_ids = input_ids.to(get_current_device())
attention_mask = attention_mask.to(get_current_device())
gt_answer = None
if "gt_answer" in kwargs:
gt_answer = kwargs.pop("gt_answer")
gt_answer = kwargs.pop("gt_answer", None)
test_cases = kwargs.pop("test_cases", None)
if self.num_generations > 1:
input_ids = input_ids.repeat_interleave(self.num_generations, dim=0)
attention_mask = attention_mask.repeat_interleave(self.num_generations, dim=0)
@@ -116,8 +115,9 @@ class TransformersInferenceBackend(BaseInferenceBackend):
data = {k: v.view(micro_batch_size, self.num_generations, v.size(-1)) for k, v in data.items()}
if gt_answer is not None:
# repeat gt_answer for each prompt.
data["gt_answer"] = gt_answer.repeat_interleave(self.num_generations, dim=1)
data["gt_answer"] = gt_answer
if test_cases is not None:
data["test_cases"] = test_cases
data = {k: v.to(get_current_device()) for k, v in data.items()}
return data
@@ -269,11 +269,11 @@ class VLLMInferenceBackend(BaseInferenceBackend):
}
data = {k: v.view(micro_batch_size, -1, v.size(-1)) for k, v in data.items()}
if "gt_answer" in kwargs:
# repeat gt_answer for each prompt.
data["gt_answer"] = kwargs["gt_answer"].repeat_interleave(data["input_ids"].size(1), dim=1)
data = {k: v.to(get_current_device()) for k, v in data.items()}
if "gt_answer" in kwargs:
data["gt_answer"] = kwargs["gt_answer"]
if "test_cases" in kwargs:
data["test_cases"] = kwargs["test_cases"]
return data
def load_state_dict(self, state_dict: Dict[str, torch.Tensor]) -> None:

View File

@@ -37,7 +37,6 @@ def launch_distributed(
train_batch_size: int,
train_minibatch_size: int,
train_dataset_config: Dict[str, Any],
dataloaders_config: Dict[str, Any],
inference_model_config: Dict[str, Any],
generate_config: Dict[str, Any],
train_model_config: Dict[str, Any],
@@ -89,7 +88,6 @@ def launch_distributed(
num_episodes=num_episodes,
batch_size=inference_batch_size,
train_dataset_config=train_dataset_config,
dataloaders_config=dataloaders_config,
model_config=inference_model_config,
generate_config=generate_config,
tokenizer_config=tokenizer_config,
@@ -99,8 +97,7 @@ def launch_distributed(
consumer_plugin_config=plugin_config,
eval_dataset_config=eval_dataset_config,
eval_interval=eval_interval,
evaluation_function_type=grpo_config["reward_fn_type"],
response_format_tags=grpo_config["response_format_tags"],
grpo_config=grpo_config,
eval_save_dir=eval_save_dir,
eval_generation_config=eval_generation_config,
project_name=project_name,

View File

@@ -8,8 +8,9 @@ import ray.util.collective as cc
import torch
import tqdm
import wandb
from coati.dataset.loader import RawConversationDataset
from coati.distributed.reward.reward_fn import boxed_math_reward_fn, math_reward_fn
from coati.dataset.loader import RawConversationDataset, collate_fn_grpo
from coati.distributed.reward.reward_fn import boxed_math_reward_fn, code_reward_fn, math_reward_fn
from coati.distributed.reward.verifiable_reward import VerifiableReward
from ray.util.collective import allreduce
from ray.util.collective.types import Backend, ReduceOp
from torch.utils.data import DataLoader, DistributedSampler
@@ -36,7 +37,6 @@ class BaseProducer:
num_episodes: int,
batch_size: int,
train_dataset_config: Dict[str, Any],
dataloaders_config: Dict[str, Any],
model_config: Dict[str, Any],
generate_config: Dict[str, Any],
tokenizer_config: Optional[Dict[str, Any]] = None,
@@ -45,8 +45,7 @@ class BaseProducer:
consumer_plugin_config: Dict[str, Any] = None,
eval_dataset_config=None,
eval_interval=-1, # disable evaluation
evaluation_function_type="think_answer_tags",
response_format_tags=None,
grpo_config: Dict[str, Any] = None,
eval_save_dir: str = "./eval",
project_name: str = None,
run_name: str = None,
@@ -75,6 +74,13 @@ class BaseProducer:
self.eval_mode = False
self.log_rollout_interval = log_rollout_interval
self.latest_rollout_log_step = -1
self.grpo_config = grpo_config
reward_model_kwargs = {
k: v
for k, v in grpo_config.items()
if k in ["soft_over_length_punishment", "max_new_tokens", "cache_length"]
}
self.response_format_tags = grpo_config.get("response_format_tags", None)
if producer_idx == 0:
if os.path.exists(rollout_log_file):
raise ValueError(
@@ -120,6 +126,7 @@ class BaseProducer:
),
num_workers=4,
drop_last=True,
collate_fn=collate_fn_grpo,
)
self.eval_dataset_config = eval_dataset_config
@@ -142,17 +149,25 @@ class BaseProducer:
drop_last=False,
seed=42,
),
collate_fn=collate_fn_grpo,
)
if evaluation_function_type == "think_answer_tags":
if grpo_config["reward_fn_type"] == "think_answer_tags":
self.evaluation_function = math_reward_fn
elif evaluation_function_type == "boxed":
elif grpo_config["reward_fn_type"] == "boxed":
self.evaluation_function = boxed_math_reward_fn
elif grpo_config["reward_fn_type"] == "code":
self.evaluation_function = code_reward_fn
else:
raise ValueError(f"Unknown evaluation function type {evaluation_function_type}")
self.response_format_tags = response_format_tags
raise ValueError(f"Unknown evaluation function type {grpo_config['reward_fn_type']}")
else:
print("No eval dataset provided, skip eval")
self.device = get_current_device()
self.reward_model = VerifiableReward(
reward_fns=[self.evaluation_function], # multiple reward functions can be added here
tokenizer=self.tokenizer,
tags=self.response_format_tags,
**reward_model_kwargs,
)
# init backend
if backend in BACKEND_MAP:
@@ -215,7 +230,13 @@ class BaseProducer:
eval_results = eval_results + [
self.evaluation_function(
eval_outputs["input_ids"][m][n],
eval_outputs["gt_answer"][m][n],
eval_outputs[
(
"test_cases"
if self.grpo_config["reward_fn_type"] == "code"
else "gt_answer"
)
][m],
eval_outputs["response_idx"][m][n],
tokenizer=self.tokenizer,
eval_mode=True,
@@ -251,6 +272,45 @@ class BaseProducer:
outputs["temperature"] = torch.tensor(
[self.model.generate_config["temperature"]] * outputs["input_ids"].size(0)
).to(outputs["input_ids"].device)
bs, num_gen = outputs["input_ids"].size(0), outputs["input_ids"].size(1)
if self.grpo_config["reward_fn_type"] == "code":
test_cases = []
for prompt_id in range(bs):
test_cases.extend([outputs["test_cases"][prompt_id]] * num_gen)
reward_model_output = self.reward_model(
outputs["input_ids"].view((-1, outputs["input_ids"].size(-1))),
test_cases=test_cases,
response_idx=outputs["response_idx"].view((-1, 2)),
)
else:
gt_answer = []
for prompt_id in range(bs):
gt_answer.extend([outputs["gt_answer"][prompt_id]] * num_gen)
reward_model_output = self.reward_model(
outputs["input_ids"].view((-1, outputs["input_ids"].size(-1))),
gt_answer=gt_answer,
response_idx=outputs["response_idx"].view((-1, 2)),
)
outputs["reward"] = (
torch.tensor([value[0] for value in reward_model_output])
.to(outputs["input_ids"].device)
.view((bs, num_gen, 1))
)
outputs["format_acc"] = (
torch.tensor([value[1] for value in reward_model_output])
.to(outputs["input_ids"].device)
.view((bs, num_gen, 1))
)
outputs["ans_acc"] = (
torch.tensor([value[2] for value in reward_model_output])
.to(outputs["input_ids"].device)
.view((bs, num_gen, 1))
)
if "gt_answer" in outputs:
outputs.pop("gt_answer")
if "test_cases" in outputs:
outputs.pop("test_cases")
print(f"[P{self.producer_idx}] Send data {[(k, v.shape) for k, v in outputs.items()]}")
outputs = pre_send(outputs)
ray_broadcast_tensor_dict(
@@ -315,7 +375,6 @@ class SimpleProducer(BaseProducer):
num_episodes,
batch_size,
train_dataset_config,
dataloaders_config,
model_config,
generate_config,
tokenizer_config=None,
@@ -325,8 +384,7 @@ class SimpleProducer(BaseProducer):
consumer_plugin_config=None,
eval_dataset_config=None,
eval_interval=-1, # disable evaluation
evaluation_function_type="think_answer_tags",
response_format_tags=None,
grpo_config: Dict[str, Any] = None,
eval_save_dir: str = "./eval",
eval_generation_config={},
project_name: str = None,
@@ -342,7 +400,6 @@ class SimpleProducer(BaseProducer):
num_episodes,
batch_size,
train_dataset_config,
dataloaders_config,
model_config,
generate_config,
tokenizer_config,
@@ -351,8 +408,7 @@ class SimpleProducer(BaseProducer):
consumer_plugin_config,
eval_dataset_config=eval_dataset_config,
eval_interval=eval_interval,
evaluation_function_type=evaluation_function_type,
response_format_tags=response_format_tags,
grpo_config=grpo_config,
eval_save_dir=eval_save_dir,
project_name=project_name,
run_name=run_name,

View File

@@ -0,0 +1,669 @@
# Code from the verl Project (https://github.com/agentica-project/rllm),
# which itself is adapted from Prime (https://github.com/PRIME-RL/PRIME)
#
# Copyright 2024 ByteDance Group
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import faulthandler
import json
import platform
# to run the solution files we're using a timing based approach
import signal
import sys
import traceback
# used for debugging to time steps
from datetime import datetime
from enum import Enum
# for capturing the stdout
from io import StringIO
# used for testing the code that reads from input
from unittest.mock import mock_open, patch
import numpy as np
from pyext import RuntimeModule
def truncatefn(s, length=300):
assert isinstance(s, str)
if len(s) <= length:
return s
return s[: length // 2] + "...(truncated) ..." + s[-length // 2 :]
class CODE_TYPE(Enum):
call_based = 0
standard_input = 1
# used to capture stdout as a list
# from https://stackoverflow.com/a/16571630/6416660
# alternative use redirect_stdout() from contextlib
class Capturing(list):
def __enter__(self):
self._stdout = sys.stdout
sys.stdout = self._stringio = StringIO()
# Make closing the StringIO a no-op
self._stringio.close = lambda x: 1
return self
def __exit__(self, *args):
self.append(self._stringio.getvalue())
del self._stringio # free up some memory
sys.stdout = self._stdout
def only_int_check(val):
return isinstance(val, int)
def string_int_check(val):
return isinstance(val, str) and val.isdigit()
def combined_int_check(val):
return only_int_check(val) or string_int_check(val)
def clean_traceback(error_traceback):
file_start = error_traceback.find('File "<string>"')
# print(file_start)
error_traceback = "Traceback (most recent call last):\n " + error_traceback[file_start:]
return error_traceback
def run_test(in_outs, test=None, debug=False, timeout=15):
"""
If test (the generated code) is not None, this will try to run the code.
Otherwise it will just return an input and output pair.
"""
# Disable functionalities that can make destructive changes to the test.
reliability_guard()
if debug:
print(f"start = {datetime.now().time()}")
if in_outs:
if in_outs.get("fn_name") is None:
which_type = CODE_TYPE.standard_input # Standard input
method_name = None
else:
which_type = CODE_TYPE.call_based # Call-based
method_name = in_outs["fn_name"]
if debug:
print(f"loaded input_output = {datetime.now().time()}")
if test is None:
raise AssertionError("should not happen: test code is none")
elif test is not None:
results = []
sol = "from string import *\nfrom re import *\nfrom datetime import *\nfrom collections import *\nfrom heapq import *\nfrom bisect import *\nfrom copy import *\nfrom math import *\nfrom random import *\nfrom statistics import *\nfrom itertools import *\nfrom functools import *\nfrom operator import *\nfrom io import *\nfrom sys import *\nfrom json import *\nfrom builtins import *\nfrom typing import *\nimport string\nimport re\nimport datetime\nimport collections\nimport heapq\nimport bisect\nimport copy\nimport math\nimport random\nimport statistics\nimport itertools\nimport functools\nimport operator\nimport io\nimport sys\nimport json\nsys.setrecursionlimit(6*10**5)\n" # noqa: E501
if debug:
print(f"loading test code = {datetime.now().time()}")
if which_type == CODE_TYPE.call_based:
sol += test
if debug:
print(f"sol = {sol}")
signal.alarm(timeout)
try:
tmp_sol = RuntimeModule.from_string("tmp_sol", "", sol)
tmp = tmp_sol if "class Solution" not in test else tmp_sol.Solution()
signal.alarm(0)
except Exception as e:
signal.alarm(0)
error_traceback = traceback.format_exc()
if debug:
print(f"type 0 compilation error = {e}")
results.append(-2)
return results, {
"error": repr(e),
# "error_code": -1,
# "error_message": "Compilation Error",
"traceback": clean_traceback(error_traceback),
}
signal.alarm(0)
elif which_type == CODE_TYPE.standard_input:
# sol
# if code has if __name__ == "__main__": then remove it
try:
astree = ast.parse(test)
last_block = astree.body[-1]
if isinstance(last_block, ast.If):
condition = last_block.test
if ast.unparse(condition).strip() == "__name__ == '__main__'":
test = ast.unparse(astree.body[:-1]) + "\n" + ast.unparse(last_block.body)
except Exception:
pass
tmp_test = test.split("\n")
new_test = []
for x in tmp_test:
if (not x.startswith("from ")) and (not x.startswith("import ")):
new_test.append("\t" + x + "\n")
else:
new_test.append(x + "\n")
tmp_test = new_test
new_test = ""
started = False
for i in tmp_test:
if i.startswith("\t") and not started:
new_test += "stdin = sys.stdin\nstdout = sys.stdout\n"
new_test += "def code():\n"
new_test += i
started = True
elif started and ((i.startswith("from ")) or (i.startswith("import "))):
new_test += "\t" + i
else:
new_test += i
tmp_test = new_test
sol += tmp_test
if debug:
print(f"sol = {sol}")
method_name = "code"
signal.alarm(timeout)
try:
tmp_sol = RuntimeModule.from_string("tmp_sol", "", sol)
tmp = tmp_sol
signal.alarm(0)
except Exception as e:
signal.alarm(0)
error_traceback = traceback.format_exc()
if debug:
print(f"type 1 compilation error = {e}")
results.append(-2)
return results, {
"error": repr(e),
# "error_code": -1,
# "error_message": "Compilation Error",
"traceback": clean_traceback(error_traceback),
}
signal.alarm(0)
if debug:
print(f"get method = {datetime.now().time()}")
try:
method = getattr(tmp, method_name) # getattr's second arg must be str
except Exception:
signal.alarm(0)
error_traceback = traceback.format_exc()
error_info = sys.exc_info()
print(f"unable to get function error = {error_info}")
results.append(-2)
return results, {
"error": repr(error_info),
# "error_code": -1,
# "error_message": "Unable to extract code",
"traceback": clean_traceback(error_traceback),
}
for index, inputs in enumerate(in_outs["inputs"]):
raw_inputs = inputs
raw_outputs = in_outs["outputs"][index]
if which_type == CODE_TYPE.call_based:
inputs = [json.loads(line) for line in inputs.split("\n")]
in_outs["outputs"][index] = json.loads(in_outs["outputs"][index])
truncate_line_size = 300 // (raw_inputs.count("\n") + 1)
raw_inputs = "\n".join(
[truncatefn(line, truncate_line_size) for line in raw_inputs.strip().split("\n")]
)
raw_outputs = truncatefn(raw_outputs, 200)
else:
raw_inputs = truncatefn(raw_inputs)
raw_outputs = truncatefn(raw_outputs, 200)
# JSON forces dictionaries to have string keys; this undoes this (assuming a singleton list)
try:
if isinstance(inputs[0], dict):
inputs = [{int(k): v for k, v in inputs[0].items()}]
except Exception:
pass
try:
if isinstance(in_outs["outputs"][index], dict):
in_outs["outputs"][index] = [{int(k): v for k, v in in_outs["outputs"][index].items()}]
except Exception:
pass
try:
if isinstance(in_outs["outputs"][index][0], dict):
in_outs["outputs"][index] = [{int(k): v for k, v in in_outs["outputs"][index][0].items()}]
except Exception:
pass
if debug:
print(
f"time: {datetime.now().time()} testing index = {index} inputs = {inputs}, {type(inputs)}. type = {which_type}"
)
if which_type == CODE_TYPE.call_based: # Call-based
signal.alarm(timeout)
faulthandler.enable()
try:
output = method(*inputs)
raw_true_output = output
raw_true_output_copy = json.dumps(output)
raw_true_output_copy = truncatefn(raw_true_output_copy, 200)
# ground truth sequences are not tuples
if isinstance(output, tuple):
output = list(output)
tmp_result = output == in_outs["outputs"][index]
if isinstance(in_outs["outputs"][index], list) and in_outs["outputs"][index]:
tmp_result = tmp_result or (output == in_outs["outputs"][index][0])
# ground truth sequences are not tuples
try:
if isinstance(output[0], tuple):
tmp_result = tmp_result or ([list(x) for x in output] == in_outs["outputs"][index][0])
except Exception:
pass
results.append(tmp_result)
if tmp_result is not True:
return results, {
"output": raw_true_output_copy,
"expected": raw_outputs,
"inputs": raw_inputs,
# "error_code": -2,
"error_message": "Wrong Answer",
}
# reset the alarm
signal.alarm(0)
except Exception as e:
signal.alarm(0)
error_traceback = traceback.format_exc()
faulthandler.disable()
if debug:
print(f"Standard input runtime error or time limit exceeded error = {e}")
results.append(-1)
return results, {
"error": repr(e),
"traceback": clean_traceback(error_traceback),
}
faulthandler.disable()
signal.alarm(0)
if debug:
print(
f"outputs = {output}, test outputs = {in_outs['outputs'][index]}, inputs = {inputs}, {type(inputs)}, {output == [in_outs['outputs'][index]]}"
)
elif which_type == CODE_TYPE.standard_input: # Standard input
faulthandler.enable()
passed = False
if isinstance(inputs, list):
inputs = "\n".join(inputs)
if isinstance(in_outs["outputs"][index], list):
in_outs["outputs"][index] = "\n".join(in_outs["outputs"][index])
signal.alarm(timeout)
with Capturing() as output:
try:
call_method(method, inputs)
# reset the alarm
signal.alarm(0)
passed = True
except Exception as e:
# runtime error or took too long
signal.alarm(0)
error_traceback = traceback.format_exc()
print(f"Call-based runtime error or time limit exceeded error = {repr(e)}{e}")
results.append(-1)
return results, {
"error": repr(e),
"traceback": clean_traceback(error_traceback),
}
signal.alarm(0)
raw_true_output = output[0]
raw_true_output_copy = truncatefn(raw_true_output, 200)
output = raw_true_output.splitlines()
if not passed:
if debug:
nl = "\n"
if not isinstance(inputs, list):
print(
f"not passed output = {output}, test outputs = {in_outs['outputs'][index]}, inputs = {inputs.replace(nl, ' new-line ')}, {type(inputs)}, {output == [in_outs['outputs'][index]]}"
)
else:
print(
f"not passed output = {output}, test outputs = {in_outs['outputs'][index]}, inputs = {inputs}, {type(inputs)}, {output == [in_outs['outputs'][index]]}"
)
continue
if passed and debug:
print(f"==> output = {output}, test outputs = {in_outs['outputs'][index]}")
if custom_compare_(output, in_outs["outputs"][index]):
tmp_result = True
results.append(tmp_result)
continue
# ground truth sequences are expressed as lists not tuples
if isinstance(output, tuple):
output = list(output)
tmp_result = False
try:
tmp_result = output == [in_outs["outputs"][index]]
if isinstance(in_outs["outputs"][index], list):
tmp_result = tmp_result or (output == in_outs["outputs"][index])
if isinstance(output[0], str):
tmp_result = tmp_result or ([e.strip() for e in output] == in_outs["outputs"][index])
except Exception as e:
if debug:
print(f"Failed check1 exception = {e}")
if tmp_result is True:
results.append(tmp_result)
continue
# try one more time without \n
if isinstance(in_outs["outputs"][index], list):
for tmp_index, i in enumerate(in_outs["outputs"][index]):
in_outs["outputs"][index][tmp_index] = i.split("\n")
in_outs["outputs"][index][tmp_index] = [
x.strip() for x in in_outs["outputs"][index][tmp_index] if x
]
else:
in_outs["outputs"][index] = in_outs["outputs"][index].split("\n")
in_outs["outputs"][index] = list(filter(len, in_outs["outputs"][index]))
in_outs["outputs"][index] = list(map(lambda x: x.strip(), in_outs["outputs"][index]))
try:
tmp_result = output == [in_outs["outputs"][index]]
if isinstance(in_outs["outputs"][index], list):
tmp_result = tmp_result or (output == in_outs["outputs"][index])
except Exception as e:
if debug:
print(f"Failed check2 exception = {e}")
if tmp_result is True:
results.append(tmp_result)
continue
# try by converting the output into a split up list too
if isinstance(output, list):
output = list(filter(len, output))
if debug:
nl = "\n"
if not isinstance(inputs, list):
print(
f"@1 output = {output}, test outputs = {in_outs['outputs'][index]}, inputs = {inputs.replace(nl, ' new-line ')}, {type(inputs)}, {output == [in_outs['outputs'][index]]} {tmp_result=}"
)
else:
print(
f"@1 output = {output}, test outputs = {in_outs['outputs'][index]}, inputs = {inputs}, {type(inputs)}, {output == [in_outs['outputs'][index]]} {tmp_result=}"
)
if debug:
print(f"{tmp_result=} @a")
try:
tmp_result = output == [in_outs["outputs"][index]]
if isinstance(in_outs["outputs"][index], list):
tmp_result = tmp_result or (output == in_outs["outputs"][index])
except Exception as e:
if debug:
print(f"Failed check3 exception = {e}")
if debug:
print(f"{tmp_result=} @b")
try:
all_ints = all(
combined_int_check(e1) and combined_int_check(e2)
for e1, e2 in zip(output, in_outs["outputs"][index])
)
if not all_ints:
if debug:
print(
[
combined_int_check(e1) and combined_int_check(e2)
for e1, e2 in zip(output, in_outs["outputs"][index])
]
)
output_float = [float(e) for e in output]
gt_float = [float(e) for e in in_outs["outputs"][index]]
tmp_result = tmp_result or (
(len(output_float) == len(gt_float)) and np.allclose(output_float, gt_float)
)
except Exception:
pass
if debug:
print(f"{tmp_result=} @c")
try:
if isinstance(output[0], list):
all_ints = all(
combined_int_check(e1) and combined_int_check(e2)
for e1, e2 in zip(output[0], in_outs["outputs"][index])
)
if not all_ints:
output_float = [float(e) for e in output[0]]
gt_float = [float(e) for e in in_outs["outputs"][index][0]]
tmp_result = tmp_result or (
(len(output_float) == len(gt_float)) and np.allclose(output_float, gt_float)
)
except Exception:
pass
if tmp_result is True:
results.append(tmp_result)
continue
if debug:
print(f"{tmp_result=} @d")
# try by converting the stuff into split up list
if isinstance(in_outs["outputs"][index], list):
for tmp_index, i in enumerate(in_outs["outputs"][index]):
in_outs["outputs"][index][tmp_index] = set(i.split())
else:
in_outs["outputs"][index] = set(in_outs["outputs"][index].split())
if debug:
print(f"{tmp_result=} @e")
try:
tmp_result = output == in_outs["outputs"][index]
except Exception as e:
if debug:
print(f"Failed check4 exception = {e}")
continue
if tmp_result is True:
results.append(tmp_result)
continue
if debug:
print(f"{tmp_result=} @f")
# try by converting the output into a split up list too
if isinstance(output, list):
for tmp_index, i in enumerate(output):
output[tmp_index] = i.split()
output = list(filter(len, output))
for tmp_index, i in enumerate(output):
output[tmp_index] = set(i)
else:
output = output.split()
output = list(filter(len, output))
output = set(output)
if debug:
print(f"{tmp_result=} @g")
if tmp_result is True and debug:
print("PASSED")
results.append(tmp_result)
if tmp_result is not True:
return results, {
"output": raw_true_output_copy,
"expected": raw_outputs,
"inputs": raw_inputs,
# "error_code": -2,
"error_message": "Wrong Answer",
}
if debug:
nl = "\n"
if not isinstance(inputs, list):
print(
f"@2 output = {output}, test outputs = {in_outs['outputs'][index]}, inputs = {inputs.replace(nl, ' new-line ')}, {type(inputs)}, {output == [in_outs['outputs'][index]]}"
)
else:
print(
f"@2 output = {output}, test outputs = {in_outs['outputs'][index]}, inputs = {inputs}, {type(inputs)}, {output == [in_outs['outputs'][index]]}"
)
print(f"results = {results}")
return results, {}
def custom_compare_(output, ground_truth):
if isinstance(output, list):
output_1 = "\n".join(output)
if stripped_string_compare(output_1, ground_truth):
return True
if isinstance(output, list):
output_2 = [o.lstrip().rstrip() for o in output]
output_2 = "\n".join(output_2)
if stripped_string_compare(output_2, ground_truth):
return True
return False
def stripped_string_compare(s1, s2):
s1 = s1.lstrip().rstrip()
s2 = s2.lstrip().rstrip()
return s1 == s2
def call_method(method, inputs):
if isinstance(inputs, list):
inputs = "\n".join(inputs)
inputs_line_iterator = iter(inputs.split("\n"))
# sys.setrecursionlimit(10000)
# @patch('builtins.input', side_effect=inputs.split("\n"))
@patch("builtins.open", mock_open(read_data=inputs))
@patch("sys.stdin", StringIO(inputs))
@patch("sys.stdin.readline", lambda *args: next(inputs_line_iterator))
@patch("sys.stdin.readlines", lambda *args: inputs.split("\n"))
@patch("sys.stdin.read", lambda *args: inputs)
# @patch('sys.stdout.write', print)
def _inner_call_method(_method):
try:
return _method()
except SystemExit:
pass
finally:
pass
return _inner_call_method(method)
def reliability_guard(maximum_memory_bytes=None):
"""
This disables various destructive functions and prevents the generated code
from interfering with the test (e.g. fork bomb, killing other processes,
removing filesystem files, etc.)
WARNING
This function is NOT a security sandbox. Untrusted code, including model-
generated code, should not be blindly executed outside of one. See the
Codex paper for more information about OpenAI's code sandbox, and proceed
with caution.
"""
if maximum_memory_bytes is not None:
import resource
resource.setrlimit(resource.RLIMIT_AS, (maximum_memory_bytes, maximum_memory_bytes))
resource.setrlimit(resource.RLIMIT_DATA, (maximum_memory_bytes, maximum_memory_bytes))
if platform.uname().system != "Darwin":
resource.setrlimit(resource.RLIMIT_STACK, (maximum_memory_bytes, maximum_memory_bytes))
faulthandler.disable()
import builtins
builtins.exit = None
builtins.quit = None
import os
os.environ["OMP_NUM_THREADS"] = "1"
os.kill = None
os.system = None  # prevent interference with the REPL-based evaluation
os.putenv = None
os.remove = None
os.removedirs = None
os.rmdir = None
os.fchdir = None
os.setuid = None
os.fork = None
os.forkpty = None
os.killpg = None
os.rename = None
os.renames = None
os.truncate = None
os.replace = None
os.unlink = None
os.fchmod = None
os.fchown = None
os.chmod = None
os.chown = None
os.chroot = None
os.lchflags = None
os.lchmod = None
os.lchown = None
os.getcwd = None
os.chdir = None
import shutil
shutil.rmtree = None
shutil.move = None
shutil.chown = None
import subprocess
subprocess.Popen = None # type: ignore
__builtins__["help"] = None
import sys
sys.modules["ipdb"] = None
sys.modules["joblib"] = None
sys.modules["resource"] = None
sys.modules["psutil"] = None
sys.modules["tkinter"] = None

View File

@@ -0,0 +1,61 @@
# Code from the verl Project (https://github.com/agentica-project/rllm),
# which itself is adapted from Prime (https://github.com/PRIME-RL/PRIME)
#
# Copyright 2024 ByteDance Group
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
import sys
import traceback
from typing import Optional
from .testing_util import run_test
def _temp_run(sample, generation, debug, result, metadata_list, timeout):
with open(os.devnull, "w") as devnull:
sys.stdout = devnull
sys.stderr = devnull
try:
res, metadata = run_test(in_outs=sample, test=generation, debug=debug, timeout=timeout)
result.append(res)
metadata_list.append(metadata)
except Exception:
# print(e) # some tracebacks are extremely long.
traceback.print_exc(10)
result.append([-1 for i in range(len(sample["inputs"]))])
metadata_list.append({})
def check_correctness(in_outs: Optional[dict], generation, timeout=10, debug=True):
"""Check correctness of code generation with a global timeout.
The global timeout is to catch some extreme/rare cases not handled by the timeouts
inside `run_test`"""
manager = multiprocessing.Manager()
result = manager.list()
metadata_list = manager.list()
p = multiprocessing.Process(target=_temp_run, args=(in_outs, generation, debug, result, metadata_list, timeout))
p.start()
p.join(timeout=timeout + 1)
if p.is_alive():
p.kill()
# p.terminate()
if not result:
# consider that all tests failed
result = [[-1 for i in range(len(in_outs["inputs"]))]]
if debug:
print("global timeout")
return result[0], metadata_list
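
A hedged usage sketch for check_correctness (not part of the diff): the import path is inferred from the relative import in reward_fn.py, and the test data and generated program are made up.

from coati.distributed.reward.code_reward.utils import check_correctness  # path assumed

in_outs = {"inputs": ["1 2\n"], "outputs": ["3\n"]}  # standard-input style (no fn_name)
generation = "a, b = map(int, input().split())\nprint(a + b)\n"

results, metadata = check_correctness(in_outs=in_outs, generation=generation, timeout=10, debug=False)
# One entry per test case: True on pass, -1 on runtime error/timeout, -2 on compilation error.
print(results)  # expected: [True]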

View File

@@ -1,7 +1,30 @@
# Copyright 2024 ByteDance Group
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Some functions in this file are adapted from the verl project
under the Apache License 2.0:
https://github.com/volcengine/verl
"""
import json
import torch
from latex2sympy2_extended import NormalizationConfig
from math_verify import ExprExtractionConfig, LatexExtractionConfig, parse, verify
from .code_reward.utils import check_correctness as check_correctness_code
from .reward_utils import extract_boxed_solution, extract_solution, validate_response_structure
CANNOT_PARSE_GT_ANSWER = -1
@@ -98,15 +121,11 @@ def math_reward_fn(input_ids, gt_answer, response_idx, **kwargs):
raise ValueError("no gt_answer is provided, please check your training dataset.")
decoded_final_answer = tokenizer.decode(input_ids[s : e + 1], skip_special_tokens=True)
gt_answer = tokenizer.decode(gt_answer.squeeze(0), skip_special_tokens=True)
final_answer, processed_str = extract_solution(decoded_final_answer)
format_valid = validate_response_structure(processed_str, kwargs["tags"])
# Check format accuracy
if format_valid:
format_acc += 1
# Check answer accuracy, answer is considered correct if the answer is correct and the format is valid
if final_answer is not None:
if eval_mode or format_valid:
@@ -114,6 +133,10 @@ def math_reward_fn(input_ids, gt_answer, response_idx, **kwargs):
if not eval_mode:
reward = reward + length_reward
# Check format accuracy
if format_valid:
format_acc += 1
# Check if the sequence is over length
if not eval_mode and res_length >= max_new_tokens:
reward *= 0.0
@@ -138,7 +161,6 @@ def boxed_math_reward_fn(input_ids, gt_answer, response_idx, **kwargs):
tokenizer = kwargs["tokenizer"]
eval_mode = kwargs.get("eval_mode", False)
soft_over_length_punishment = kwargs.get("soft_over_length_punishment", False)
format_score = 0.0
acc_score = 10.0
reward = torch.tensor(0.0)
format_acc = torch.tensor(0.0)
@@ -161,7 +183,6 @@ def boxed_math_reward_fn(input_ids, gt_answer, response_idx, **kwargs):
decoded_final_answer = tokenizer.decode(input_ids[s : e + 1], skip_special_tokens=True)
gt_answer = tokenizer.decode(gt_answer.squeeze(0), skip_special_tokens=True)
final_answer = extract_boxed_solution(decoded_final_answer)
format_valid = final_answer is not None
if "tags" in kwargs and kwargs["tags"]:
@@ -169,10 +190,6 @@ def boxed_math_reward_fn(input_ids, gt_answer, response_idx, **kwargs):
format_valid = format_valid and all(
[decoded_final_answer.count(tags[tag]["text"]) == tags[tag]["num_occur"] for tag in tags]
)
# Check format accuracy
if format_valid:
format_acc += 1
reward += format_score
# Check answer accuracy, answer is considered correct if the answer is correct and the format is valid
if final_answer is not None:
@@ -181,6 +198,10 @@ def boxed_math_reward_fn(input_ids, gt_answer, response_idx, **kwargs):
if not eval_mode:
reward = reward + length_reward
# Check format accuracy
if format_valid:
format_acc += 1
# Check if the sequence is over length
if not eval_mode and res_length >= max_new_tokens:
reward *= 0.0
@@ -199,3 +220,78 @@ def boxed_math_reward_fn(input_ids, gt_answer, response_idx, **kwargs):
"response_length": res_length,
"reward": reward.item(),
}
def code_reward_fn(input_ids, test_cases, response_idx, **kwargs):
tokenizer = kwargs["tokenizer"]
eval_mode = kwargs.get("eval_mode", False)
soft_over_length_punishment = kwargs.get("soft_over_length_punishment", False)
acc_score = 10.0
reward = torch.tensor(0.0)
format_acc = torch.tensor(0.0)
ans_acc = torch.tensor(0.0)
s, e = response_idx[0], response_idx[1]
length_reward = 0.0
res_length = e.item() - s.item() + 1
if not eval_mode:
max_new_tokens = kwargs["max_new_tokens"]
else:
max_new_tokens = -1 # for eval mode, we don't need to check the length
if not eval_mode and soft_over_length_punishment:
cache_length = kwargs["cache_length"]
if max_new_tokens - cache_length < res_length < max_new_tokens:
length_reward = ((max_new_tokens - cache_length) - res_length) / cache_length * acc_score
# Try to extract the code solution from the completion. If the completion is pure code (no markdown fence), the split is a no-op.
decoded_final_answer = tokenizer.decode(input_ids[s : e + 1], skip_special_tokens=True)
solution = decoded_final_answer.split("```python")[-1].split("```")[0]
format_valid = False
if "```python" in decoded_final_answer:
format_valid = solution is not None
# Check format accuracy
if format_valid:
format_acc += 1
try:
try:
if not isinstance(test_cases, dict):
test_cases = json.loads(test_cases)
except Exception as e:
print(f"Error {e}: Cannot parse test cases.")
raise e
# Complete check on all in-out pairs first. If there is no failure, per-sample test can be skipped.
try:
res, metadata = check_correctness_code(in_outs=test_cases, generation=solution, timeout=10, debug=True)
metadata = dict(enumerate(metadata))[0]
success = all(map(lambda x: x is True, res))
if success:
ans_acc += 1
if eval_mode or format_valid:
reward += acc_score
if not eval_mode:
reward = reward + length_reward
except Exception:
pass
# Check if the sequence is over length
if not eval_mode and res_length >= max_new_tokens:
reward *= 0.0
except Exception:
pass
if not eval_mode:
return torch.tensor([reward, format_acc, ans_acc]).to(input_ids.device)
else:
prompt = tokenizer.decode(input_ids[:s], skip_special_tokens=True)
return {
"prompt": prompt,
"prediction": decoded_final_answer,
"gold": test_cases["outputs"],
"parsed": solution,
"format_valid": format_acc.item(),
"ans_valid": ans_acc.item(),
"response_length": res_length,
"reward": reward.item(),
}
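
For context, the test_cases string passed to code_reward_fn is expected to json-decode into the structure run_test understands. Two illustrative payloads (not part of the diff; field names come from the code above, the concrete values are made up):

import json

stdin_style = json.dumps({
    "inputs": ["3 4\n"],  # fed to the program through mocked stdin
    "outputs": ["7\n"],   # compared against captured stdout
})

call_based = json.dumps({
    "fn_name": "add",     # run_test switches to call-based mode when fn_name is present
    "inputs": ["1\n2"],   # one JSON-encoded argument per line
    "outputs": ["3"],     # JSON-encoded expected return value
})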

View File

@@ -2,6 +2,7 @@
Function-based reward verification module.
"""
import inspect
from typing import Any, Dict, List
import torch
@@ -15,7 +16,8 @@ class VerifiableReward:
def __call__(
self,
input_ids: torch.LongTensor,
gt_answer: List[torch.Tensor] = None,
gt_answer: List[str] = None,
test_cases: List[str] = None,
response_idx: List[torch.Tensor] = None,
) -> torch.Tensor:
# Get batch size
@@ -26,18 +28,44 @@ class VerifiableReward:
# Loop through reward functions
for reward_fn in self.reward_fns:
# Apply the reward function to the entire batch at once
reward_batch = torch.stack(
[
reward_fn(
input_ids[i],
gt_answer=gt_answer[i],
response_idx=response_idx[i],
**self.kwargs,
)
for i in range(bs)
],
dim=0,
)
if "gt_answer" in inspect.getfullargspec(reward_fn).args:
reward_batch = torch.stack(
[
reward_fn(
input_ids[i],
gt_answer=gt_answer[i],
response_idx=response_idx[i],
**self.kwargs,
)
for i in range(bs)
],
dim=0,
)
elif "test_cases" in inspect.getfullargspec(reward_fn).args:
reward_batch = torch.stack(
[
reward_fn(
input_ids[i],
test_cases=test_cases[i],
response_idx=response_idx[i],
**self.kwargs,
)
for i in range(bs)
],
dim=0,
)
else:
reward_batch = torch.stack(
[
reward_fn(
input_ids[i],
response_idx=response_idx[i],
**self.kwargs,
)
for i in range(bs)
],
dim=0,
)
rewards += reward_batch
return rewards
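
The branching above keys off each reward function's signature via inspect.getfullargspec, so math-style functions receive gt_answer while code-style functions receive test_cases. A minimal self-contained sketch of that dispatch idea (the two toy functions are made up, not the PR's code):

import inspect
import torch

def math_like_fn(input_ids, gt_answer=None, response_idx=None, **kwargs):
    return torch.tensor([1.0, 1.0, 1.0])

def code_like_fn(input_ids, test_cases=None, response_idx=None, **kwargs):
    return torch.tensor([2.0, 1.0, 1.0])

for fn in (math_like_fn, code_like_fn):
    args = inspect.getfullargspec(fn).args
    if "gt_answer" in args:
        kind = "math-style, called with gt_answer"
    elif "test_cases" in args:
        kind = "code-style, called with test_cases"
    else:
        kind = "generic, called with response_idx only"
    print(fn.__name__, "->", kind)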

View File

@@ -22,3 +22,6 @@ sentencepiece==0.1.99
flash-attn
tiktoken
jsonlines
math_verify
latex2sympy2_extended
pyext

View File

@@ -101,7 +101,7 @@ if __name__ == "__main__":
"--reward-type",
type=str,
default="think_answer_tags",
choices=["think_answer_tags", "boxed"],
choices=["think_answer_tags", "boxed", "code"],
help="Reward type for GRPO.",
)
parser.add_argument(
@@ -167,10 +167,29 @@ if __name__ == "__main__":
if args.master_address is None:
# Default settings: Using single machine
ray.init(address="local", namespace="ray-example")
ray.init(
address="local",
namespace="ray-example",
runtime_env={
"env_vars": {
# "RAY_DEBUG_POST_MORTEM": "1" # enable post-mortem debugging with ray
"TOKENIZERS_PARALLELISM": "false"
},
},
)
else:
# For distributed multi-machine training with Ray, change _node_ip_address to the IP address of your master node
ray.init(_node_ip_address=args.master_address, namespace="ray-example", _temp_dir=args.ray_dir)
ray.init(
_node_ip_address=args.master_address,
namespace="ray-example",
_temp_dir=args.ray_dir,
runtime_env={
"env_vars": {
# "RAY_DEBUG_POST_MORTEM": "1" # enable post-mortem debugging with ray
"TOKENIZERS_PARALLELISM": "false"
},
},
)
if args.top_k is None:
if args.backend == "transformers":
@@ -178,6 +197,8 @@ if __name__ == "__main__":
elif args.backend == "vllm":
args.top_k = -1
os.environ["TOKENIZERS_PARALLELISM"] = "false" # Disable tokenizers parallelism to avoid deadlock
inference_model_config = dict(path=args.model)
train_model_config = dict(path=args.model, use_flash_attention_2=True, use_cache=False)
generate_config = dict(top_k=args.top_k, top_p=args.top_p, temperature=args.temperature)
@@ -288,7 +309,6 @@ if __name__ == "__main__":
"max_length": args.max_prompt_tokens,
"system_prompt": args.system_prompt,
},
dataloaders_config={},
inference_model_config=inference_model_config,
generate_config=generate_config,
num_generations=args.num_generations,