[Core][Engine] allow DP ray placement groups to be set on specific nodes (#44669)

Signed-off-by: walterbm <walter.beller.morales@gmail.com>
This commit is contained in:
Walter Beller-Morales
2026-06-05 16:07:47 -04:00
committed by GitHub
parent e28e369f78
commit c73b0d0db9
3 changed files with 173 additions and 11 deletions
@@ -0,0 +1,119 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Unit tests for VLLM_RAY_DP_PLACEMENT_NODE_IPS."""
from types import SimpleNamespace
from unittest.mock import patch
import pytest
import vllm.v1.engine.utils as utils
from vllm.v1.engine.utils import CoreEngineActorManager
def _vllm_config(
*, dp_size, dp_local, master_ip, world_size=1, all2all_backend="naive"
):
parallel = SimpleNamespace(
data_parallel_master_ip=master_ip,
data_parallel_size=dp_size,
data_parallel_size_local=dp_local,
world_size=world_size,
all2all_backend=all2all_backend,
)
return SimpleNamespace(parallel_config=parallel)
def _resources(node_gpus: dict[str, int]):
# node_gpus: {ip: gpu_count}; plus a CPU-only head node.
res = {
f"id-{ip}": {"GPU": float(g), f"node:{ip}": 1.0} for ip, g in node_gpus.items()
}
res["id-head"] = {
"CPU": 8.0,
"node:__internal_head__": 1.0,
"node:10.9.9.9": 1.0,
}
return res
def _run(cfg, resources):
    """Run ``create_dp_placement_groups`` with Ray and the platform mocked.

    Returns ``(pgs, local_ranks, recorded)``, where ``recorded`` holds one
    dict (name / strategy / bundles) per placement-group request made
    while the patches were active.
    """
    recorded = []

    def record_pg(name, strategy, bundles):
        # Remember the request and hand back an opaque placeholder object.
        recorded.append({"name": name, "strategy": strategy, "bundles": bundles})
        return object()

    fake_platform = SimpleNamespace(ray_device_key="GPU")
    cluster_state = patch(
        "ray._private.state.available_resources_per_node",
        return_value=resources,
    )
    platform = patch.object(utils, "current_platform", fake_platform)
    pg_factory = patch("ray.util.placement_group", side_effect=record_pg)

    with cluster_state, platform, pg_factory:
        pgs, local_ranks = CoreEngineActorManager.create_dp_placement_groups(cfg)
    return pgs, local_ranks, recorded
def _pinned_ips(created):
return {
key.split(":", 1)[1]
for pg in created
for bundle in pg["bundles"]
for key in bundle
if key.startswith("node:")
}
def test_allowlist_confines_dp_to_listed_nodes(monkeypatch):
    """Only allow-listed nodes are pinned, though other nodes have GPUs."""
    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.1,10.0.0.3")
    allowed = {"10.0.0.1", "10.0.0.3"}
    cluster = _resources(
        dict.fromkeys(("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"), 8)
    )
    config = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")

    placement_groups, _, recorded = _run(config, cluster)

    # 8 on .1 (master) + 8 on .3
    assert len(placement_groups) == 16
    assert _pinned_ips(recorded).issubset(allowed)
def test_empty_allowlist_is_noop(monkeypatch):
    """With the variable unset, both GPU nodes in the cluster are used."""
    monkeypatch.delenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", raising=False)
    gpu_nodes = {"10.0.0.1", "10.0.0.2"}
    cluster = _resources({"10.0.0.1": 8, "10.0.0.2": 8})
    config = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")

    placement_groups, _, recorded = _run(config, cluster)

    assert len(placement_groups) == 16
    # All GPU nodes are used.
    assert _pinned_ips(recorded) == gpu_nodes
def test_master_auto_added_with_warning(monkeypatch):
    """An allowlist that omits the DP master still places ranks on it.

    Only the resulting placement is asserted here; log output is not
    inspected.
    """
    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.3")
    cluster = _resources({"10.0.0.1": 8, "10.0.0.3": 8})
    config = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")

    recorded = _run(config, cluster)[2]

    # Master (.1) plus the single allow-listed node (.3).
    expected = {"10.0.0.1", "10.0.0.3"}
    assert _pinned_ips(recorded) == expected
def test_allowlist_isolates_two_engines(monkeypatch):
    """An engine confined to .2/.4 never pins to the other nodes (.1/.3)."""
    # Engine B is confined to .2/.4, so it can never touch engine A's master .1.
    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.2,10.0.0.4")
    engine_b_nodes = {"10.0.0.2", "10.0.0.4"}
    cluster = _resources(
        dict.fromkeys(("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"), 8)
    )
    config = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.2")

    recorded = _run(config, cluster)[2]

    assert _pinned_ips(recorded).issubset(engine_b_nodes)
def test_allowlist_too_small_raises(monkeypatch):
    """A master-only allowlist that cannot hold all DP ranks raises ValueError."""
    # The master alone can't hold all 16 ranks and no other node is allowed.
    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.1")
    cluster = _resources({"10.0.0.1": 8, "10.0.0.2": 8})
    config = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")

    # Not enough placement groups can be created.
    with pytest.raises(ValueError):
        _run(config, cluster)
+8
View File
@@ -157,6 +157,7 @@ if TYPE_CHECKING:
VLLM_DP_MASTER_PORT: int = 0 VLLM_DP_MASTER_PORT: int = 0
VLLM_RANDOMIZE_DP_DUMMY_INPUTS: bool = False VLLM_RANDOMIZE_DP_DUMMY_INPUTS: bool = False
VLLM_RAY_DP_PACK_STRATEGY: Literal["strict", "fill", "span"] = "strict" VLLM_RAY_DP_PACK_STRATEGY: Literal["strict", "fill", "span"] = "strict"
VLLM_RAY_DP_PLACEMENT_NODE_IPS: str = ""
VLLM_RAY_EXTRA_ENV_VAR_PREFIXES_TO_COPY: str = "" VLLM_RAY_EXTRA_ENV_VAR_PREFIXES_TO_COPY: str = ""
VLLM_RAY_EXTRA_ENV_VARS_TO_COPY: str = "" VLLM_RAY_EXTRA_ENV_VARS_TO_COPY: str = ""
VLLM_MARLIN_USE_ATOMIC_ADD: bool = False VLLM_MARLIN_USE_ATOMIC_ADD: bool = False
@@ -1324,6 +1325,13 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_RAY_DP_PACK_STRATEGY": lambda: os.getenv( "VLLM_RAY_DP_PACK_STRATEGY": lambda: os.getenv(
"VLLM_RAY_DP_PACK_STRATEGY", "strict" "VLLM_RAY_DP_PACK_STRATEGY", "strict"
), ),
# Optional comma-separated list of node IPs that Ray data-parallel
# placement groups may use. When set, create_dp_placement_groups only
# considers these nodes (the DP master node is always included).
# This environment variable is ignored if data-parallel-backend is not Ray.
"VLLM_RAY_DP_PLACEMENT_NODE_IPS": lambda: os.getenv(
"VLLM_RAY_DP_PLACEMENT_NODE_IPS", ""
),
# Comma-separated *additional* prefixes of env vars to copy from the # Comma-separated *additional* prefixes of env vars to copy from the
# driver to Ray workers. These are merged with the built-in defaults # driver to Ray workers. These are merged with the built-in defaults
# defined in ``vllm.ray.ray_env`` (VLLM_, etc.). Example: "MYLIB_,OTHER_" # defined in ``vllm.ray.ray_env`` (VLLM_, etc.). Example: "MYLIB_,OTHER_"
+46 -11
View File
@@ -101,6 +101,23 @@ def _get_bundle_node_ip(bundle: dict[str, float]) -> str:
raise ValueError(f"Missing node affinity in placement bundle: {bundle}") raise ValueError(f"Missing node affinity in placement bundle: {bundle}")
def _node_ip_from_resources(node_resources: dict) -> str | None:
"""Return the node IP encoded in a Ray per-node resource dict, or None.
Ray advertises each node's IP as a ``node:<ip>`` resource key. The head node
also carries ``node:__internal_head__``, and placement groups add
``..._group_...`` keys; both are ignored.
"""
for key in node_resources:
if (
key.startswith("node:")
and key != "node:__internal_head__"
and "_group_" not in key
):
return key.split(":", 1)[1]
return None
class CoreEngineProcManager: class CoreEngineProcManager:
""" """
Utility class to handle creation, readiness, and shutdown Utility class to handle creation, readiness, and shutdown
@@ -506,6 +523,32 @@ class CoreEngineActorManager:
assert dp_master_ip_key in nodes[0], ( assert dp_master_ip_key in nodes[0], (
f"The DP master node (ip: {dp_master_ip}) is missing or dead" f"The DP master node (ip: {dp_master_ip}) is missing or dead"
) )
# optionally restrict DP placement to a caller-provided node set.
requested_node_ips = {
ip.strip()
for ip in envs.VLLM_RAY_DP_PLACEMENT_NODE_IPS.split(",")
if ip.strip()
}
if requested_node_ips:
allowed_node_ips = set(requested_node_ips)
# The master node must host the local ranks, so it has to be allowed.
if dp_master_ip not in allowed_node_ips:
allowed_node_ips.add(dp_master_ip)
filtered_nodes = [
node_resources
for node_resources in nodes
if _node_ip_from_resources(node_resources) in allowed_node_ips
]
logger.info(
"VLLM_RAY_DP_PLACEMENT_NODE_IPS set; restricting DP placement "
"from %d to %d node(s): %s",
len(nodes),
len(filtered_nodes),
sorted(allowed_node_ips),
)
nodes = filtered_nodes
device_str = current_platform.ray_device_key device_str = current_platform.ray_device_key
n_node_devices: list[int] = [ n_node_devices: list[int] = [
int(node_resources[device_str]) int(node_resources[device_str])
@@ -572,18 +615,10 @@ class CoreEngineActorManager:
# for "span" pack strategy # for "span" pack strategy
collected_bundles = [] collected_bundles = []
for node_resources in nodes: for node_resources in nodes:
node_ip_keys = [ node_ip = _node_ip_from_resources(node_resources)
key assert node_ip is not None, (
for key in node_resources f"No node IP key found in node resources: {node_resources}"
if key != "node:__internal_head__"
and key.startswith("node:")
and "_group_" not in key
]
assert len(node_ip_keys) == 1, (
f"Zero or multiple node IP keys found in node resources: {node_ip_keys}"
) )
node_ip_key = node_ip_keys[0]
node_ip = node_ip_key.split(":")[1]
n_device_on_node = int(node_resources.get(device_str, 0)) n_device_on_node = int(node_resources.get(device_str, 0))
if pack_strategy == "span" and n_device_on_node != 0: if pack_strategy == "span" and n_device_on_node != 0: