diff --git a/tensorrt_llm/executor/ray_executor.py b/tensorrt_llm/executor/ray_executor.py index 0fc4fa2810..b81ed3ea69 100644 --- a/tensorrt_llm/executor/ray_executor.py +++ b/tensorrt_llm/executor/ray_executor.py @@ -350,22 +350,29 @@ class RayExecutor(RpcExecutorMixin, GenerationExecutor): tp_size: int, worker_kwargs: Dict = None) -> Tuple[Any, List[int]]: """ - Either use the existing placement group from driver script (e.g., in the case of RL FW integration), - or create a default PACK placement group where each bundle has tp_size GPUs. - - When tp_size ≤ GPUs per node, keep one TP group per node. - - When tp_size > GPUs per node, allow a TP group span nodes. - - rank 0 must be put on the driver node + Obtain placement group(s) and bundle indices for workers. + + Priorities: + 1. `ray_placement_config` in `llm_args`. + 2. `TRTLLM_RAY_BUNDLE_INDICES` environment variable (uses current placement group). + 3. Default creation: A PACK placement group where each bundle has `tp_size` GPUs. + - When `tp_size` <= GPUs per node, keep one TP group per node. + - When `tp_size` > GPUs per node, allow a TP group to span nodes. + - rank 0 is forced onto the driver node. Returns: - Tuple of (placement_group(s), bundle_indices) - - placement_group(s) can be a single PlacementGroup or a List[PlacementGroup] - - bundle_indices is always a List[int] + Tuple[Union[PlacementGroup, List[PlacementGroup]], List[int]]: + - placement_group(s): A single `PlacementGroup` (shared by all workers) or a list of `PlacementGroup` (one per worker). + - bundle_indices: A list of bundle indices. + If `placement_group(s)` is a single object, `bundle_indices[i]` maps worker `i` to that bundle in the group. + If `placement_group(s)` is a list, `bundle_indices[i]` maps worker `i` to that bundle in `placement_groups[i]`. """ llm_args = worker_kwargs.get("llm_args") if worker_kwargs else None placement_config = getattr(llm_args, 'ray_placement_config', None) if llm_args else None - if placement_config and placement_config.placement_groups is not None: + + def _get_from_placement_config(placement_config): total_workers = sum( len(indices) for indices in placement_config.placement_bundle_indices) @@ -388,62 +395,65 @@ class RayExecutor(RpcExecutorMixin, GenerationExecutor): return flat_pgs, flat_indices - bundle_indices = os.getenv("TRTLLM_RAY_BUNDLE_INDICES", None) - - if bundle_indices: + def _get_from_env(bundle_indices): pg = get_current_placement_group() if pg is not None: bundle_indices = list(map(int, bundle_indices.split(","))) assert len(bundle_indices) == self.world_size, ( f"Need {self.world_size} bundle indices for world_size, got {bundle_indices=}" ) - assert len(set(bundle_indices)) == len(bundle_indices), \ + assert len(set(bundle_indices)) == len(bundle_indices), ( f"TRTLLM_RAY_BUNDLE_INDICES cannot have duplicate values, but got {bundle_indices=}." - - assert max(bundle_indices) < len(pg.bundle_specs), \ - f"{bundle_indices=} out of range for PG with {len(pg.bundle_specs)} bundles" - - logger.info( - f"Found existing placement group {pg.bundle_specs=}. {bundle_indices=}" ) - - # TODO: need to ping TP group onto the same node for RL FW integration case - + assert max(bundle_indices) < len(pg.bundle_specs), ( + f"{bundle_indices=} out of range for PG with {len(pg.bundle_specs)} bundles" + ) return pg, bundle_indices else: - logger.warning( - f"Ignoring TRTLLM_RAY_BUNDLE_INDICES={bundle_indices} because no global placement group is found." - ) + raise ValueError(f"No global placement group is found.") - if self.world_size % tp_size: - raise ValueError("world_size must be a multiple of tp_size") + def _get_default(tp_size): + head_tag = f"node:{self.master_address}" + nodes = ray.nodes() + gpus_per_node = int(nodes[0]["Resources"].get( + "GPU", 0)) # assume symmetric across nodes - head_tag = f"node:{self.master_address}" - nodes = ray.nodes() - gpus_per_node = int(nodes[0]["Resources"].get( - "GPU", 0)) # assume symmetric across nodes + bundle_cpu = bundle_gpu = min(tp_size, gpus_per_node) - bundle_cpu = bundle_gpu = min(tp_size, gpus_per_node) + bundles, bundle_indices = [], [] + current = 0 + for rank in range(self.world_size): + if current == 0: + bundle = {"GPU": bundle_gpu, "CPU": bundle_cpu} + if len(bundles) == 0: + bundle[ + head_tag] = 0.01 # to force placement on head node + bundles.append(bundle) - bundles, bundle_indices = [], [] - current = 0 - for rank in range(self.world_size): - if current == 0: - bundle = {"GPU": bundle_gpu, "CPU": bundle_cpu} - if len(bundles) == 0: - bundle[head_tag] = 0.01 # to force placement on head node - bundles.append(bundle) + bundle_indices.append(len(bundles) - 1) + current = (current + 1) % bundle_gpu - bundle_indices.append(len(bundles) - 1) - current = (current + 1) % bundle_gpu + strategy = "PACK" + logger.debug( + f"[Strategy={strategy}] Bundles: {bundles} for tp_size: {tp_size} and world_size: {self.world_size}" + ) + pg = placement_group(bundles, strategy=strategy) - strategy = "PACK" - logger.debug( - f"[Strategy={strategy}] Bundles: {bundles} for tp_size: {tp_size} and world_size: {self.world_size}" - ) - pg = placement_group(bundles, strategy=strategy) + return pg, bundle_indices - return pg, bundle_indices + if self.world_size % tp_size != 0: + raise ValueError( + f"world_size {self.world_size} must be a multiple of tp_size {tp_size}" + ) + + # path 0 + if placement_config and placement_config.placement_groups is not None: + return _get_from_placement_config(placement_config) + # path 1 + if bundle_indices := os.getenv("TRTLLM_RAY_BUNDLE_INDICES", None): + return _get_from_env(bundle_indices) + # path 2 + return _get_default(tp_size) @property def enable_postprocess_parallel(self) -> bool: diff --git a/tensorrt_llm/llmapi/llm_args.py b/tensorrt_llm/llmapi/llm_args.py index 3f15252b84..84fdd2657c 100644 --- a/tensorrt_llm/llmapi/llm_args.py +++ b/tensorrt_llm/llmapi/llm_args.py @@ -1164,7 +1164,7 @@ class AutoDecodingConfig(DecodingBaseConfig): class RayPlacementConfig(StrictBaseModel): """ Configuration for Ray GPU workers placement. - This config is only used with AsyncLLM for RL scenarios. + Currently, this config is only used with AsyncLLM for RL scenarios. """ defer_workers_init: bool = Field( default=False, @@ -1178,8 +1178,11 @@ class RayPlacementConfig(StrictBaseModel): placement_bundle_indices: Optional[List[List[int]]] = Field( default=None, - description="List of bundle indices for each placement group. " - "Outer list corresponds to placement_groups, inner list contains bundle indices for that group." + description= + "List of lists of bundle indices. The outer list corresponds to " + "`placement_groups`. Each inner list specifies the bundle indices to use within " + "that placement group. For example, if `placement_groups=[pg1, pg2]`, " + "`[[0, 1], [0, 1]]` assigns bundles 0 and 1 from `pg1` and bundles 0 and 1 from `pg2`." ) per_worker_gpu_share: Optional[float] = Field( @@ -1204,12 +1207,15 @@ class RayPlacementConfig(StrictBaseModel): f"placement_groups length ({len(self.placement_groups)}) must equal " f"placement_bundle_indices length ({len(self.placement_bundle_indices)})" ) - if PlacementGroup is not None: - for i, pg in enumerate(self.placement_groups): - if not isinstance(pg, PlacementGroup): - raise TypeError( - f"placement_groups[{i}] must be a Ray PlacementGroup, " - f"got {type(pg).__name__}") + if PlacementGroup is None: + raise ValueError( + "Ray must be installed to use `placement_groups`") + + for i, pg in enumerate(self.placement_groups): + if not isinstance(pg, PlacementGroup): + raise TypeError( + f"placement_groups[{i}] must be a Ray PlacementGroup, " + f"got {type(pg).__name__}") if self.per_worker_gpu_share is not None: if not (0 < self.per_worker_gpu_share <= 1.0):