diff --git a/tests/v1/kv_connector/unit/test_bidirectional_kv_transfer.py b/tests/v1/kv_connector/unit/test_bidirectional_kv_transfer.py index dc76d61178d..ef092dfb49f 100644 --- a/tests/v1/kv_connector/unit/test_bidirectional_kv_transfer.py +++ b/tests/v1/kv_connector/unit/test_bidirectional_kv_transfer.py @@ -98,7 +98,6 @@ def _make_connector_with_fake_worker( ) worker = connector.connector_worker assert isinstance(worker.nixl_wrapper, FakeNixlWrapper) - worker.nixl_wrapper.set_cycles_before_xfer_done(cycles_before_done) worker.kv_cache_layout = "HND" if do_handshake: remote_agents = worker._nixl_handshake( diff --git a/tests/v1/kv_connector/unit/test_nixl_connector.py b/tests/v1/kv_connector/unit/test_nixl_connector.py index f07a8352e73..1a7c35cacb8 100644 --- a/tests/v1/kv_connector/unit/test_nixl_connector.py +++ b/tests/v1/kv_connector/unit/test_nixl_connector.py @@ -197,13 +197,6 @@ class FakeNixlWrapper: def get_xfer_telemetry(self, handle: int) -> dict: return get_default_xfer_telemetry() - ############################################################ - # Follow are for changing the behavior during testing. - ############################################################ - - def set_cycles_before_xfer_done(self, cycles: int): - """Set the number of cycles before a transfer is considered done.""" - @contextlib.contextmanager def _make_fake_nixl_pkg(): @@ -578,10 +571,7 @@ class TestNixlHandshake: """Test case where multiple xfers are initiated to the same engine. This test triggers the connector to load remote KV for the same - `request_id`. The transfer is not done immediately due to - `set_cycles_before_xfer_done`, so there is a state where there are - multiple transfer states for the same `request_id`, and `get_finished` - should handle it correctly (wait for all transfers to be done). + `request_id`. """ vllm_config = create_vllm_config() @@ -598,7 +588,6 @@ class TestNixlHandshake: ) assert isinstance(connector.connector_worker.nixl_wrapper, FakeNixlWrapper) worker = connector.connector_worker - worker.nixl_wrapper.set_cycles_before_xfer_done(3) # simulate handshake worker.dst_xfer_side_handles = { FakeNixlConnectorWorker.REMOTE_ENGINE_ID: {0: 1} @@ -1304,7 +1293,6 @@ def test_scheduler_kv_connector_stats_aggregation(): # Worker stats with transfer metrics worker_stats = NixlKVConnectorStats() worker_stats.record_transfer(get_default_xfer_telemetry()) - worker_stats.data["remote_tokens"] = [] # Scheduler stats with custom metric (needs dummy transfer to avoid being skipped) scheduler_stats = NixlKVConnectorStats() @@ -1314,7 +1302,6 @@ def test_scheduler_kv_connector_stats_aggregation(): "post_duration": [0], "bytes_transferred": [0], "num_descriptors": [0], - "remote_tokens": [128], } ) @@ -1355,7 +1342,6 @@ def test_scheduler_kv_connector_stats_aggregation(): ).scheduler_stats.kv_connector_stats nixl_stats = final_stats["NixlConnector"] assert nixl_stats.num_successful_transfers == 2 - assert nixl_stats.data["remote_tokens"] == [128] @pytest.mark.parametrize("distributed_executor_backend", ["ray", None]) diff --git a/vllm/distributed/kv_transfer/kv_connector/utils.py b/vllm/distributed/kv_transfer/kv_connector/utils.py index d7a595716f0..fafc1f45724 100644 --- a/vllm/distributed/kv_transfer/kv_connector/utils.py +++ b/vllm/distributed/kv_transfer/kv_connector/utils.py @@ -120,15 +120,12 @@ class KVOutputAggregator: # Use the first worker's kv_connector_stats as accumulator. aggregated_kv_connector_stats = kv_output.kv_connector_stats elif kv_connector_stats := kv_output.kv_connector_stats: - if aggregated_kv_connector_stats is None: - aggregated_kv_connector_stats = kv_connector_stats - else: - assert isinstance( - aggregated_kv_connector_stats, type(kv_connector_stats) - ) - aggregated_kv_connector_stats = ( - aggregated_kv_connector_stats.aggregate(kv_connector_stats) - ) + assert isinstance( + aggregated_kv_connector_stats, type(kv_connector_stats) + ) + aggregated_kv_connector_stats = aggregated_kv_connector_stats.aggregate( + kv_connector_stats + ) # Aggregate kv_connector_worker_meta from all workers. if aggregated_kv_connector_worker_meta is None: diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py index 331e0684e32..8775e519d99 100644 --- a/vllm/distributed/parallel_state.py +++ b/vllm/distributed/parallel_state.py @@ -1270,9 +1270,6 @@ def get_dcp_group() -> GroupCoordinator: return _DCP -# kept for backward compatibility -get_context_model_parallel_group = get_dcp_group - _PP: GroupCoordinator | None = None @@ -1840,31 +1837,6 @@ def model_parallel_is_initialized(): _TP_STATE_PATCHED = False -@contextmanager -def patch_tensor_parallel_group(tp_group: GroupCoordinator): - """Patch the tp group temporarily until this function ends. - - This method is for draft workers of speculative decoding to run draft model - with different tp degree from that of target model workers. - - Args: - tp_group (GroupCoordinator): the tp group coordinator - """ - global _TP_STATE_PATCHED - assert not _TP_STATE_PATCHED, "Should not call when it's already patched" - - _TP_STATE_PATCHED = True - old_tp_group = get_tp_group() - global _TP - _TP = tp_group - try: - yield - finally: - # restore the original state - _TP_STATE_PATCHED = False - _TP = old_tp_group - - def get_tensor_model_parallel_world_size() -> int: """Return world size for the tensor model parallel group.""" return get_tp_group().world_size @@ -1875,16 +1847,6 @@ def get_tensor_model_parallel_rank() -> int: return get_tp_group().rank_in_group -def get_decode_context_model_parallel_world_size() -> int: - """Return world size for the decode context model parallel group.""" - return get_dcp_group().world_size - - -def get_decode_context_model_parallel_rank() -> int: - """Return my rank for the decode context model parallel group.""" - return get_dcp_group().rank_in_group - - def get_node_count() -> int: """Return the total number of nodes in the distributed environment.""" assert _NODE_COUNT is not None, "distributed environment is not initialized"