Mirror of https://github.com/hpcaitech/ColossalAI.git (synced 2025-09-13 05:01:44 +00:00)
Commit: upgrade colossal-chat to support tp_group > 1; add sequence parallelism (SP) for SFT
@@ -248,9 +248,9 @@ class StatefulDistributedSampler(DistributedSampler):
         shuffle: bool = True,
         seed: int = 0,
         drop_last: bool = False,
-        use_tp: Optional[bool] = False,
+        tp_size: int = 1,
     ) -> None:
-        if not use_tp:
+        if not tp_size>1:
             super().__init__(
                 dataset=dataset,
                 num_replicas=num_replicas,
@@ -261,14 +261,16 @@ class StatefulDistributedSampler(DistributedSampler):
             )
         else:
             # adapted from https://github.com/pytorch/pytorch/blob/4979f9c0d72490970e2019bb1d2284f83d93f76b/torch/utils/data/distributed.py#L62
-            # TODO: support tp_group>1. will fix it later
-            num_replicas = 1
             if rank is None:
                 rank = dist.get_rank()
+            world_size = dist.get_world_size()
+            dp_size = world_size // tp_size  # data parallel size
+            dp_rank = int(rank / tp_size)  # data parallel rank
             if rank < 0:
                 raise ValueError(f"Invalid rank {rank}, rank should be in the interval [0, 0]")
             self.dataset = dataset
             self.num_replicas = num_replicas
+            self.dp_rank = dp_rank
             self.rank = rank
             self.epoch = 0
             self.drop_last = drop_last
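The dp_size / dp_rank arithmetic introduced here groups consecutive global ranks into tensor-parallel groups of size tp_size, so every rank inside one TP group shares the same data-parallel rank and therefore reads the same data. A minimal standalone sketch of that mapping (plain Python, no torch.distributed; world_size and tp_size values are illustrative, not taken from the commit):

    # Illustrative values only.
    world_size = 8   # total number of processes, i.e. dist.get_world_size()
    tp_size = 2      # ranks 0-1, 2-3, 4-5, 6-7 each form one tensor-parallel group

    dp_size = world_size // tp_size      # data parallel size -> 4
    for rank in range(world_size):
        dp_rank = rank // tp_size        # same result as int(rank / tp_size) for rank >= 0
        print(f"rank {rank} -> dp_rank {dp_rank}")
    # rank 0 -> dp_rank 0, rank 1 -> dp_rank 0, rank 2 -> dp_rank 1, ...
    # Ranks that share a dp_rank sit in the same TP group and must see identical batches.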
@@ -287,10 +289,10 @@ class StatefulDistributedSampler(DistributedSampler):
             self.shuffle = shuffle
             self.seed = seed
         self.start_index = 0
-        self.use_tp = use_tp
+        self.tp_size = tp_size
 
     def __iter__(self) -> Iterator:
-        if self.use_tp:
+        if self.tp_size > 1:
             # TODO Add support for tp_group not equal to 1
             pass
             # adpated from https://github.com/pytorch/pytorch/blob/4979f9c0d72490970e2019bb1d2284f83d93f76b/torch/utils/data/distributed.py#L96
@@ -316,10 +318,9 @@ class StatefulDistributedSampler(DistributedSampler):
 
             # subsample
             indices = indices[
-                : self.total_size : self.num_replicas
+                self.dp_rank: self.dp_rank + self.total_size : self.num_replicas
             ]  # num_replicas=tp_group=1, we only support tp_group==1 for now
             assert len(indices) == self.num_samples
-
             return iter(indices)
 
         else:
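The new strided slice is what actually shards the data: with num_replicas equal to the data-parallel size, each dp_rank takes every num_replicas-th index starting at its own offset, so shards are disjoint across data-parallel groups while all TP ranks that share a dp_rank iterate over the same shard. A small runnable sketch of that slicing (values are illustrative, not from the commit):

    import math

    # Illustrative values only; dp_size plays the role of self.num_replicas.
    dataset_len = 10
    dp_size = 2
    num_samples = math.ceil(dataset_len / dp_size)
    total_size = num_samples * dp_size

    indices = list(range(total_size))  # stand-in for the (shuffled, padded) index list
    for dp_rank in range(dp_size):
        shard = indices[dp_rank : dp_rank + total_size : dp_size]
        assert len(shard) == num_samples
        print(dp_rank, shard)
    # 0 [0, 2, 4, 6, 8]
    # 1 [1, 3, 5, 7, 9]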
@@ -345,7 +346,7 @@ def setup_distributed_dataloader(
     num_workers: int = 0,
     collate_fn: Callable[[Sequence[Dict[str, Union[str, List[int]]]]], Dict[str, torch.Tensor]] = None,
     process_group: Optional[ProcessGroup] = None,
-    use_tp: Optional[bool] = False,
+    tp_size: Optional[int] = 1,
     **kwargs,
 ) -> DataLoader:
     """
@@ -353,14 +354,16 @@ def setup_distributed_dataloader(
     """
     _kwargs = kwargs.copy()
     process_group = process_group or _get_default_group()
+    # world_size = tp_size * pp_size
+    assert process_group.size()%tp_size == 0, f"process_group.size()={process_group.size()} must be divisible by tp_size={tp_size}"
     sampler = StatefulDistributedSampler(
         dataset=dataset,
-        num_replicas=process_group.size() if not use_tp else 1,
+        num_replicas=int(process_group.size()/tp_size),
         rank=process_group.rank(),
         shuffle=shuffle,
         seed=seed,
         drop_last=drop_last,
-        use_tp=use_tp,
+        tp_size=tp_size,
     )
 
     # Deterministic dataloader
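Caller-facing summary: setup_distributed_dataloader now takes tp_size instead of the old use_tp flag, asserts that the process group size is divisible by tp_size, and derives the number of data-parallel replicas from the ratio. A minimal sketch of that arithmetic, with a plain-Python stand-in for process_group.size() (values are illustrative, not from the commit):

    # Stand-in for process_group.size(); illustrative only.
    world_size = 8

    for tp_size in (1, 2, 4):
        # Mirrors the new assert and num_replicas computation above.
        assert world_size % tp_size == 0, f"world_size={world_size} must be divisible by tp_size={tp_size}"
        num_replicas = int(world_size / tp_size)
        print(f"tp_size={tp_size} -> num_replicas={num_replicas}")
    # tp_size=1 -> num_replicas=8   (matches the old process_group.size() path)
    # tp_size=2 -> num_replicas=4
    # tp_size=4 -> num_replicas=2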