[Refactor] Remove dead code in tests and parallel_state (#41471)

Signed-off-by: yewentao256 <zhyanwentao@126.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Wentao Ye
2026-06-03 22:32:39 -04:00
committed by GitHub
parent f25952e59b
commit e6018c644a
4 changed files with 7 additions and 63 deletions
@@ -98,7 +98,6 @@ def _make_connector_with_fake_worker(
)
worker = connector.connector_worker
assert isinstance(worker.nixl_wrapper, FakeNixlWrapper)
worker.nixl_wrapper.set_cycles_before_xfer_done(cycles_before_done)
worker.kv_cache_layout = "HND"
if do_handshake:
remote_agents = worker._nixl_handshake(
@@ -197,13 +197,6 @@ class FakeNixlWrapper:
def get_xfer_telemetry(self, handle: int) -> dict:
    """Return the default transfer telemetry.

    ``handle`` is accepted for signature compatibility but is not consulted;
    every call yields whatever ``get_default_xfer_telemetry()`` produces.
    """
    telemetry = get_default_xfer_telemetry()
    return telemetry
############################################################
# The following are for changing the behavior during testing.
############################################################
def set_cycles_before_xfer_done(self, cycles: int):
"""Set the number of cycles before a transfer is considered done."""
@contextlib.contextmanager
def _make_fake_nixl_pkg():
@@ -578,10 +571,7 @@ class TestNixlHandshake:
"""Test case where multiple xfers are initiated to the same engine.
This test triggers the connector to load remote KV for the same
`request_id`. The transfer is not done immediately due to
`set_cycles_before_xfer_done`, so there is a state where there are
multiple transfer states for the same `request_id`, and `get_finished`
should handle it correctly (wait for all transfers to be done).
`request_id`.
"""
vllm_config = create_vllm_config()
@@ -598,7 +588,6 @@ class TestNixlHandshake:
)
assert isinstance(connector.connector_worker.nixl_wrapper, FakeNixlWrapper)
worker = connector.connector_worker
worker.nixl_wrapper.set_cycles_before_xfer_done(3)
# simulate handshake
worker.dst_xfer_side_handles = {
FakeNixlConnectorWorker.REMOTE_ENGINE_ID: {0: 1}
@@ -1304,7 +1293,6 @@ def test_scheduler_kv_connector_stats_aggregation():
# Worker stats with transfer metrics
worker_stats = NixlKVConnectorStats()
worker_stats.record_transfer(get_default_xfer_telemetry())
worker_stats.data["remote_tokens"] = []
# Scheduler stats with custom metric (needs dummy transfer to avoid being skipped)
scheduler_stats = NixlKVConnectorStats()
@@ -1314,7 +1302,6 @@ def test_scheduler_kv_connector_stats_aggregation():
"post_duration": [0],
"bytes_transferred": [0],
"num_descriptors": [0],
"remote_tokens": [128],
}
)
@@ -1355,7 +1342,6 @@ def test_scheduler_kv_connector_stats_aggregation():
).scheduler_stats.kv_connector_stats
nixl_stats = final_stats["NixlConnector"]
assert nixl_stats.num_successful_transfers == 2
assert nixl_stats.data["remote_tokens"] == [128]
@pytest.mark.parametrize("distributed_executor_backend", ["ray", None])
@@ -120,15 +120,12 @@ class KVOutputAggregator:
# Use the first worker's kv_connector_stats as accumulator.
aggregated_kv_connector_stats = kv_output.kv_connector_stats
elif kv_connector_stats := kv_output.kv_connector_stats:
if aggregated_kv_connector_stats is None:
aggregated_kv_connector_stats = kv_connector_stats
else:
assert isinstance(
aggregated_kv_connector_stats, type(kv_connector_stats)
)
aggregated_kv_connector_stats = (
aggregated_kv_connector_stats.aggregate(kv_connector_stats)
)
assert isinstance(
aggregated_kv_connector_stats, type(kv_connector_stats)
)
aggregated_kv_connector_stats = aggregated_kv_connector_stats.aggregate(
kv_connector_stats
)
# Aggregate kv_connector_worker_meta from all workers.
if aggregated_kv_connector_worker_meta is None:
-38
View File
@@ -1270,9 +1270,6 @@ def get_dcp_group() -> GroupCoordinator:
return _DCP
# kept for backward compatibility
get_context_model_parallel_group = get_dcp_group
_PP: GroupCoordinator | None = None
@@ -1840,31 +1837,6 @@ def model_parallel_is_initialized():
_TP_STATE_PATCHED = False
@contextmanager
def patch_tensor_parallel_group(tp_group: GroupCoordinator):
    """Temporarily replace the module-level tensor parallel group.

    This is for draft workers of speculative decoding, which run the draft
    model with a different tp degree from that of the target model workers.
    The previous group is restored when the ``with`` block exits, whether it
    exits normally or by raising.

    Args:
        tp_group (GroupCoordinator): the tp group coordinator to install for
            the duration of the ``with`` block.

    Raises:
        AssertionError: if a patch is already active (nesting is not
            supported).
    """
    global _TP, _TP_STATE_PATCHED
    assert not _TP_STATE_PATCHED, "Should not call when it's already patched"
    # Read the current group *before* flipping the flag: if get_tp_group()
    # raises, the flag must not be left stuck at True, which would make every
    # later call fail the assertion above.
    old_tp_group = get_tp_group()
    _TP_STATE_PATCHED = True
    _TP = tp_group
    try:
        yield
    finally:
        # restore the original state
        _TP_STATE_PATCHED = False
        _TP = old_tp_group
def get_tensor_model_parallel_world_size() -> int:
    """Look up the tensor model parallel group and return its ``world_size``."""
    tp_group = get_tp_group()
    return tp_group.world_size
@@ -1875,16 +1847,6 @@ def get_tensor_model_parallel_rank() -> int:
return get_tp_group().rank_in_group
def get_decode_context_model_parallel_world_size() -> int:
    """Look up the decode context model parallel group and return its
    ``world_size``."""
    dcp_group = get_dcp_group()
    return dcp_group.world_size
def get_decode_context_model_parallel_rank() -> int:
    """Look up the decode context model parallel group and return this
    process's ``rank_in_group``."""
    dcp_group = get_dcp_group()
    return dcp_group.rank_in_group
def get_node_count() -> int:
"""Return the total number of nodes in the distributed environment."""
assert _NODE_COUNT is not None, "distributed environment is not initialized"