Manually schedule resources and support automatic master address assignment

YeAnbang 2025-06-10 15:00:48 +08:00
parent bb6f5d98fc
commit 21d517d0fa


@@ -79,8 +79,35 @@ def launch_distributed(
f"{project_name.replace(' ','_')}_run_{wandb_group_name}.jsonl",
)
procs = []
# Attention: Ray uses a complex scheduling method that considers various factors, including load balancing.
# When requesting resources, it is not guaranteed that they come from the node with the lowest node id.
# This goes against the design principle of our implementation, so we manually force the scheduling,
# allocating producers to nodes with lower node ids and consumers to resources on nodes with higher
# node ids. See the reference here: https://docs.ray.io/en/latest/ray-core/scheduling/index.html#nodeaffinityschedulingstrategy
nodes = ray.nodes()
node_info = {
node["NodeID"]: {
"num_gpus": node["Resources"].get("GPU", 0),
"address": node["NodeManagerAddress"],
} # Default to 0 if no GPUs are available
for node in nodes
}
gpu_to_node_id = []
gpu_to_ip_address = []
for node_id in node_info:
for idx in range(int(node_info[node_id]["num_gpus"])):
gpu_to_node_id.append(node_id)
gpu_to_ip_address.append(node_info[node_id]["address"])
print(node_info)
producer_procs = []
for i in range(num_producers):
node_id = gpu_to_node_id[0]
producer_ip_address = gpu_to_ip_address[0]
for _ in range(num_proc_per_producer):
gpu_to_node_id.pop(0)
gpu_to_ip_address.pop(0)
print(f"Schedual Producer P[{i}] which requires {num_proc_per_producer} GPUs on node {producer_ip_address}")
producer = SimpleProducer.options(num_gpus=num_proc_per_producer).remote(
producer_idx=i,
num_producers=num_producers,
@@ -106,20 +133,29 @@ def launch_distributed(
log_rollout_interval=log_rollout_interval,
rollout_log_file=rollout_log_file,
)
procs.append(producer)
producer_procs.append(producer)
ray.get([p.setup.remote() for p in producer_procs])
generate_config_consumer = copy.deepcopy(generate_config)
generate_config_consumer.update(
dict(
backend=inference_backend,
)
)
consumer_master_ip_address = gpu_to_ip_address[0]
print(f"Use {consumer_master_ip_address} as master address for torch DDP.")
consumer_procs = []
for i in range(num_consumer_procs):
node_id = gpu_to_node_id[0]
consumer_ip_address = gpu_to_ip_address[0]
gpu_to_node_id.pop(0)
gpu_to_ip_address.pop(0)
print(f"Schedual Consumer T[{i}] which requires 1 GPUs on node {consumer_ip_address}")
consumer = core_consumer.options(num_gpus=1).remote(
num_producers=num_producers,
num_episodes=num_episodes,
rank=i,
world_size=num_consumer_procs,
master_addr=master_addr,
master_addr=consumer_master_ip_address,
master_port=master_port,
num_update_per_episode=num_update_per_episode,
num_recv_per_update=num_recv_per_update,
@@ -136,6 +172,6 @@ def launch_distributed(
run_name=run_name,
wandb_group_name=wandb_group_name,
)
procs.append(consumer)
ray.get([p.setup.remote() for p in procs])
ray.get([p.loop.remote() for p in procs])
consumer_procs.append(consumer)
ray.get([p.setup.remote() for p in consumer_procs])
ray.get([p.loop.remote() for p in (producer_procs + consumer_procs)])
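
The code comment in the first hunk points to Ray's NodeAffinitySchedulingStrategy as the mechanism for forcing an actor onto a chosen node instead of relying on Ray's load-balancing placement. Below is a minimal sketch, not taken from this commit, of how an actor can be pinned to a node id obtained from ray.nodes(); the Worker class and the single-GPU requirement are placeholders standing in for SimpleProducer / the consumer actors.

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

# Pick the first alive node that reports at least one GPU (placeholder selection logic).
node_id = next(
    n["NodeID"] for n in ray.nodes() if n["Alive"] and n["Resources"].get("GPU", 0) >= 1
)

@ray.remote(num_gpus=1)
class Worker:  # hypothetical actor for illustration only
    def node(self):
        return ray.get_runtime_context().get_node_id()

# soft=False makes placement fail instead of silently falling back to a different node.
w = Worker.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id, soft=False)
).remote()
print(ray.get(w.node.remote()))  # should match the requested node_id

With this strategy, producers can be bound to the nodes with lower node ids and consumers to the remaining nodes, matching the gpu_to_node_id bookkeeping done in the diff above.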