mirror of https://github.com/hpcaitech/ColossalAI.git

add pp support

This commit is contained in:
parent d961a5f725
commit 09a3173a49

.gitignore (vendored): 1 line added
@@ -164,3 +164,4 @@ coverage.xml
 applications/ColossalChat/logs
 applications/ColossalChat/tests/logs
 applications/ColossalChat/wandb
+applications/ColossalChat/model
@@ -94,7 +94,6 @@ class BaseConsumer:
             i = 0
             for _ in range(self.num_recv_per_update):
                 # receive data from producers
-
                 for r in range(self.num_producers):
                     print(f"[T{dist.get_rank()}] Recv data episode {episode} step {step} from {r}")
                     self.buffer.extend(
@@ -94,9 +94,7 @@ class GRPOConsumer(BaseConsumer):

         self.policy_loss_fn = PolicyLoss()
         self.global_step = 0
-        if use_wandb and self.rank == 0:
-            name = f"{generate_config['backend']}_bs_{self.batch_size*self.world_size}_temp_{generate_config['temperature']:.01f}_top_p_{generate_config['top_p']:.02f}"
-            self.wandb_run = wandb.init(project="GRPO-V1-PP", sync_tensorboard=True, dir="./wandb", name=name)
+        self.use_wandb = use_wandb

         self.lr_scheduler = CosineAnnealingWarmupLR(
             optimizer=self.optimizer,
@@ -107,10 +105,19 @@ class GRPOConsumer(BaseConsumer):

     def setup(self):
         super().setup()
+        if self.use_wandb and (
+            (not self.plugin.pp_size > 1 and self.rank == 0)
+            or (self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage())
+        ):
+            # Initialize wandb.
+            name = f"{self.generate_config['backend']}_bs_{self.batch_size*self.dp_size}_temp_{self.generate_config['temperature']:.01f}_top_p_{self.generate_config['top_p']:.02f}"
+            self.wandb_run = wandb.init(project="GRPO-V1-PP", sync_tensorboard=True, dir="./wandb", name=name)
+
         self.policy_model, self.optimizer, _, _, self.lr_scheduler = self.booster.boost(
             self.policy_model, self.optimizer, lr_scheduler=self.lr_scheduler
         )
         self.reference_model, *_ = self.booster.boost(self.reference_model)
+        self.plugin.logger.set_level("ERROR")

     def step(self, step_idx: int, **kwargs) -> Optional[float]:
         """
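With pipeline parallelism the loss only materializes on the last pipeline stage, so wandb initialization is now gated on rank 0 when PP is off and on the last stage when PP is on. A minimal, self-contained sketch of that gating predicate; the helper name and the plain boolean arguments are illustrative, not part of the commit:

    def should_log(pp_size: int, rank: int, is_last_stage: bool) -> bool:
        # Without pipeline parallelism, only global rank 0 logs.
        # With pipeline parallelism, only the last stage holds the loss, so it logs.
        return (pp_size <= 1 and rank == 0) or (pp_size > 1 and is_last_stage)

    # Quick checks of the gating behaviour.
    assert should_log(pp_size=1, rank=0, is_last_stage=False)
    assert not should_log(pp_size=1, rank=1, is_last_stage=False)
    assert not should_log(pp_size=2, rank=0, is_last_stage=False)
    assert should_log(pp_size=2, rank=3, is_last_stage=True)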
@@ -168,54 +175,7 @@ class GRPOConsumer(BaseConsumer):
             ).repeat_interleave(self.num_generations, dim=0)
         )
         mean_kl, mean_loss = [], []
-        if self.plugin.pp_size > 1:
-            # Support training with PP.
-            data_iter = iter([data])
-
-            with torch.no_grad():
-                reference_model_outputs = self.booster.execute_pipeline(
-                    data_iter,
-                    self.reference_model,
-                    criterion=lambda outputs, inputs: outputs.logits.mean(),  # dummy criterion
-                    optimizer=None,
-                    return_loss=False,
-                    return_outputs=True,
-                )
-
-            if self.booster.plugin.stage_manager.is_last_stage():
-                reference_model_logits = reference_model_outputs["outputs"]["logits"]
-                reference_action_log_probs = calc_action_log_probs(
-                    reference_model_logits / self.generate_config["temperature"],
-                    data["input_ids"],
-                    num_action,
-                    self.plugin.shard_config,
-                )
-            else:
-                # Dummy reference logprobs for data iterator.
-                reference_action_log_probs = torch.zeros(
-                    (old_action_log_probs.size(0), old_action_log_probs.size(1))
-                )
-
-            data["reference_action_log_probs"] = reference_action_log_probs
-
-            data_iter = iter([data])
-
-            def _criterion(outputs, inputs):
-                pass
-
-            outputs = self.booster.execute_pipeline(
-                data_iter,
-                self.policy_model,
-                criterion=_criterion,
-                optimizer=self.optimizer,
-                return_loss=True,
-            )
-            loss = outputs["loss"]
-
-            if self.booster.plugin.stage_manager.is_last_stage():
-                loss = all_reduce_mean(loss, self.plugin)
-                mean_loss.append(loss.data)
-        else:
+
         for forward_micro_batch_start in range(0, data["input_ids"].size(0), forward_batch_size):
             input_ids_forward_micro_batch = data["input_ids"][
                 forward_micro_batch_start : forward_micro_batch_start + forward_batch_size
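After this change the whole-batch PP path is gone: the rollout batch is always walked in forward micro-batches, and the PP branch moves inside that loop (next hunk). A standalone sketch of the slicing pattern the loop relies on; the tensor shapes and the batch dict are illustrative, not the trainer's actual data:

    import torch

    batch = {"input_ids": torch.randint(0, 100, (8, 16))}  # 8 sequences of length 16
    forward_batch_size = 2  # stand-in for the configured train micro-batch size

    for start in range(0, batch["input_ids"].size(0), forward_batch_size):
        micro_batch = batch["input_ids"][start : start + forward_batch_size]
        # Each slice is fed through the policy/reference model on its own;
        # gradients accumulate across slices before the optimizer step.
        assert micro_batch.size(0) <= forward_batch_size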
@@ -234,6 +194,111 @@ class GRPOConsumer(BaseConsumer):
             advantages_forward_micro_batch = advantages[
                 forward_micro_batch_start : forward_micro_batch_start + forward_batch_size
             ]
+
+            if self.plugin.pp_size > 1:
+                # Support training with PP.
+
+                with torch.no_grad():
+                    reference_model_outputs = self.booster.execute_pipeline(
+                        iter(
+                            [
+                                {
+                                    "input_ids": input_ids_forward_micro_batch,
+                                    "attention_mask": attention_mask_forward_micro_batch,
+                                }
+                            ]
+                        ),
+                        self.reference_model,
+                        criterion=lambda outputs, inputs: torch.tensor(
+                            [0.0], device=action_mask.device
+                        ),  # dummy criterion
+                        optimizer=None,
+                        return_loss=False,
+                        return_outputs=True,
+                    )
+
+                if self.booster.plugin.stage_manager.is_last_stage():
+                    reference_model_logits = reference_model_outputs["outputs"]["logits"]
+                    reference_action_log_probs = calc_action_log_probs(
+                        reference_model_logits / self.generate_config["temperature"],
+                        input_ids_forward_micro_batch,
+                        num_action,
+                        self.plugin.shard_config,
+                    )
+                else:
+                    # Dummy reference logprobs for data iterator.
+                    reference_action_log_probs = None
+
+                data_policy_forward = {
+                    "input_ids": input_ids_forward_micro_batch,
+                    "attention_mask": attention_mask_forward_micro_batch,
+                    "action_mask": action_mask_forward_micro_batch,
+                    "reference_action_log_probs": reference_action_log_probs,
+                    "advantages": advantages_forward_micro_batch,
+                    "loss_mask": loss_mask_forward_micro_batch,
+                    "source": self.rank,
+                }
+
+                def _criterion(outputs, inputs):
+                    action_logits = outputs.logits
+                    action_log_probs = calc_action_log_probs(
+                        action_logits / self.generate_config["temperature"],
+                        inputs["input_ids"],
+                        num_action,
+                        self.plugin.shard_config,
+                    )
+                    per_token_kl = (
+                        torch.exp(inputs["reference_action_log_probs"] - action_log_probs)
+                        - (inputs["reference_action_log_probs"] - action_log_probs)
+                        - 1
+                    )
+                    decode_tokens_100 = self.tokenizer.batch_decode(
+                        input_ids_forward_micro_batch[:, -num_action:],
+                        skip_special_tokens=False,
+                    )
+                    loss, skip_update, _ = self.policy_loss_fn(
+                        action_log_probs,
+                        action_log_probs,
+                        inputs["advantages"].repeat_interleave(action_log_probs.size(-1), dim=-1),
+                        per_token_kl,
+                        inputs["action_mask"],
+                        loss_mask=inputs["loss_mask"],
+                    )
+                    return loss
+
+                policy_model_outputs = self.booster.execute_pipeline(
+                    iter([data_policy_forward]),
+                    self.policy_model,
+                    criterion=_criterion,
+                    optimizer=self.optimizer,
+                    return_loss=True,
+                    return_outputs=True,
+                )
+                loss = policy_model_outputs["loss"]
+
+                if self.booster.plugin.stage_manager.is_last_stage():
+                    # calculate kl
+                    action_logits = policy_model_outputs["outputs"]["logits"]
+                    action_log_probs = calc_action_log_probs(
+                        action_logits / self.generate_config["temperature"],
+                        input_ids_forward_micro_batch,
+                        num_action,
+                        self.plugin.shard_config,
+                    )
+                    per_token_kl = (
+                        torch.exp(reference_action_log_probs - action_log_probs)
+                        - (reference_action_log_probs - action_log_probs)
+                        - 1
+                    )
+                    kl = torch.sum(per_token_kl * action_mask_forward_micro_batch, dim=-1) / torch.sum(
+                        action_mask_forward_micro_batch, dim=-1
+                    )
+                    kl = all_reduce_mean(kl.mean(), self.plugin)
+                    loss = all_reduce_mean(loss, self.plugin)
+                    mean_loss.append(loss.data)
+                    mean_kl.append(kl)
+            else:
+
                 policy_model_logits = self.policy_model(
                     input_ids=input_ids_forward_micro_batch,
                     attention_mask=attention_mask_forward_micro_batch,
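The per-token KL computed on the last stage is the nonnegative estimator exp(d) - d - 1, with d the reference-minus-policy log-prob of each generated token, averaged over the tokens selected by the action mask. A self-contained sketch with random log-probs; shapes and names are illustrative, not taken from the trainer:

    import torch

    torch.manual_seed(0)
    batch, num_action = 4, 8
    ref_logp = torch.randn(batch, num_action)    # reference-model log-probs per action token
    cur_logp = torch.randn(batch, num_action)    # policy-model log-probs per action token
    action_mask = torch.ones(batch, num_action)  # 1 where a generated token exists

    # Token-wise estimator of KL(policy || reference); exp(d) - d - 1 is always >= 0.
    per_token_kl = torch.exp(ref_logp - cur_logp) - (ref_logp - cur_logp) - 1

    # Masked mean over the action tokens of each sequence, then mean over the batch.
    kl_per_seq = torch.sum(per_token_kl * action_mask, dim=-1) / torch.sum(action_mask, dim=-1)
    print(kl_per_seq.mean())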
@@ -256,7 +321,6 @@ class GRPOConsumer(BaseConsumer):
                     num_action,
                     self.plugin.shard_config,
                 )
-
                 per_token_kl = (
                     torch.exp(reference_action_log_probs - action_log_probs)
                     - (reference_action_log_probs - action_log_probs)
@@ -282,7 +346,9 @@ class GRPOConsumer(BaseConsumer):
                 # Calculate accumulate value.
                 mean_kl.append(kl.data)
                 mean_loss.append(loss.data)
-
+        if not self.plugin.pp_size > 1 or (
+            self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage()
+        ):
             reward = all_reduce_mean(reward.mean(), self.plugin)
             format_reward = all_reduce_mean(format_reward.mean(), self.plugin)
             acc_reward = all_reduce_mean(acc_reward.mean(), self.plugin)
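The new guard `not self.plugin.pp_size > 1 or (self.plugin.pp_size > 1 and ...is_last_stage())` reads awkwardly but simply means "no pipeline parallelism, or this rank is the last pipeline stage". A tiny standalone check of that equivalence over plain booleans; the function names are illustrative only:

    from itertools import product

    def as_written(pp_size, is_last_stage):
        return not pp_size > 1 or (pp_size > 1 and is_last_stage)

    def simplified(pp_size, is_last_stage):
        return pp_size == 1 or is_last_stage

    # The two predicates agree for every combination (assuming pp_size >= 1).
    for pp_size, is_last_stage in product([1, 2, 4], [False, True]):
        assert as_written(pp_size, is_last_stage) == simplified(pp_size, is_last_stage)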
@@ -299,8 +365,13 @@ class GRPOConsumer(BaseConsumer):
         if need_update:
             self.optimizer.step()
             self.optimizer.zero_grad()
+            if not self.plugin.pp_size > 1 or (
+                self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage()
+            ):
                 loss_scalar = self.accum_loss.item()
-            if self.rank == 0:
+                if (not self.plugin.pp_size > 1 and self.rank == 0) or (
+                    self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage()
+                ):
                     print(
                         "Loss:",
                         self.accum_loss.item() / self.accum_count,
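Only the rank that actually holds the loss (rank 0 without PP, the last stage with PP) reads `self.accum_loss` and reports the running mean `accum_loss / accum_count` at the optimizer step. A minimal sketch of that bookkeeping detached from the trainer; the loop and the sample losses are illustrative:

    import torch

    accum_loss = torch.zeros(1)
    accum_count = 0

    # Pretend these are the per-micro-batch losses gathered between optimizer steps.
    for micro_batch_loss in [torch.tensor(0.9), torch.tensor(0.7), torch.tensor(0.8)]:
        accum_loss += micro_batch_loss.detach()
        accum_count += 1

    # At the optimizer step, the logging rank reports the mean loss over the window.
    print("Loss:", accum_loss.item() / accum_count)  # ~0.8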
@ -109,7 +109,14 @@ if __name__ == "__main__":
|
|||||||
generate_config=generate_config,
|
generate_config=generate_config,
|
||||||
num_generations=args.num_generations,
|
num_generations=args.num_generations,
|
||||||
train_model_config=train_model_config,
|
train_model_config=train_model_config,
|
||||||
plugin_config={"pp_size": 2, "tp_size": 1, "microbatch_size": 2, "zero_stage": 0},
|
# plugin_config={}, # for zero
|
||||||
|
plugin_config={
|
||||||
|
"pp_size": 2,
|
||||||
|
"tp_size": 1,
|
||||||
|
"microbatch_size": args.train_microbatch_size // 2,
|
||||||
|
"zero_stage": 0,
|
||||||
|
"max_norm": 1.0,
|
||||||
|
}, # for pp
|
||||||
inference_backend=args.backend,
|
inference_backend=args.backend,
|
||||||
master_addr="localhost",
|
master_addr="localhost",
|
||||||
master_port=29505,
|
master_port=29505,
|
||||||
|
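The launcher now pins a pipeline-parallel plugin: 2 pipeline stages, no tensor parallelism, ZeRO disabled, gradient clipping at 1.0, and a pipeline micro-batch equal to half of `train_microbatch_size`, so each training micro-batch is split into two pipeline chunks. A sketch of how such a config could be assembled from arguments, assuming the divisor tracks the number of pipeline chunks; the helper name is illustrative, not the script's actual interface:

    def build_pp_plugin_config(train_microbatch_size: int, pp_size: int = 2) -> dict:
        # The pipeline micro-batch size must divide the training micro-batch size.
        assert train_microbatch_size % pp_size == 0, "train_microbatch_size must be divisible by pp_size"
        return {
            "pp_size": pp_size,                                   # number of pipeline stages
            "tp_size": 1,                                         # no tensor parallelism
            "microbatch_size": train_microbatch_size // pp_size,  # pipeline micro-batch size
            "zero_stage": 0,                                      # ZeRO disabled when PP is on
            "max_norm": 1.0,                                      # gradient clipping
        }

    print(build_pp_plugin_config(train_microbatch_size=8))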