[None][chore] Introducing an abstract WaitingQueue interface to decouple the request scheduling logic from specific queue implementations (#11330)

Signed-off-by: Lanyu Liao <lancelly@users.noreply.github.com>
Signed-off-by: Lance Liao <108499334+lancelly@users.noreply.github.com>
Co-authored-by: Lanyu Liao <lancelly@users.noreply.github.com>
Liao Lanyu authored on 2026-02-12 09:18:24 +08:00; committed by GitHub
parent 2c4a4c7b94
commit 58165d5394
10 changed files with 767 additions and 339 deletions

View File

@ -11,12 +11,13 @@ from tensorrt_llm._torch.models.modeling_utils import \
from tensorrt_llm._utils import (confidential_compute_enabled,
str_dtype_to_binding, torch_dtype_to_str)
from tensorrt_llm.bindings.executor import DecodingMode
from tensorrt_llm.llmapi.llm_args import (CacheTransceiverConfig,
EagleDecodingConfig, KvCacheConfig,
MTPDecodingConfig, PeftCacheConfig,
SamplerType, SchedulerConfig,
SparseAttentionConfig,
SpeculativeConfig, TorchLlmArgs)
# isort: off
from tensorrt_llm.llmapi.llm_args import (
CacheTransceiverConfig, EagleDecodingConfig, KvCacheConfig,
MTPDecodingConfig, PeftCacheConfig, SamplerType, SchedulerConfig,
SparseAttentionConfig, SpeculativeConfig, TorchLlmArgs, WaitingQueuePolicy)
# isort: on
from tensorrt_llm.logger import logger
from tensorrt_llm.lora_helper import (LoraConfig,
get_default_trtllm_modules_to_hf_modules)
@ -1006,6 +1007,9 @@ def create_py_executor_instance(
kv_cache_transceiver = create_kv_cache_transceiver(
mapping, dist, kv_cache_manager, attention_type,
cache_transceiver_config)
waiting_queue_policy = (scheduler_config.waiting_queue_policy
if scheduler_config is not None else
WaitingQueuePolicy.FCFS)
return PyExecutor(
resource_manager,
scheduler,
@ -1029,7 +1033,8 @@ def create_py_executor_instance(
max_seq_len=max_seq_len,
peft_cache_config=peft_cache_config,
virtual_memory_pools=virtual_memory_pools,
execution_stream=execution_stream)
execution_stream=execution_stream,
waiting_queue_policy=waiting_queue_policy)
def create_torch_sampler_args(
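
For reference, the new plumbing resolves the policy from the scheduler config with an FCFS fallback and hands it to PyExecutor. A minimal sketch of that resolution, assuming only what this hunk shows (the helper name resolve_waiting_queue_policy is hypothetical, purely for illustration):

from typing import Optional

from tensorrt_llm.llmapi.llm_args import SchedulerConfig, WaitingQueuePolicy


def resolve_waiting_queue_policy(
        scheduler_config: Optional[SchedulerConfig]) -> WaitingQueuePolicy:
    # Mirrors create_py_executor_instance: a missing scheduler config falls back to FCFS.
    if scheduler_config is None:
        return WaitingQueuePolicy.FCFS
    return scheduler_config.waiting_queue_policy


assert resolve_waiting_queue_policy(None) == WaitingQueuePolicy.FCFS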

View File

@ -5,7 +5,6 @@ import os
import threading
import time
import traceback
from collections import deque
from contextlib import contextmanager
from enum import IntEnum
from queue import Queue
@ -32,7 +31,7 @@ from tensorrt_llm.bindings.executor import (DisServingRequestStats,
StaticBatchingStats)
from tensorrt_llm.bindings.internal.batch_manager import (LlmRequestType,
ReqIdsSet)
from tensorrt_llm.llmapi.llm_args import PeftCacheConfig
from tensorrt_llm.llmapi.llm_args import PeftCacheConfig, WaitingQueuePolicy
from tensorrt_llm.logger import logger
from tensorrt_llm.mapping import CpType
from tensorrt_llm.runtime.generation import CUASSERT
@ -63,7 +62,8 @@ from .resource_manager import (ResourceManager, ResourceManagerType,
from .sampler import (AsyncWorkerMixin, Sampler, SamplerEvent, SampleState,
SampleStateTensors, TRTLLMSampler)
from .scheduler import (RequestScheduler, ScheduledRequests,
SerializableSchedulerOutput)
SerializableSchedulerOutput, WaitingQueue,
create_waiting_queue)
# Environment variable to specify iteration ranges for profiling start/stop.
# Format: "start1-stop1,start2-stop2,..." or single iterations "iter1,iter2,..."
@ -253,30 +253,32 @@ class PyExecutor:
# 1024 in-flight micro batches can avoid synchronization in most cases and keep host memory usage low.
MIN_ASYNC_MICRO_BATCH_NUM = 1024
def __init__(self,
resource_manager,
scheduler: RequestScheduler,
model_engine: ModelEngine,
sampler: Sampler,
dist: Distributed,
max_num_sequences: int,
drafter: Optional[Drafter] = None,
disable_overlap_scheduler: bool = False,
max_input_len: int = 0x7fffffff,
max_batch_size: int = 8,
max_beam_width: int = 1,
max_draft_len: int = 0,
max_total_draft_tokens: int = 0,
kv_cache_transceiver: Optional[KvCacheTransceiver] = None,
guided_decoder: Optional[GuidedDecoder] = None,
garbage_collection_gen0_threshold: Optional[int] = None,
start_worker: bool = True,
kv_connector_manager: Optional[KvCacheConnectorManager] = None,
max_seq_len: Optional[int] = None,
peft_cache_config: Optional[PeftCacheConfig] = None,
virtual_memory_pools: Optional[dict] = None,
hang_detection_timeout: Optional[int] = None,
execution_stream: Optional[torch.cuda.Stream] = None):
def __init__(
self,
resource_manager,
scheduler: RequestScheduler,
model_engine: ModelEngine,
sampler: Sampler,
dist: Distributed,
max_num_sequences: int,
drafter: Optional[Drafter] = None,
disable_overlap_scheduler: bool = False,
max_input_len: int = 0x7fffffff,
max_batch_size: int = 8,
max_beam_width: int = 1,
max_draft_len: int = 0,
max_total_draft_tokens: int = 0,
kv_cache_transceiver: Optional[KvCacheTransceiver] = None,
guided_decoder: Optional[GuidedDecoder] = None,
garbage_collection_gen0_threshold: Optional[int] = None,
start_worker: bool = True,
kv_connector_manager: Optional[KvCacheConnectorManager] = None,
max_seq_len: Optional[int] = None,
peft_cache_config: Optional[PeftCacheConfig] = None,
virtual_memory_pools: Optional[dict] = None,
hang_detection_timeout: Optional[int] = None,
execution_stream: Optional[torch.cuda.Stream] = None,
waiting_queue_policy: WaitingQueuePolicy = WaitingQueuePolicy.FCFS):
super(PyExecutor, self).__init__()
self.device_id = torch.cuda.current_device()
self.global_rank = dist.rank
@ -474,7 +476,8 @@ class PyExecutor:
self.hang_detector)
# Waiting queue for requests that have been fetched but not yet scheduled
self.waiting_queue: deque[RequestQueueItem] = deque()
self.waiting_queue: WaitingQueue = create_waiting_queue(
waiting_queue_policy)
self.control_request_barrier = threading.Event()
self.control_action_done = threading.Event()
@ -2233,8 +2236,7 @@ class PyExecutor:
self.model_engine.model.lm_head.num_embeddings):
raise ValueError("Token ID out of range")
def _fetch_and_enqueue_requests(self,
waiting_queue: deque[RequestQueueItem],
def _fetch_and_enqueue_requests(self, waiting_queue: WaitingQueue,
total_num_active_requests: int) -> None:
"""Fetch requests from request_queue and enqueue to waiting_queue."""
# Block new requests while control requests are pending
@ -2277,11 +2279,11 @@ class PyExecutor:
> 1) and self.dist.rank > 0:
attach_py_objects_to_requests(new_requests, py_request_objects)
waiting_queue.extend(new_requests)
waiting_queue.add_requests(new_requests)
def _pop_from_waiting_queue(
self,
waiting_queue: deque[RequestQueueItem],
waiting_queue: WaitingQueue,
total_num_active_requests: int,
all_ranks_num_active_requests: Optional[List[int]] = None
) -> List[RequestQueueItem]:
@ -2302,7 +2304,7 @@ class PyExecutor:
@nvtx_range("_fetch_new_requests")
def _fetch_new_requests(
self, waiting_queue: deque[RequestQueueItem],
self, waiting_queue: WaitingQueue,
activate_requests: List[LlmRequest]) -> List[LlmRequest]:
"""Fetch new requests and return LlmRequests ready for execution."""
# 1. Gather info and calculate total_num_active_requests
@ -3039,8 +3041,7 @@ class PyExecutor:
canceled_req_ids_set = set(self.canceled_req_ids)
# Remove canceled requests from the waiting queue
self.waiting_queue = deque(req for req in self.waiting_queue
if req.id not in canceled_req_ids_set)
self.waiting_queue.remove_by_ids(canceled_req_ids_set)
still_pending_canceled_ids = []
for request in self.active_requests:
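
Taken together, the executor-side call sites in this file now go through the abstract queue operations only (add_requests on fetch, pop_request when scheduling, remove_by_ids on cancellation), so the queue policy can change without touching PyExecutor. An illustrative, self-contained condensation of those three paths (Mock payloads stand in for real request objects):

from unittest.mock import Mock

from tensorrt_llm._torch.pyexecutor.executor_request_queue import RequestQueueItem
from tensorrt_llm._torch.pyexecutor.scheduler import create_waiting_queue
from tensorrt_llm.llmapi.llm_args import WaitingQueuePolicy

waiting_queue = create_waiting_queue(WaitingQueuePolicy.FCFS)

# Fetch path: was waiting_queue.extend(new_requests)
waiting_queue.add_requests(RequestQueueItem(i, Mock()) for i in range(4))

# Scheduling path: was waiting_queue.popleft()
first = waiting_queue.pop_request()
assert first.id == 0

# Cancellation path: was rebuilding the deque with a filtered generator
waiting_queue.remove_by_ids({2})
assert [item.id for item in waiting_queue] == [1, 3]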

View File

@ -2,8 +2,11 @@
import heapq
import os
from collections import deque, namedtuple
from typing import Any, Dict, List, Optional, Tuple
from collections import namedtuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
if TYPE_CHECKING:
from .scheduler import WaitingQueue
import torch
@ -246,7 +249,7 @@ def can_process_attention_dp_request(
def get_from_waiting_queue(
waiting_queue: deque,
waiting_queue: "WaitingQueue",
max_req_count: int,
enable_attention_dp: bool,
max_num_active_requests: int,
@ -277,11 +280,11 @@ def get_from_waiting_queue(
)
while req_count < max_req_count and waiting_queue:
req_item = waiting_queue[0]
req_item = waiting_queue.peek_request()
num_children = len(req_item.child_req_ids) if req_item.child_req_ids else 0
if (req_count + 1 + num_children) > max_req_count:
break
req_item = waiting_queue.popleft()
req_item = waiting_queue.pop_request()
can_process = (
can_process_attention_dp_request(
@ -299,7 +302,7 @@ def get_from_waiting_queue(
# Put the pending requests back to the waiting queue
# All ranks should have the same waiting queue
waiting_queue.extendleft(reversed(pending_requests))
waiting_queue.prepend_requests(reversed(pending_requests))
return items
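
One subtlety worth spelling out: prepend_requests on the FCFS queue wraps deque.extendleft, which pushes items to the front one at a time and therefore reverses them, so passing reversed(pending_requests) here restores the pending requests to their original front-to-back order. A small stand-alone illustration (plain integers stand in for RequestQueueItem objects):

from tensorrt_llm._torch.pyexecutor.scheduler import FCFSWaitingQueue

queue = FCFSWaitingQueue([4, 5])           # deque-style construction still works
pending = [1, 2, 3]                        # items to put back at the front, in order

queue.prepend_requests(reversed(pending))  # extendleft reverses again, restoring 1, 2, 3
assert list(queue) == [1, 2, 3, 4, 5]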

View File

@ -0,0 +1,64 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Scheduler module for TensorRT-LLM PyExecutor.
This module contains:
- Request schedulers (capacity, micro-batch, unified)
- Waiting queues (FCFS)
"""
# Re-export from scheduler.py
from .scheduler import (
BindCapacityScheduler,
BindMicroBatchScheduler,
CapacityScheduler,
KVCacheV2DummyScheduler,
MicroBatchScheduler,
PyCapacityScheduler,
PyMicroBatchScheduler,
RequestList,
RequestScheduler,
ScheduledRequests,
SchedulerOutput,
SerializableSchedulerOutput,
SimpleScheduler,
SimpleUnifiedScheduler,
)
# Re-export from waiting_queue.py
from .waiting_queue import FCFSWaitingQueue, WaitingQueue, create_waiting_queue
__all__ = [
# Schedulers
"BindCapacityScheduler",
"BindMicroBatchScheduler",
"CapacityScheduler",
"KVCacheV2DummyScheduler",
"MicroBatchScheduler",
"PyCapacityScheduler",
"PyMicroBatchScheduler",
"RequestList",
"RequestScheduler",
"ScheduledRequests",
"SchedulerOutput",
"SerializableSchedulerOutput",
"SimpleScheduler",
"SimpleUnifiedScheduler",
# Waiting queues
"FCFSWaitingQueue",
"WaitingQueue",
"create_waiting_queue",
]
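
With these re-exports in place, callers import both the schedulers and the waiting-queue types from the scheduler package rather than from individual modules, which is what the executor and the new tests below do:

from tensorrt_llm._torch.pyexecutor.scheduler import (
    FCFSWaitingQueue,
    WaitingQueue,
    create_waiting_queue,
)

queue: WaitingQueue = create_waiting_queue()  # the factory defaults to the FCFS policy
assert isinstance(queue, FCFSWaitingQueue)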

View File

@ -0,0 +1,134 @@
from abc import ABC, abstractmethod
from collections import deque
from collections.abc import Iterable, Iterator
from typing import Callable, Optional
from tensorrt_llm.llmapi.llm_args import WaitingQueuePolicy
from ..executor_request_queue import RequestQueueItem
class WaitingQueue(ABC):
"""Abstract base class for waiting queues."""
@abstractmethod
def add_request(self, request: RequestQueueItem) -> None:
"""Add a request to the queue according to the policy."""
pass
@abstractmethod
def add_requests(self, requests: Iterable[RequestQueueItem]) -> None:
"""Add multiple requests to the queue according to the policy."""
pass
@abstractmethod
def pop_request(self) -> RequestQueueItem:
"""Pop a request from the queue according to the policy."""
pass
@abstractmethod
def peek_request(self) -> RequestQueueItem:
"""Peek at the request at the front of the queue without removing it."""
pass
@abstractmethod
def prepend_request(self, request: RequestQueueItem) -> None:
"""Prepend a request to the front of the queue."""
pass
@abstractmethod
def prepend_requests(self, requests: Iterable[RequestQueueItem]) -> None:
"""Prepend all requests from another iterable to the front of this queue."""
pass
@abstractmethod
def remove_by_ids(self, request_ids: set[int]) -> None:
"""Remove requests with the given IDs."""
pass
@abstractmethod
def __bool__(self) -> bool:
"""Check if queue has any requests."""
pass
@abstractmethod
def __len__(self) -> int:
"""Get number of requests in queue."""
pass
@abstractmethod
def __iter__(self) -> Iterator[RequestQueueItem]:
"""Iterate over the queue according to the policy."""
pass
class FCFSWaitingQueue(deque, WaitingQueue):
"""A first-come-first-served queue that supports deque operations."""
def add_request(self, request: RequestQueueItem) -> None:
"""Add a request to the queue according to FCFS policy."""
self.append(request)
def add_requests(self, requests: Iterable[RequestQueueItem]) -> None:
"""Add multiple requests to the queue according to FCFS policy."""
self.extend(requests)
def pop_request(self) -> RequestQueueItem:
"""Pop a request from the queue according to FCFS policy."""
return self.popleft()
def peek_request(self) -> RequestQueueItem:
"""Peek at the next request in the queue without removing it."""
if not self:
raise IndexError("peek from an empty queue")
return self[0]
def prepend_request(self, request: RequestQueueItem) -> None:
"""Prepend a request to the front of the queue."""
self.appendleft(request)
def prepend_requests(self, requests: Iterable[RequestQueueItem]) -> None:
"""Prepend all requests from another iterable to the front of this queue.
Note: The requests will be prepended in reverse order of their
appearance in the `requests` iterable.
"""
self.extendleft(requests)
def remove_by_ids(self, request_ids: set[int]) -> None:
"""Remove requests with the given IDs."""
filtered_requests = [req for req in self if req.id not in request_ids]
self.clear()
self.extend(filtered_requests)
def __bool__(self) -> bool:
"""Check if queue has any requests."""
return len(self) > 0
def __len__(self) -> int:
"""Get number of requests in queue."""
return super().__len__()
def __iter__(self) -> Iterator[RequestQueueItem]:
"""Iterate over the queue according to FCFS policy."""
return super().__iter__()
def create_waiting_queue(
policy: WaitingQueuePolicy = WaitingQueuePolicy.FCFS,
priority_fn: Optional[Callable[[RequestQueueItem], float]] = None,
) -> WaitingQueue:
"""Create a waiting queue based on the specified policy.
Args:
policy: The scheduling policy to use. Currently only FCFS is supported.
priority_fn: Reserved for future use.
Returns:
A WaitingQueue instance.
"""
# Currently only FCFS is implemented
if policy == WaitingQueuePolicy.FCFS:
return FCFSWaitingQueue()
else:
raise ValueError(f"Unsupported waiting queue policy: {policy}")
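
The factory only builds FCFSWaitingQueue today, and its priority_fn argument is explicitly reserved for future use, but the abstract interface is the extension point this change is introducing. As a purely hypothetical sketch of what a later policy could look like (the class name, heap layout, and priority_fn wiring are all assumptions, not part of this PR):

import heapq
import itertools
from collections.abc import Iterable, Iterator
from typing import Callable

from tensorrt_llm._torch.pyexecutor.executor_request_queue import RequestQueueItem
from tensorrt_llm._torch.pyexecutor.scheduler import WaitingQueue


class PriorityWaitingQueue(WaitingQueue):
    """Hypothetical priority-ordered waiting queue built on the same interface."""

    def __init__(self, priority_fn: Callable[[RequestQueueItem], float]):
        self._priority_fn = priority_fn
        self._heap: list = []              # entries are (priority, tie_breaker, request)
        self._counter = itertools.count()  # preserves FCFS order among equal priorities

    def add_request(self, request: RequestQueueItem) -> None:
        heapq.heappush(
            self._heap,
            (self._priority_fn(request), next(self._counter), request))

    def add_requests(self, requests: Iterable[RequestQueueItem]) -> None:
        for request in requests:
            self.add_request(request)

    def pop_request(self) -> RequestQueueItem:
        return heapq.heappop(self._heap)[2]

    def peek_request(self) -> RequestQueueItem:
        if not self._heap:
            raise IndexError("peek from an empty queue")
        return self._heap[0][2]

    def prepend_request(self, request: RequestQueueItem) -> None:
        # A priority queue has no notion of "front"; re-queued items are simply re-ranked.
        self.add_request(request)

    def prepend_requests(self, requests: Iterable[RequestQueueItem]) -> None:
        self.add_requests(requests)

    def remove_by_ids(self, request_ids: set[int]) -> None:
        self._heap = [e for e in self._heap if e[2].id not in request_ids]
        heapq.heapify(self._heap)

    def __bool__(self) -> bool:
        return bool(self._heap)

    def __len__(self) -> int:
        return len(self._heap)

    def __iter__(self) -> Iterator[RequestQueueItem]:
        return (entry[2] for entry in sorted(self._heap))

Hooking such a class into create_waiting_queue would then just be another branch on WaitingQueuePolicy once a corresponding enum value exists.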

View File

@ -1487,6 +1487,12 @@ class ContextChunkingPolicy(StrEnum, metaclass=PybindMirrorEnumMeta):
return getattr(_ContextChunkingPolicy, self.value)
class WaitingQueuePolicy(StrEnum):
"""Waiting queue scheduling policy for managing pending requests."""
FCFS = "fcfs" # First-Come-First-Served
@PybindMirror.mirror_pybind_fields(_DynamicBatchConfig)
class DynamicBatchConfig(StrictBaseModel, PybindMirror):
"""Dynamic batch configuration.
@ -1525,6 +1531,10 @@ class SchedulerConfig(StrictBaseModel, PybindMirror):
dynamic_batch_config: Optional[DynamicBatchConfig] = Field(
default=None, description="The dynamic batch config to use")
waiting_queue_policy: WaitingQueuePolicy = Field(
default=WaitingQueuePolicy.FCFS,
description="The waiting queue scheduling policy")
def _to_pybind(self):
return _SchedulerConfig(
capacity_scheduler_policy=self.capacity_scheduler_policy._to_pybind(
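
Since WaitingQueuePolicy is a StrEnum and FCFS is the field's default, existing SchedulerConfig usages keep their behavior unchanged; the policy only needs to be set explicitly once alternative policies exist. A small sketch, assuming SchedulerConfig is constructible with its other fields left at their defaults:

from tensorrt_llm.llmapi.llm_args import SchedulerConfig, WaitingQueuePolicy

default_cfg = SchedulerConfig()
explicit_cfg = SchedulerConfig(waiting_queue_policy=WaitingQueuePolicy.FCFS)

# Both resolve to the same policy; the string value "fcfs" is what a
# serialized (e.g. YAML) config would carry.
assert default_cfg.waiting_queue_policy == explicit_cfg.waiting_queue_policy
assert explicit_cfg.waiting_queue_policy.value == "fcfs"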

View File

@ -9,7 +9,6 @@ to PyExecutor, including:
- expected_num_active_requests tracking
"""
from collections import deque
from unittest.mock import Mock
import pytest
@ -18,6 +17,7 @@ from tensorrt_llm._torch.pyexecutor.executor_request_queue import (
SHUTDOWN_REQUEST_ID,
RequestQueueItem,
)
from tensorrt_llm._torch.pyexecutor.scheduler import FCFSWaitingQueue
class MockPyExecutor:
@ -35,7 +35,7 @@ class MockPyExecutor:
self.is_shutdown = False
self.expected_num_active_requests = 0
self.new_active_requests_queue_latency_ms = 0.0
self.waiting_queue = deque()
self.waiting_queue = FCFSWaitingQueue()
def _handle_special_queue_items(self, new_requests):
"""Handle special signals.
@ -62,13 +62,11 @@ class MockPyExecutor:
def update_waiting_queue(self):
"""Update waiting queue to remove canceled requests.
This method mirrors PyExecutor.update_waiting_queue.
This method mirrors PyExecutor._handle_canceled_requests.
"""
if self.canceled_req_ids:
canceled_set = set(self.canceled_req_ids)
self.waiting_queue = deque(
item for item in self.waiting_queue if item.id not in canceled_set
)
self.waiting_queue.remove_by_ids(canceled_set)
def clear_canceled_req_ids(self):
"""Clear the list of canceled request IDs."""

View File

@ -6,7 +6,6 @@ This module tests:
- Waiting queue functions (get_from_waiting_queue, can_process_attention_dp_request)
"""
from collections import deque
from unittest.mock import Mock, patch
import pytest
@ -20,6 +19,7 @@ from tensorrt_llm._torch.pyexecutor.request_utils import (
merge_requests,
schedule_attention_dp_requests,
)
from tensorrt_llm._torch.pyexecutor.scheduler import FCFSWaitingQueue
from tensorrt_llm.bindings import executor as trtllm
from tensorrt_llm.mapping import CpType
@ -263,7 +263,7 @@ def test_merge_requests_with_helix_cp_config():
def test_get_from_waiting_queue():
"""Test getting items from waiting queue."""
# Add items to waiting queue
waiting_queue = deque()
waiting_queue = FCFSWaitingQueue()
items = [RequestQueueItem(i, Mock()) for i in range(5)]
waiting_queue.extend(items)
@ -291,7 +291,7 @@ def test_get_from_waiting_queue_edge_cases(
):
"""Test edge cases for getting items from waiting queue."""
# Setup queue
waiting_queue = deque()
waiting_queue = FCFSWaitingQueue()
if queue_size > 0:
items = [RequestQueueItem(i, Mock()) for i in range(queue_size)]
waiting_queue.extend(items)
@ -307,7 +307,7 @@ def test_get_from_waiting_queue_edge_cases(
def test_get_from_waiting_queue_with_attention_dp(
attention_dp_config, all_ranks_num_active_requests
):
waiting_queue = deque()
waiting_queue = FCFSWaitingQueue()
items = [RequestQueueItem(i, Mock()) for i in range(5)]
waiting_queue.extend(items)
@ -338,7 +338,8 @@ def test_get_from_waiting_queue_with_attention_dp_filtering(
3, create_mock_request_with_py_schedule_params(attention_dp_rank=None)
) # No scheduling params
waiting_queue = deque([req1, req2, req3])
waiting_queue = FCFSWaitingQueue()
waiting_queue.extend([req1, req2, req3])
# Set rank 0 to full capacity to test filtering
all_ranks_num_active_requests[0] = 8
@ -719,7 +720,8 @@ def test_achieve_max_num_active_requests(attention_dp_config):
req_id += 1
all_ranks_num_active_requests = [5, 6, 3, 7]
waiting_queue = deque(req_list)
waiting_queue = FCFSWaitingQueue()
waiting_queue.extend(req_list)
available_active_requests = max_num_active_requests * 4 - sum(all_ranks_num_active_requests)
result = get_from_waiting_queue(
@ -843,7 +845,7 @@ def test_attention_dp_scheduling_cases(
all_ranks_expected_req_ids,
):
"""Test attention DP scheduling with various scenarios."""
waiting_queue = deque()
waiting_queue = FCFSWaitingQueue()
for rank, relax in request_configs:
append_to_waiting_queue(waiting_queue, rank, relax)
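
Note that these tests swap the bare deque for FCFSWaitingQueue while still calling extend(); that works because FCFSWaitingQueue subclasses deque, so the inherited deque API and the new WaitingQueue methods can be mixed freely on the FCFS implementation:

from unittest.mock import Mock

from tensorrt_llm._torch.pyexecutor.executor_request_queue import RequestQueueItem
from tensorrt_llm._torch.pyexecutor.scheduler import FCFSWaitingQueue

queue = FCFSWaitingQueue()
queue.extend([RequestQueueItem(i, Mock()) for i in range(3)])  # inherited deque method
queue.add_request(RequestQueueItem(3, Mock()))                 # new interface method

assert queue.popleft().id == 0      # deque API
assert queue.pop_request().id == 1  # WaitingQueue API
assert [item.id for item in queue] == [2, 3]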

View File

@ -0,0 +1,189 @@
"""Tests for WaitingQueue implementations.
This module tests the waiting queue functionality including:
- FCFSWaitingQueue operations
- WaitingQueue abstract interface
- create_waiting_queue factory function
"""
from unittest.mock import Mock
import pytest
from tensorrt_llm._torch.pyexecutor.executor_request_queue import RequestQueueItem
from tensorrt_llm._torch.pyexecutor.scheduler import (
FCFSWaitingQueue,
WaitingQueue,
create_waiting_queue,
)
from tensorrt_llm.llmapi.llm_args import WaitingQueuePolicy
def create_mock_request_item(request_id: int) -> RequestQueueItem:
"""Create a mock RequestQueueItem for testing."""
mock_request = Mock()
return RequestQueueItem(request_id, mock_request)
class TestFCFSWaitingQueue:
"""Tests for FCFSWaitingQueue."""
def test_add_request(self):
"""Test adding a single request."""
queue = FCFSWaitingQueue()
item = create_mock_request_item(1)
queue.add_request(item)
assert len(queue) == 1
assert queue.peek_request() == item
def test_add_requests(self):
"""Test adding multiple requests."""
queue = FCFSWaitingQueue()
items = [create_mock_request_item(i) for i in range(3)]
queue.add_requests(items)
assert len(queue) == 3
def test_pop_request_fcfs_order(self):
"""Test that pop_request returns requests in FCFS order."""
queue = FCFSWaitingQueue()
items = [create_mock_request_item(i) for i in range(3)]
queue.add_requests(items)
# Should pop in order: 0, 1, 2
assert queue.pop_request().id == 0
assert queue.pop_request().id == 1
assert queue.pop_request().id == 2
def test_pop_from_empty_queue(self):
"""Test that pop_request raises IndexError on empty queue."""
queue = FCFSWaitingQueue()
with pytest.raises(IndexError):
queue.pop_request()
def test_peek_request(self):
"""Test peeking at the front of the queue."""
queue = FCFSWaitingQueue()
items = [create_mock_request_item(i) for i in range(3)]
queue.add_requests(items)
# Peek should return first item without removing it
assert queue.peek_request().id == 0
assert len(queue) == 3 # Size unchanged
def test_peek_from_empty_queue(self):
"""Test that peek_request raises IndexError on empty queue."""
queue = FCFSWaitingQueue()
with pytest.raises(IndexError):
queue.peek_request()
def test_prepend_request(self):
"""Test prepending a request to the front."""
queue = FCFSWaitingQueue()
queue.add_request(create_mock_request_item(1))
queue.add_request(create_mock_request_item(2))
# Prepend item 0 to front
queue.prepend_request(create_mock_request_item(0))
# Should pop in order: 0, 1, 2
assert queue.pop_request().id == 0
assert queue.pop_request().id == 1
assert queue.pop_request().id == 2
def test_prepend_requests(self):
"""Test prepending multiple requests."""
queue = FCFSWaitingQueue()
queue.add_request(create_mock_request_item(3))
# Prepend items [1, 2] - note: extendleft reverses order
queue.prepend_requests([create_mock_request_item(i) for i in [1, 2]])
# After extendleft([1, 2]), order is: 2, 1, 3
assert queue.pop_request().id == 2
assert queue.pop_request().id == 1
assert queue.pop_request().id == 3
def test_remove_by_ids(self):
"""Test removing requests by their IDs."""
queue = FCFSWaitingQueue()
items = [create_mock_request_item(i) for i in range(5)]
queue.add_requests(items)
# Remove items 1 and 3
queue.remove_by_ids({1, 3})
assert len(queue) == 3
remaining_ids = [item.id for item in queue]
assert remaining_ids == [0, 2, 4]
def test_remove_nonexistent_ids(self):
"""Test removing IDs that don't exist (should not raise)."""
queue = FCFSWaitingQueue()
items = [create_mock_request_item(i) for i in range(3)]
queue.add_requests(items)
# Remove IDs that don't exist
queue.remove_by_ids({10, 20})
assert len(queue) == 3
def test_bool_empty_queue(self):
"""Test bool conversion for empty queue."""
queue = FCFSWaitingQueue()
assert not queue
assert bool(queue) is False
def test_bool_nonempty_queue(self):
"""Test bool conversion for non-empty queue."""
queue = FCFSWaitingQueue()
queue.add_request(create_mock_request_item(1))
assert queue
assert bool(queue) is True
def test_len(self):
"""Test length of queue."""
queue = FCFSWaitingQueue()
assert len(queue) == 0
queue.add_request(create_mock_request_item(1))
assert len(queue) == 1
queue.add_requests([create_mock_request_item(i) for i in range(2, 5)])
assert len(queue) == 4
def test_iter(self):
"""Test iteration over queue."""
queue = FCFSWaitingQueue()
items = [create_mock_request_item(i) for i in range(3)]
queue.add_requests(items)
iterated_ids = [item.id for item in queue]
assert iterated_ids == [0, 1, 2]
# Iteration should not consume items
assert len(queue) == 3
def test_is_waiting_queue_subclass(self):
"""Test that FCFSWaitingQueue is a WaitingQueue."""
queue = FCFSWaitingQueue()
assert isinstance(queue, WaitingQueue)
class TestCreateWaitingQueue:
"""Tests for create_waiting_queue factory function."""
def test_create_fcfs_queue(self):
"""Test creating FCFS queue."""
queue = create_waiting_queue(WaitingQueuePolicy.FCFS)
assert isinstance(queue, FCFSWaitingQueue)
def test_create_default_queue(self):
"""Test creating queue with default policy."""
queue = create_waiting_queue()
assert isinstance(queue, FCFSWaitingQueue)