diff --git a/applications/ColossalChat/coati/distributed/launch.py b/applications/ColossalChat/coati/distributed/launch.py
index 41ba8ea55..30e7382ef 100644
--- a/applications/ColossalChat/coati/distributed/launch.py
+++ b/applications/ColossalChat/coati/distributed/launch.py
@@ -79,8 +79,35 @@ def launch_distributed(
         f"{project_name.replace(' ','_')}_run_{wandb_group_name}.jsonl",
     )

-    procs = []
+    # Attention: Ray uses a complex scheduling method that considers various factors, including load balancing.
+    # When requesting resources, it is not guaranteed that the resources come from a node with a lower node id.
+    # This goes against the design principle of our implementation, so we need to manually force the scheduling,
+    # allocating the producers to nodes with lower node ids and the consumers to the resources from nodes with higher
+    # node ids. See the reference here: https://docs.ray.io/en/latest/ray-core/scheduling/index.html#nodeaffinityschedulingstrategy
+    nodes = ray.nodes()
+    node_info = {
+        node["NodeID"]: {
+            "num_gpus": node["Resources"].get("GPU", 0),
+            "address": node["NodeManagerAddress"],
+        }  # Default to 0 if no GPUs are available
+        for node in nodes
+    }
+    gpu_to_node_id = []
+    gpu_to_ip_address = []
+    for node_id in node_info:
+        for idx in range(int(node_info[node_id]["num_gpus"])):
+            gpu_to_node_id.append(node_id)
+            gpu_to_ip_address.append(node_info[node_id]["address"])
+    print(node_info)
+
+    producer_procs = []
     for i in range(num_producers):
+        node_id = gpu_to_node_id[0]
+        producer_ip_address = gpu_to_ip_address[0]
+        for _ in range(num_proc_per_producer):
+            gpu_to_node_id.pop(0)
+            gpu_to_ip_address.pop(0)
+        print(f"Schedule Producer P[{i}] which requires {num_proc_per_producer} GPUs on node {producer_ip_address}")
         producer = SimpleProducer.options(num_gpus=num_proc_per_producer).remote(
             producer_idx=i,
             num_producers=num_producers,
@@ -106,20 +133,29 @@ def launch_distributed(
             log_rollout_interval=log_rollout_interval,
             rollout_log_file=rollout_log_file,
         )
-        procs.append(producer)
+        producer_procs.append(producer)
+    ray.get([p.setup.remote() for p in producer_procs])
     generate_config_consumer = copy.deepcopy(generate_config)
     generate_config_consumer.update(
         dict(
             backend=inference_backend,
         )
     )
+    consumer_master_ip_address = gpu_to_ip_address[0]
+    print(f"Use {consumer_master_ip_address} as master address for torch DDP.")
+    consumer_procs = []
     for i in range(num_consumer_procs):
+        node_id = gpu_to_node_id[0]
+        consumer_ip_address = gpu_to_ip_address[0]
+        gpu_to_node_id.pop(0)
+        gpu_to_ip_address.pop(0)
+        print(f"Schedule Consumer T[{i}] which requires 1 GPU on node {consumer_ip_address}")
         consumer = core_consumer.options(num_gpus=1).remote(
             num_producers=num_producers,
             num_episodes=num_episodes,
             rank=i,
             world_size=num_consumer_procs,
-            master_addr=master_addr,
+            master_addr=consumer_master_ip_address,
             master_port=master_port,
             num_update_per_episode=num_update_per_episode,
             num_recv_per_update=num_recv_per_update,
@@ -136,6 +172,6 @@ def launch_distributed(
             run_name=run_name,
             wandb_group_name=wandb_group_name,
         )
-        procs.append(consumer)
-    ray.get([p.setup.remote() for p in procs])
-    ray.get([p.loop.remote() for p in (producer_procs + consumer_procs)])
+        consumer_procs.append(consumer)
+    ray.get([p.setup.remote() for p in consumer_procs])
+    ray.get([p.loop.remote() for p in (producer_procs + consumer_procs)])
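
The comment block added in this patch references Ray's `NodeAffinitySchedulingStrategy` but the patch itself only builds an ordered GPU-to-node mapping and requests GPUs via `.options(num_gpus=...)`. For reference, below is a minimal, standalone sketch (not part of this patch) of how an actor could be pinned explicitly to a chosen node with that strategy; the `Worker` actor is a hypothetical placeholder, and Ray 2.x is assumed.

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote(num_gpus=1)
class Worker:  # hypothetical actor, stands in for SimpleProducer / core_consumer
    def node(self):
        # Report which node this actor actually landed on.
        return ray.get_runtime_context().get_node_id()


ray.init()

# Build the same GPU-ordered node list the patch constructs: one entry per GPU,
# so producers can be drained from the front and consumers from the back.
gpu_to_node_id = []
for node in ray.nodes():
    for _ in range(int(node["Resources"].get("GPU", 0))):
        gpu_to_node_id.append(node["NodeID"])

# Pin the actor to a specific node instead of letting Ray load-balance.
# soft=False makes the placement a hard requirement (fail rather than fall back).
worker = Worker.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=gpu_to_node_id[0], soft=False)
).remote()
print(ray.get(worker.node.remote()))
```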