diff --git a/examples/rl/rlhf_async_new_apis.py b/examples/rl/rlhf_async_new_apis.py index 41e9ec673fb..a6adc208860 100644 --- a/examples/rl/rlhf_async_new_apis.py +++ b/examples/rl/rlhf_async_new_apis.py @@ -227,9 +227,6 @@ llm_kwargs = dict( attention_backend=ATTN_BACKEND, gpu_memory_utilization=0.75, weight_transfer_config=WeightTransferConfig(backend="nccl"), - # TODO(haosdent): re-enable once #42043 is fixed. Both LLM - # instances must match. - async_scheduling=False, ) llm_kwargs.update(rocm_determinism_kwargs) @@ -371,9 +368,6 @@ llm_v2_kwargs = dict( gpu_memory_utilization=0.75, distributed_executor_backend="ray", attention_backend=ATTN_BACKEND, - # TODO(haosdent): re-enable once #42043 is fixed. Both LLM - # instances must match. - async_scheduling=False, ) llm_v2_kwargs.update(rocm_determinism_kwargs) diff --git a/vllm/v1/core/sched/async_scheduler.py b/vllm/v1/core/sched/async_scheduler.py index 0b3958dbcf5..cb61bcabd3e 100644 --- a/vllm/v1/core/sched/async_scheduler.py +++ b/vllm/v1/core/sched/async_scheduler.py @@ -37,10 +37,11 @@ class AsyncScheduler(Scheduler): def _update_request_with_output( self, request: Request, new_token_ids: list[int] ) -> tuple[list[int], bool]: - if request.discard_latest_async_tokens: - # If the request is force preempted in reset_prefix_cache, we - # should discard the latest async token. - request.discard_latest_async_tokens = False + if request.async_tokens_to_discard > 0: + # The request was force-preempted in reset_prefix_cache; drop one + # stale in-flight async output frame per call until the counter + # is drained. + request.async_tokens_to_discard -= 1 return [], False status_before_update = request.status diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index 1b466578892..45a5169a3ef 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -1905,10 +1905,14 @@ class Scheduler(SchedulerInterface): while self.running: request = self.running.pop() self._preempt_request(request, timestamp) - # NOTE(zhuohan): For async scheduling, we need to discard the latest - # output token on the fly to avoid a redundant repetitive output token. + # For async scheduling, any output frames already in flight at + # preemption time are now stale and must be discarded when they + # return. num_output_placeholders is exactly that count: 0 if + # the engine has drained (e.g. pause_generation(keep) waited + # for idle), 1 for vanilla async mid-step, or 1 + spec/PP frames + # otherwise. + request.async_tokens_to_discard = request.num_output_placeholders request.num_output_placeholders = 0 - request.discard_latest_async_tokens = True # Clear scheduled request ids cache. Since we are forcing preemption # + resumption in the same step, we must act as if these requests were diff --git a/vllm/v1/request.py b/vllm/v1/request.py index 0d435deb3f0..26cc82fc4a6 100644 --- a/vllm/v1/request.py +++ b/vllm/v1/request.py @@ -139,8 +139,7 @@ class Request: # Used in async scheduling. self.num_output_placeholders = 0 - # Used in forced preemption (reset_prefix_cache) with async scheduling. - self.discard_latest_async_tokens = False + self.async_tokens_to_discard = 0 self.spec_token_ids: list[int] = [] self.num_computed_tokens = 0