diff --git a/applications/ColossalChat/coati/distributed/comm.py b/applications/ColossalChat/coati/distributed/comm.py index 0a724d53b..47eae5906 100644 --- a/applications/ColossalChat/coati/distributed/comm.py +++ b/applications/ColossalChat/coati/distributed/comm.py @@ -1,5 +1,6 @@ -from typing import Any, Dict import copy +from typing import Any, Dict + import ray import ray.util.collective as cc import torch @@ -31,6 +32,7 @@ def ray_broadcast_object(obj: Any, src: int = 0, device=None, group_name: str = obj = c10d._tensor_to_object(obj, size_tensor.item()) return obj + def ray_broadcast_tensor_dict( tensor_dict: Dict[str, torch.Tensor], src: int = 0, diff --git a/applications/ColossalChat/coati/distributed/zero_bubble/consumer.py b/applications/ColossalChat/coati/distributed/zero_bubble/consumer.py index 82242e874..ee1da7c01 100644 --- a/applications/ColossalChat/coati/distributed/zero_bubble/consumer.py +++ b/applications/ColossalChat/coati/distributed/zero_bubble/consumer.py @@ -7,7 +7,9 @@ import ray import ray.util.collective as cc import torch import torch.distributed as dist +from coati.distributed.comm import SharedVariableActor, ray_broadcast_tensor_dict from coati.distributed.profiling_utils import CustomProfiler +from coati.distributed.utils import bind_batch, post_recv, unbind_batch from tqdm import tqdm from colossalai.booster import Booster @@ -15,9 +17,6 @@ from colossalai.booster.plugin import HybridParallelPlugin from colossalai.initialize import launch from colossalai.utils import get_current_device -from coati.distributed.comm import SharedVariableActor, ray_broadcast_tensor_dict -from coati.distributed.utils import bind_batch, post_recv, unbind_batch - class BaseConsumer: def __init__( diff --git a/applications/ColossalChat/coati/distributed/zero_bubble/distributor.py b/applications/ColossalChat/coati/distributed/zero_bubble/distributor.py index b16f4b67e..78fe874ae 100644 --- a/applications/ColossalChat/coati/distributed/zero_bubble/distributor.py +++ b/applications/ColossalChat/coati/distributed/zero_bubble/distributor.py @@ -3,12 +3,11 @@ import time import ray import ray.util.collective as cc import torch +from coati.distributed.comm import SharedVariableActor, ray_broadcast_tensor_dict from coati.distributed.profiling_utils import CustomProfiler from colossalai.utils import get_current_device -from coati.distributed.comm import SharedVariableActor, ray_broadcast_tensor_dict - @ray.remote class Distributor: diff --git a/applications/ColossalChat/coati/distributed/zero_bubble/grpo_consumer.py b/applications/ColossalChat/coati/distributed/zero_bubble/grpo_consumer.py index c07385b97..9d3d788a5 100644 --- a/applications/ColossalChat/coati/distributed/zero_bubble/grpo_consumer.py +++ b/applications/ColossalChat/coati/distributed/zero_bubble/grpo_consumer.py @@ -5,9 +5,9 @@ import ray import torch import wandb from coati.distributed.comm import SharedVariableActor -from coati.distributed.zero_bubble.consumer import BaseConsumer from coati.distributed.loss import PolicyLoss from coati.distributed.utils import memory_efficient_logprob +from coati.distributed.zero_bubble.consumer import BaseConsumer from coati.trainer.utils import all_reduce_mean, all_reduce_sum from transformers import AutoModelForCausalLM, AutoTokenizer diff --git a/applications/ColossalChat/coati/distributed/zero_bubble/producer.py b/applications/ColossalChat/coati/distributed/zero_bubble/producer.py index 9e57914c4..6c28d0389 100644 --- a/applications/ColossalChat/coati/distributed/zero_bubble/producer.py +++ b/applications/ColossalChat/coati/distributed/zero_bubble/producer.py @@ -11,9 +11,12 @@ import torch import tqdm import wandb from coati.dataset.loader import RawConversationDataset, collate_fn_grpo +from coati.distributed.comm import SharedVariableActor, ray_broadcast_tensor_dict +from coati.distributed.inference_backend import BACKEND_MAP from coati.distributed.profiling_utils import CustomProfiler from coati.distributed.reward.reward_fn import boxed_math_reward_fn, code_reward_fn, math_reward_fn from coati.distributed.reward.verifiable_reward import VerifiableReward +from coati.distributed.utils import pre_send, safe_append_to_jsonl_file from ray.util.collective import allreduce from ray.util.collective.types import ReduceOp from torch.utils.data import DataLoader, DistributedSampler @@ -21,10 +24,6 @@ from transformers import AutoTokenizer from colossalai.utils import get_current_device -from coati.distributed.comm import SharedVariableActor, ray_broadcast_tensor_dict -from coati.distributed.inference_backend import BACKEND_MAP -from coati.distributed.utils import pre_send, safe_append_to_jsonl_file - try: from vllm import SamplingParams except ImportError: