diff --git a/tests/v1/engine/test_dp_placement_node_allowlist.py b/tests/v1/engine/test_dp_placement_node_allowlist.py
new file mode 100644
index 00000000000..1fd6f34fed4
--- /dev/null
+++ b/tests/v1/engine/test_dp_placement_node_allowlist.py
@@ -0,0 +1,125 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""Unit tests for VLLM_RAY_DP_PLACEMENT_NODE_IPS."""
+
+from types import SimpleNamespace
+from unittest.mock import patch
+
+import pytest
+
+import vllm.v1.engine.utils as utils
+from vllm.v1.engine.utils import CoreEngineActorManager
+
+
+def _vllm_config(
+    *, dp_size, dp_local, master_ip, world_size=1, all2all_backend="naive"
+):
+    parallel = SimpleNamespace(
+        data_parallel_master_ip=master_ip,
+        data_parallel_size=dp_size,
+        data_parallel_size_local=dp_local,
+        world_size=world_size,
+        all2all_backend=all2all_backend,
+    )
+    return SimpleNamespace(parallel_config=parallel)
+
+
+def _resources(node_gpus: dict[str, int]):
+    # node_gpus: {ip: gpu_count}; plus a CPU-only head node.
+    res = {
+        f"id-{ip}": {"GPU": float(g), f"node:{ip}": 1.0} for ip, g in node_gpus.items()
+    }
+    res["id-head"] = {
+        "CPU": 8.0,
+        "node:__internal_head__": 1.0,
+        "node:10.9.9.9": 1.0,
+    }
+    return res
+
+
+def _run(cfg, resources):
+    created = []
+
+    def fake_pg(name, strategy, bundles):
+        created.append({"name": name, "strategy": strategy, "bundles": bundles})
+        return object()
+
+    with (
+        patch(
+            "ray._private.state.available_resources_per_node",
+            return_value=resources,
+        ),
+        patch.object(utils, "current_platform", SimpleNamespace(ray_device_key="GPU")),
+        patch("ray.util.placement_group", side_effect=fake_pg),
+    ):
+        pgs, local_ranks = CoreEngineActorManager.create_dp_placement_groups(cfg)
+    return pgs, local_ranks, created
+
+
+def _pinned_ips(created):
+    return {
+        key.split(":", 1)[1]
+        for pg in created
+        for bundle in pg["bundles"]
+        for key in bundle
+        if key.startswith("node:")
+    }
+
+
+def test_allowlist_confines_dp_to_listed_nodes(monkeypatch):
+    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.1,10.0.0.3")
+    resources = _resources({"10.0.0.1": 8, "10.0.0.2": 8, "10.0.0.3": 8, "10.0.0.4": 8})
+    cfg = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")
+
+    pgs, _, created = _run(cfg, resources)
+
+    assert len(pgs) == 16  # 8 on .1 (master) + 8 on .3
+    assert _pinned_ips(created) <= {"10.0.0.1", "10.0.0.3"}
+
+
+def test_empty_allowlist_is_noop(monkeypatch):
+    monkeypatch.delenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", raising=False)
+    resources = _resources({"10.0.0.1": 8, "10.0.0.2": 8})
+    cfg = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")
+
+    pgs, _, created = _run(cfg, resources)
+
+    assert len(pgs) == 16
+    assert _pinned_ips(created) == {"10.0.0.1", "10.0.0.2"}  # all nodes used
+
+
+def test_master_auto_added_with_warning(monkeypatch):
+    # Allowlist omits the master; vLLM must still keep it and warn.
+    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.3")
+    resources = _resources({"10.0.0.1": 8, "10.0.0.3": 8})
+    cfg = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")
+
+    with patch.object(utils.logger, "warning") as mock_warning:
+        _, _, created = _run(cfg, resources)
+
+    assert _pinned_ips(created) == {"10.0.0.1", "10.0.0.3"}
+    assert any(
+        "VLLM_RAY_DP_PLACEMENT_NODE_IPS" in str(recorded)
+        for recorded in mock_warning.call_args_list
+    )
+
+
+def test_allowlist_isolates_two_engines(monkeypatch):
+    # Engine B is confined to .2/.4, so it can never touch engine A's master .1.
+    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.2,10.0.0.4")
+    resources = _resources({"10.0.0.1": 8, "10.0.0.2": 8, "10.0.0.3": 8, "10.0.0.4": 8})
+    cfg = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.2")
+
+    _, _, created = _run(cfg, resources)
+
+    assert _pinned_ips(created) <= {"10.0.0.2", "10.0.0.4"}
+
+
+def test_allowlist_too_small_raises(monkeypatch):
+    # Master alone can't hold all ranks and no other node is allowed.
+    monkeypatch.setenv("VLLM_RAY_DP_PLACEMENT_NODE_IPS", "10.0.0.1")
+    resources = _resources({"10.0.0.1": 8, "10.0.0.2": 8})
+    cfg = _vllm_config(dp_size=16, dp_local=8, master_ip="10.0.0.1")
+
+    with pytest.raises(ValueError):  # not enough placement groups created
+        _run(cfg, resources)
diff --git a/vllm/envs.py b/vllm/envs.py
index a32f055a028..8f4e18d2235 100755
--- a/vllm/envs.py
+++ b/vllm/envs.py
@@ -157,6 +157,7 @@ if TYPE_CHECKING:
     VLLM_DP_MASTER_PORT: int = 0
     VLLM_RANDOMIZE_DP_DUMMY_INPUTS: bool = False
     VLLM_RAY_DP_PACK_STRATEGY: Literal["strict", "fill", "span"] = "strict"
+    VLLM_RAY_DP_PLACEMENT_NODE_IPS: str = ""
     VLLM_RAY_EXTRA_ENV_VAR_PREFIXES_TO_COPY: str = ""
     VLLM_RAY_EXTRA_ENV_VARS_TO_COPY: str = ""
     VLLM_MARLIN_USE_ATOMIC_ADD: bool = False
@@ -1324,6 +1325,13 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "VLLM_RAY_DP_PACK_STRATEGY": lambda: os.getenv(
         "VLLM_RAY_DP_PACK_STRATEGY", "strict"
     ),
+    # Optional comma-separated list of node IPs that Ray data-parallel
+    # placement groups may use. When set, create_dp_placement_groups only
+    # considers these nodes (the DP master node is always included).
+    # This environment variable is ignored if data-parallel-backend is not Ray.
+    "VLLM_RAY_DP_PLACEMENT_NODE_IPS": lambda: os.getenv(
+        "VLLM_RAY_DP_PLACEMENT_NODE_IPS", ""
+    ),
     # Comma-separated *additional* prefixes of env vars to copy from the
     # driver to Ray workers. These are merged with the built-in defaults
     # defined in ``vllm.ray.ray_env`` (VLLM_, etc.). Example: "MYLIB_,OTHER_"
diff --git a/vllm/v1/engine/utils.py b/vllm/v1/engine/utils.py
index 8a7269a7707..e13301f03c6 100644
--- a/vllm/v1/engine/utils.py
+++ b/vllm/v1/engine/utils.py
@@ -101,6 +101,23 @@ def _get_bundle_node_ip(bundle: dict[str, float]) -> str:
     raise ValueError(f"Missing node affinity in placement bundle: {bundle}")
 
 
+def _node_ip_from_resources(node_resources: dict) -> str | None:
+    """Return the node IP encoded in a Ray per-node resource dict, or None.
+
+    Ray advertises each node's IP as a ``node:`` resource key. The head node
+    also carries ``node:__internal_head__``, and placement groups add
+    ``..._group_...`` keys; both are ignored.
+    """
+    for key in node_resources:
+        if (
+            key.startswith("node:")
+            and key != "node:__internal_head__"
+            and "_group_" not in key
+        ):
+            return key.split(":", 1)[1]
+    return None
+
+
 class CoreEngineProcManager:
     """
     Utility class to handle creation, readiness, and shutdown
@@ -506,6 +523,37 @@ class CoreEngineActorManager:
         assert dp_master_ip_key in nodes[0], (
             f"The DP master node (ip: {dp_master_ip}) is missing or dead"
         )
+
+        # Optionally restrict DP placement to a caller-provided node set.
+        requested_node_ips = {
+            ip.strip()
+            for ip in envs.VLLM_RAY_DP_PLACEMENT_NODE_IPS.split(",")
+            if ip.strip()
+        }
+        if requested_node_ips:
+            allowed_node_ips = set(requested_node_ips)
+            # The master node must host the local ranks, so it has to be allowed.
+            if dp_master_ip not in allowed_node_ips:
+                logger.warning(
+                    "VLLM_RAY_DP_PLACEMENT_NODE_IPS does not include the DP "
+                    "master node %s; adding it to the allowed node set.",
+                    dp_master_ip,
+                )
+                allowed_node_ips.add(dp_master_ip)
+            filtered_nodes = [
+                node_resources
+                for node_resources in nodes
+                if _node_ip_from_resources(node_resources) in allowed_node_ips
+            ]
+            logger.info(
+                "VLLM_RAY_DP_PLACEMENT_NODE_IPS set; restricting DP placement "
+                "from %d to %d node(s): %s",
+                len(nodes),
+                len(filtered_nodes),
+                sorted(allowed_node_ips),
+            )
+            nodes = filtered_nodes
+
         device_str = current_platform.ray_device_key
         n_node_devices: list[int] = [
             int(node_resources[device_str])
@@ -572,18 +620,10 @@ class CoreEngineActorManager:
         # for "span" pack strategy
         collected_bundles = []
         for node_resources in nodes:
-            node_ip_keys = [
-                key
-                for key in node_resources
-                if key != "node:__internal_head__"
-                and key.startswith("node:")
-                and "_group_" not in key
-            ]
-            assert len(node_ip_keys) == 1, (
-                f"Zero or multiple node IP keys found in node resources: {node_ip_keys}"
+            node_ip = _node_ip_from_resources(node_resources)
+            assert node_ip is not None, (
+                f"No node IP key found in node resources: {node_resources}"
             )
-            node_ip_key = node_ip_keys[0]
-            node_ip = node_ip_key.split(":")[1]
 
             n_device_on_node = int(node_resources.get(device_str, 0))
             if pack_strategy == "span" and n_device_on_node != 0: