from typing import List

import torch

from coati.experience_buffer.utils import BufferItem, make_experience_batch, split_experience_batch
from coati.experience_maker.base import Experience

# from torch.multiprocessing import Queue
from ray.util.queue import Queue


class DetachedReplayBuffer:
    """
    Detached replay buffer that shares Experience across workers on the same node,
    so a trainer node is expected to hold only one instance.
    It is the ExperienceMakerHolder's duty to call the append(exp) method remotely.

    Args:
        sample_batch_size: Batch size used when sampling. Experiences are not
            enqueued until a full batch has been collected.
        limit: Maximum number of experience batches held in the queue.
            A number <= 0 means unlimited. Defaults to 0.
    """

    def __init__(self, sample_batch_size: int, limit: int = 0) -> None:
        self.sample_batch_size = sample_batch_size
        self.limit = limit
        # The queue is backed by a dedicated Ray actor, so it can be shared across workers.
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        # Collects incoming items until a full batch of `sample_batch_size` is formed.
        self.batch_collector: List[BufferItem] = []

    @torch.no_grad()
    def append(self, experience: Experience) -> None:
        """
        Split a batched experience into items and buffer them. Expected to be called remotely.
        """
        items = split_experience_batch(experience)
        self.extend(items)

    @torch.no_grad()
    def extend(self, items: List[BufferItem]) -> None:
        """
        Add items to the batch collector and enqueue every full batch. Expected to be called remotely.
        """
        self.batch_collector.extend(items)
        while len(self.batch_collector) >= self.sample_batch_size:
            items = self.batch_collector[: self.sample_batch_size]
            experience = make_experience_batch(items)
            self.items.put(experience, block=True)
            self.batch_collector = self.batch_collector[self.sample_batch_size :]

    def clear(self) -> None:
        # Shut down the queue actor, then start over with an empty queue and collector.
        self.items.shutdown()
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        self.batch_collector = []

    @torch.no_grad()
    def sample(self, worker_rank=0, to_device="cpu") -> Experience:
        """
        Pop one experience batch from the queue and move it to the target device.
        """
        ret = self._sample_and_erase()
        ret.to_device(to_device)
        return ret

    @torch.no_grad()
    def _sample_and_erase(self) -> Experience:
        # Blocks until a full batch is available, then removes it from the queue.
        ret = self.items.get(block=True)
        return ret

    def get_length(self) -> int:
        # Number of ready batches currently waiting in the queue.
        ret = self.items.qsize()
        return ret
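

# --- Illustration only (not part of the original module) ---
# A minimal, self-contained sketch of the batching behaviour implemented by
# DetachedReplayBuffer.extend(): items accumulate in a local collector and a
# batch is only enqueued once `sample_batch_size` items are available. Plain
# integers and the standard-library queue.Queue stand in for the coati
# BufferItem/Experience types and the Ray queue; all names below are
# illustrative and not part of the coati API.
if __name__ == "__main__":
    import queue

    sample_batch_size = 4
    collector: List[int] = []
    batches: "queue.Queue[List[int]]" = queue.Queue()

    def toy_extend(new_items: List[int]) -> None:
        # Mirrors the while-loop in DetachedReplayBuffer.extend().
        global collector
        collector.extend(new_items)
        while len(collector) >= sample_batch_size:
            batches.put(collector[:sample_batch_size], block=True)
            collector = collector[sample_batch_size:]

    toy_extend([1, 2, 3])        # fewer than 4 items: nothing is enqueued yet
    toy_extend([4, 5, 6, 7, 8])  # now two full batches can be formed
    while not batches.empty():
        print(batches.get())     # [1, 2, 3, 4], then [5, 6, 7, 8]
    print("leftover:", collector)  # [] -- 8 items formed exactly two batches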