Yan Chunwei 2026-01-13 21:25:09 +08:00 committed by GitHub
commit 697161ed7d
2 changed files with 73 additions and 57 deletions


@@ -350,22 +350,29 @@ class RayExecutor(RpcExecutorMixin, GenerationExecutor):
                              tp_size: int,
                              worker_kwargs: Dict = None) -> Tuple[Any, List[int]]:
         """
-        Either use the existing placement group from driver script (e.g., in the case of RL FW integration),
-        or create a default PACK placement group where each bundle has tp_size GPUs.
-            - When tp_size GPUs per node, keep one TP group per node.
-            - When tp_size > GPUs per node, allow a TP group span nodes.
-            - rank 0 must be put on the driver node
+        Obtain placement group(s) and bundle indices for workers.
+        Priorities:
+            1. `ray_placement_config` in `llm_args`.
+            2. `TRTLLM_RAY_BUNDLE_INDICES` environment variable (uses current placement group).
+            3. Default creation: A PACK placement group where each bundle has `tp_size` GPUs.
+               - When `tp_size` <= GPUs per node, keep one TP group per node.
+               - When `tp_size` > GPUs per node, allow a TP group to span nodes.
+               - rank 0 is forced onto the driver node.
         Returns:
-            Tuple of (placement_group(s), bundle_indices)
-            - placement_group(s) can be a single PlacementGroup or a List[PlacementGroup]
-            - bundle_indices is always a List[int]
+            Tuple[Union[PlacementGroup, List[PlacementGroup]], List[int]]:
+                - placement_group(s): A single `PlacementGroup` (shared by all workers) or a list of `PlacementGroup` (one per worker).
+                - bundle_indices: A list of bundle indices.
+                  If `placement_group(s)` is a single object, `bundle_indices[i]` maps worker `i` to that bundle in the group.
+                  If `placement_group(s)` is a list, `bundle_indices[i]` maps worker `i` to that bundle in `placement_groups[i]`.
         """
         llm_args = worker_kwargs.get("llm_args") if worker_kwargs else None
         placement_config = getattr(llm_args, 'ray_placement_config',
                                    None) if llm_args else None
-        if placement_config and placement_config.placement_groups is not None:
+        def _get_from_placement_config(placement_config):
             total_workers = sum(
                 len(indices)
                 for indices in placement_config.placement_bundle_indices)
@@ -388,62 +395,65 @@ class RayExecutor(RpcExecutorMixin, GenerationExecutor):
             return flat_pgs, flat_indices
-        bundle_indices = os.getenv("TRTLLM_RAY_BUNDLE_INDICES", None)
-        if bundle_indices:
+        def _get_from_env(bundle_indices):
             pg = get_current_placement_group()
             if pg is not None:
                 bundle_indices = list(map(int, bundle_indices.split(",")))
                 assert len(bundle_indices) == self.world_size, (
                     f"Need {self.world_size} bundle indices for world_size, got {bundle_indices=}"
                 )
-                assert len(set(bundle_indices)) == len(bundle_indices), \
+                assert len(set(bundle_indices)) == len(bundle_indices), (
                     f"TRTLLM_RAY_BUNDLE_INDICES cannot have duplicate values, but got {bundle_indices=}."
-                assert max(bundle_indices) < len(pg.bundle_specs), \
-                    f"{bundle_indices=} out of range for PG with {len(pg.bundle_specs)} bundles"
+                )
                 logger.info(
                     f"Found existing placement group {pg.bundle_specs=}. {bundle_indices=}"
                 )
-                # TODO: need to ping TP group onto the same node for RL FW integration case
+                assert max(bundle_indices) < len(pg.bundle_specs), (
+                    f"{bundle_indices=} out of range for PG with {len(pg.bundle_specs)} bundles"
+                )
                 return pg, bundle_indices
             else:
-                logger.warning(
-                    f"Ignoring TRTLLM_RAY_BUNDLE_INDICES={bundle_indices} because no global placement group is found."
-                )
+                raise ValueError(f"No global placement group is found.")
 
-        if self.world_size % tp_size:
-            raise ValueError("world_size must be a multiple of tp_size")
-        head_tag = f"node:{self.master_address}"
-        nodes = ray.nodes()
-        gpus_per_node = int(nodes[0]["Resources"].get(
-            "GPU", 0))  # assume symmetric across nodes
-        bundle_cpu = bundle_gpu = min(tp_size, gpus_per_node)
-        bundles, bundle_indices = [], []
-        current = 0
-        for rank in range(self.world_size):
-            if current == 0:
-                bundle = {"GPU": bundle_gpu, "CPU": bundle_cpu}
-                if len(bundles) == 0:
-                    bundle[head_tag] = 0.01  # to force placement on head node
-                bundles.append(bundle)
-            bundle_indices.append(len(bundles) - 1)
-            current = (current + 1) % bundle_gpu
-        strategy = "PACK"
-        logger.debug(
-            f"[Strategy={strategy}] Bundles: {bundles} for tp_size: {tp_size} and world_size: {self.world_size}"
-        )
-        pg = placement_group(bundles, strategy=strategy)
-        return pg, bundle_indices
+        def _get_default(tp_size):
+            head_tag = f"node:{self.master_address}"
+            nodes = ray.nodes()
+            gpus_per_node = int(nodes[0]["Resources"].get(
+                "GPU", 0))  # assume symmetric across nodes
+            bundle_cpu = bundle_gpu = min(tp_size, gpus_per_node)
+            bundles, bundle_indices = [], []
+            current = 0
+            for rank in range(self.world_size):
+                if current == 0:
+                    bundle = {"GPU": bundle_gpu, "CPU": bundle_cpu}
+                    if len(bundles) == 0:
+                        bundle[
+                            head_tag] = 0.01  # to force placement on head node
+                    bundles.append(bundle)
+                bundle_indices.append(len(bundles) - 1)
+                current = (current + 1) % bundle_gpu
+            strategy = "PACK"
+            logger.debug(
+                f"[Strategy={strategy}] Bundles: {bundles} for tp_size: {tp_size} and world_size: {self.world_size}"
+            )
+            pg = placement_group(bundles, strategy=strategy)
+            return pg, bundle_indices
+        if self.world_size % tp_size != 0:
+            raise ValueError(
+                f"world_size {self.world_size} must be a multiple of tp_size {tp_size}"
+            )
+        # path 0
+        if placement_config and placement_config.placement_groups is not None:
+            return _get_from_placement_config(placement_config)
+        # path 1
+        if bundle_indices := os.getenv("TRTLLM_RAY_BUNDLE_INDICES", None):
+            return _get_from_env(bundle_indices)
+        # path 2
+        return _get_default(tp_size)
 
     @property
     def enable_postprocess_parallel(self) -> bool:
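
To make the selection logic above concrete, the default path ("path 2") can be sketched as a standalone helper. This is an illustrative sketch, not part of the TensorRT-LLM API: default_pack_placement, world_size, tp_size, and head_ip are stand-ins for the executor's self.world_size, tp_size, and self.master_address, and it assumes ray.init() has already run, symmetric nodes, and at least one GPU per node.

# Illustrative sketch of the default PACK placement (path 2); not TensorRT-LLM API.
import ray
from ray.util.placement_group import placement_group


def default_pack_placement(world_size: int, tp_size: int, head_ip: str):
    nodes = ray.nodes()
    gpus_per_node = int(nodes[0]["Resources"].get("GPU", 0))  # assume symmetric nodes
    bundle_gpu = min(tp_size, gpus_per_node)

    bundles, bundle_indices = [], []
    current = 0
    for _rank in range(world_size):
        if current == 0:
            bundle = {"GPU": bundle_gpu, "CPU": bundle_gpu}
            if not bundles:
                # Requesting a sliver of the head node's `node:<ip>` resource pins
                # the first bundle (and thus rank 0) to the driver node.
                bundle[f"node:{head_ip}"] = 0.01
            bundles.append(bundle)
        bundle_indices.append(len(bundles) - 1)
        current = (current + 1) % bundle_gpu

    pg = placement_group(bundles, strategy="PACK")
    ray.get(pg.ready())  # block until Ray has reserved every bundle
    return pg, bundle_indices

With world_size=8, tp_size=4, and 4 GPUs per node this yields two 4-GPU bundles and bundle_indices == [0, 0, 0, 0, 1, 1, 1, 1], i.e. one TP group per node; with tp_size=8 on the same nodes each bundle is still capped at 4 GPUs, so a TP group spans two bundles and may span nodes.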


@@ -1164,7 +1164,7 @@ class AutoDecodingConfig(DecodingBaseConfig):
 class RayPlacementConfig(StrictBaseModel):
     """
     Configuration for Ray GPU workers placement.
-    This config is only used with AsyncLLM for RL scenarios.
+    Currently, this config is only used with AsyncLLM for RL scenarios.
     """
     defer_workers_init: bool = Field(
         default=False,
@@ -1178,8 +1178,11 @@ class RayPlacementConfig(StrictBaseModel):
     placement_bundle_indices: Optional[List[List[int]]] = Field(
         default=None,
-        description="List of bundle indices for each placement group. "
-        "Outer list corresponds to placement_groups, inner list contains bundle indices for that group."
+        description=
+        "List of lists of bundle indices. The outer list corresponds to "
+        "`placement_groups`. Each inner list specifies the bundle indices to use within "
+        "that placement group. For example, if `placement_groups=[pg1, pg2]`, "
+        "`[[0, 1], [0, 1]]` assigns bundles 0 and 1 from `pg1` and bundles 0 and 1 from `pg2`."
     )
     per_worker_gpu_share: Optional[float] = Field(
@@ -1204,12 +1207,15 @@ class RayPlacementConfig(StrictBaseModel):
                     f"placement_groups length ({len(self.placement_groups)}) must equal "
                     f"placement_bundle_indices length ({len(self.placement_bundle_indices)})"
                 )
-            if PlacementGroup is not None:
-                for i, pg in enumerate(self.placement_groups):
-                    if not isinstance(pg, PlacementGroup):
-                        raise TypeError(
-                            f"placement_groups[{i}] must be a Ray PlacementGroup, "
-                            f"got {type(pg).__name__}")
+            if PlacementGroup is None:
+                raise ValueError(
+                    "Ray must be installed to use `placement_groups`")
+            for i, pg in enumerate(self.placement_groups):
+                if not isinstance(pg, PlacementGroup):
+                    raise TypeError(
+                        f"placement_groups[{i}] must be a Ray PlacementGroup, "
+                        f"got {type(pg).__name__}")
 
         if self.per_worker_gpu_share is not None:
             if not (0 < self.per_worker_gpu_share <= 1.0):
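
To see how these fields fit together, here is a minimal, hypothetical driver-side sketch for the RL integration case. The import path of RayPlacementConfig and the interpretation of per_worker_gpu_share beyond its (0, 1] bound are assumptions; how the config reaches the executor (which reads llm_args.ray_placement_config) is not shown.

# Hypothetical RL-driver sketch; import path and downstream wiring are assumptions.
import ray
from ray.util.placement_group import placement_group

from tensorrt_llm.llmapi.llm_args import RayPlacementConfig  # assumed import location

ray.init()

# Placement groups created by the RL framework/driver, one per 2-GPU TP group.
pg1 = placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK")
pg2 = placement_group([{"GPU": 1, "CPU": 1}] * 2, strategy="PACK")
ray.get([pg1.ready(), pg2.ready()])

placement_cfg = RayPlacementConfig(
    defer_workers_init=False,
    placement_groups=[pg1, pg2],
    # Outer list pairs with placement_groups; each inner list picks bundles within
    # that group, so workers use bundles 0 and 1 of pg1 and bundles 0 and 1 of pg2.
    placement_bundle_indices=[[0, 1], [0, 1]],
    per_worker_gpu_share=0.5,  # must lie in (0, 1]; presumably the GPU fraction each worker reserves
)

The validator shown above then enforces that placement_groups and placement_bundle_indices have equal length and that every entry of placement_groups is a Ray PlacementGroup.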