ColossalAI/applications/ColossalChat/coati/distributed/utils.py

213 lines
6.8 KiB
Python

import json
import os
import time
from typing import Any, Dict, List
import torch
from filelock import FileLock
from colossalai.shardformer.layer.loss import dist_log_prob
def unbind_batch(batch: Dict[str, torch.Tensor]) -> List[Dict[str, torch.Tensor]]:
batches = []
for k, v in batch.items():
if len(batches) == 0:
unbinded_tensors = v.unbind(0)
batches = [{k: tensor} for tensor in unbinded_tensors]
else:
unbinded_tensors = v.unbind(0)
assert len(batches) == len(unbinded_tensors)
for i, tensor in enumerate(unbinded_tensors):
batches[i][k] = tensor
return batches
def bind_batch(batches: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
batch = {}
for k in batches[0].keys():
batch[k] = torch.stack([batch[k] for batch in batches], dim=0)
return batch
def pre_send(batch: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
# compress mask to save bandwidth
if "attention_mask" in batch:
batch["attention_mask"] = batch["attention_mask"].to(torch.bool)
if "action_mask" in batch:
batch["action_mask"] = batch["action_mask"].to(torch.bool)
return batch
def post_recv(batch: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
# decompress mask
if "attention_mask" in batch:
batch["attention_mask"] = batch["attention_mask"].to(torch.int)
if "action_mask" in batch:
batch["action_mask"] = batch["action_mask"].to(torch.int)
return batch
def update_by_default(data: Dict[str, Any], default: Dict[str, Any]) -> Dict[str, Any]:
data = data.copy()
for k, v in default.items():
if k not in data:
data[k] = v
return data
def log_probs_from_logits(logits: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
"""
Compute the log probabilities from logits for the given labels.
Args:
logits (torch.Tensor): The input logits.
labels (torch.Tensor): The target labels.
Returns:
torch.Tensor: The log probabilities corresponding to the labels.
"""
log_probs = torch.log_softmax(logits, dim=-1)
per_label_logps = log_probs.gather(dim=-1, index=labels.unsqueeze(-1))
return per_label_logps.squeeze(-1)
def calc_action_log_probs(
logits: torch.Tensor,
sequences: torch.LongTensor,
num_actions: int,
shard_config,
vocab_size: int = None,
) -> torch.Tensor:
"""Calculate action log probs.
Args:
logits (torch.Tensor): Output tensor of Actor.forward.logits.
sequences (torch.LongTensor): Input sequences.
num_actions (int): Number of actions.
shard_config
vocab_size
Returns:
torch.Tensor: Action log probs.
"""
# labels: torch.Tensor, # [B, S] or [B, S, Vocab_size]
# logits: torch.Tensor, # [B, S, Vocab_size]
log_probs = dist_log_prob(sequences, logits, shard_config, vocab_size, logits.dtype)
log_probs = log_probs.squeeze(-1)
return log_probs[:, -num_actions:]
def masked_mean(tensor: torch.Tensor, mask: torch.Tensor, dim: int = 1) -> torch.Tensor:
"""
Compute the masked mean of a tensor along a specified dimension.
Args:
tensor (torch.Tensor): The input tensor.
mask (torch.Tensor): The mask tensor with the same shape as the input tensor.
dim (int, optional): The dimension along which to compute the mean. Default is 1.
Returns:
torch.Tensor: The masked mean tensor.
"""
tensor = tensor * mask
tensor = tensor.sum(dim=dim)
mask_sum = mask.sum(dim=dim)
mean = tensor / (mask_sum + 1e-8)
return mean
def masked_sum(tensor: torch.Tensor, mask: torch.Tensor, dim: int = 1) -> torch.Tensor:
"""
Compute the masked sum of a tensor along a specified dimension.
Args:
tensor (torch.Tensor): The input tensor.
mask (torch.Tensor): The mask tensor with the same shape as the input tensor.
dim (int, optional): The dimension along which to compute the sum. Default is 1.
Returns:
torch.Tensor: The masked sum tensor.
"""
tensor = tensor * mask
return tensor.sum(dim=dim)
def safe_append_to_jsonl_file(file_path, data):
with FileLock(file_path + ".lock"):
# Ensure file exists
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "a", encoding="utf8") as f:
for entry in data:
json_line = json.dumps(entry, ensure_ascii=False)
f.write(json_line + "\n")
def memory_efficient_logprob(
logits: torch.Tensor,
inputs: torch.Tensor,
num_action: int,
chunk_size: int = 2048,
shard_config: Any = None,
vocab_size: int = None,
) -> torch.Tensor:
"""
Calculate action log probs in a memory-efficient way by processing in chunks.
Args:
logits (torch.Tensor): Output tensor of Actor.forward.logits.
inputs (torch.LongTensor): Input sequences.
num_action (int): Number of actions.
chunk_size (int, optional): Size of each chunk to process. Default is 2048.
shard_config: Shard configuration for distributed computation.
vocab_size (int, optional): Vocabulary size. Default is None.
Returns:
torch.Tensor: Action log probs.
"""
action_log_probs = torch.zeros((logits.size(0), num_action), device=logits.device, dtype=logits.dtype)
context_length = logits.size(1) - num_action
for i in range(action_log_probs.size(0)):
# loop over each sample in the micro-batch
for start in range(context_length, logits.size(1), chunk_size):
end = min(start + chunk_size, logits.size(1))
# calculate log probs in chunks to save memory
log_probs = dist_log_prob(
inputs[i : i + 1, start - 1 : end],
logits[i : i + 1, start - 1 : end],
shard_config,
vocab_size,
logits.dtype,
) # [1, chunk_size, 1]
log_probs = log_probs.squeeze(-1)
action_log_probs[i, start - context_length : end - context_length] += log_probs[0]
return action_log_probs
class CustomProfiler:
def __init__(self, name):
self.name = name
self.pid = os.getpid()
self.file = open(f"{name}.prof", "w")
def _log(self, message):
current_time = time.time()
self.file.write(f"{current_time} {self.name} {self.pid}:: {message}\n")
self.file.flush()
def log(self, message):
current_time = time.time()
self.file.write(f"[Log]: {current_time} {self.name} {self.pid}:: {message}\n")
self.file.flush()
def enter(self, event_name):
self._log(f"Enter {event_name}")
def exit(self, event_name):
self._log(f"Exit {event_name}")
def close(self):
self.file.close()
print(f"Profiler data written to {self.name}.prof")