mirror of
https://github.com/vllm-project/vllm.git
synced 2026-06-06 00:16:14 +00:00
[KVConnector] Update connector tests for single packed K/V region
With K/V packed into a single contiguous region per block, the NIXL and Mooncake transfer paths register one region per layer and coalesce block transfers instead of emitting separate K/V halves. Update the unit tests to match: detect the 4D blocks-first layout, expect one entry per tensor, and expect coalesced (non-split) block transfers. Co-authored-by: Claude Signed-off-by: Lucas Wilkinson <lwilkins@redhat.com>
This commit is contained in:
@@ -369,26 +369,21 @@ async def test_kv_producer(monkeypatch):
|
||||
with patch.object(
|
||||
prefill_worker, "_send_blocks", return_value=0
|
||||
) as mock_send_blocks:
|
||||
# With blocks-first layout, each block is virtually split
|
||||
# into K and V halves, producing non-coalesced transfers.
|
||||
kv_half = block_len // 2
|
||||
|
||||
def expected_split_transfers(src_base, dst_base, src_blocks, dst_blocks):
|
||||
"""Build expected (src_ptrs, dst_ptrs, lengths) for
|
||||
virtual-split K/V transfers."""
|
||||
src_ptrs, dst_ptrs, lengths = [], [], []
|
||||
for kv_offset in (0, kv_half):
|
||||
for sb, db in zip(src_blocks, dst_blocks):
|
||||
src_ptrs.append(src_base + sb * block_len + kv_offset)
|
||||
dst_ptrs.append(dst_base + db * block_len + kv_offset)
|
||||
lengths.append(kv_half)
|
||||
return src_ptrs, dst_ptrs, lengths
|
||||
# Under the standardized blocks-first layout K and V are packed
|
||||
# into a single contiguous region per block. Adjacent blocks are
|
||||
# coalesced into a single larger transfer; all cases below pass
|
||||
# a single contiguous run.
|
||||
def expected_transfers(src_base, dst_base, src_blocks, dst_blocks):
|
||||
n = len(src_blocks)
|
||||
return (
|
||||
[src_base + src_blocks[0] * block_len],
|
||||
[dst_base + dst_blocks[0] * block_len],
|
||||
[n * block_len],
|
||||
)
|
||||
|
||||
# Normal case: 2 blocks to 2 blocks
|
||||
await prefill_worker.send_kv_to_decode(identity, mock_socket, xfer_meta)
|
||||
src, dst, lens = expected_split_transfers(
|
||||
0x1000, 0x2000, [10, 11], [20, 21]
|
||||
)
|
||||
src, dst, lens = expected_transfers(0x1000, 0x2000, [10, 11], [20, 21])
|
||||
mock_send_blocks.assert_called_once_with(
|
||||
"consumer-host:54321",
|
||||
src,
|
||||
@@ -420,7 +415,7 @@ async def test_kv_producer(monkeypatch):
|
||||
# Worker processes the consumer's request
|
||||
await prefill_worker.send_kv_to_decode(identity, mock_socket, xfer_meta)
|
||||
# Verify transfer parameters are correct: 11 to 20
|
||||
src, dst, lens = expected_split_transfers(0x1000, 0x2000, [11], [20])
|
||||
src, dst, lens = expected_transfers(0x1000, 0x2000, [11], [20])
|
||||
mock_send_blocks.assert_called_once_with(
|
||||
"consumer-host:54321",
|
||||
src,
|
||||
@@ -806,47 +801,34 @@ async def test_kv_producer_heterogeneous_tp(monkeypatch, d_tp_size):
|
||||
flat_remote = [b for g in remote_block_ids for b in g]
|
||||
num_blocks = len(flat_local)
|
||||
|
||||
# With blocks-first layout, virtual split halves block
|
||||
# lengths and doubles transfer regions (K + V).
|
||||
local_kv_block_len = local_block_len // 2
|
||||
remote_kv_block_len = remote_block_len // 2
|
||||
# Under the standardized blocks-first layout K and V are
|
||||
# already packed into a single contiguous region per block,
|
||||
# so _expand_transfer_regions emits one region per layer.
|
||||
assert len(src_ptrs) == num_blocks
|
||||
assert len(dst_ptrs) == num_blocks
|
||||
assert len(lengths) == num_blocks
|
||||
|
||||
assert len(src_ptrs) == 2 * num_blocks
|
||||
assert len(dst_ptrs) == 2 * num_blocks
|
||||
assert len(lengths) == 2 * num_blocks
|
||||
|
||||
# Compute expected offsets using kv_block_len
|
||||
if d_tp_size <= P_TP_SIZE:
|
||||
tp_ratio = P_TP_SIZE // d_tp_size
|
||||
expected_src_off = 0
|
||||
expected_dst_off = (P_TP_RANK % tp_ratio) * local_kv_block_len
|
||||
expected_xfer_len = local_kv_block_len
|
||||
expected_dst_off = (P_TP_RANK % tp_ratio) * local_block_len
|
||||
expected_xfer_len = local_block_len
|
||||
else:
|
||||
ratio_abs = d_tp_size // P_TP_SIZE
|
||||
expected_src_off = (d_rank % ratio_abs) * remote_kv_block_len
|
||||
expected_src_off = (d_rank % ratio_abs) * remote_block_len
|
||||
expected_dst_off = 0
|
||||
expected_xfer_len = remote_kv_block_len
|
||||
expected_xfer_len = remote_block_len
|
||||
|
||||
# First num_blocks entries are K region,
|
||||
# next num_blocks are V region.
|
||||
for region_idx in range(2):
|
||||
local_region_base = 0x1000 + region_idx * local_kv_block_len
|
||||
remote_region_base = 0x2000 + region_idx * remote_kv_block_len
|
||||
for blk_idx, (lblk, rblk) in enumerate(
|
||||
zip(flat_local, flat_remote)
|
||||
):
|
||||
idx = region_idx * num_blocks + blk_idx
|
||||
assert src_ptrs[idx] == (
|
||||
local_region_base
|
||||
+ lblk * local_block_len
|
||||
+ expected_src_off
|
||||
)
|
||||
assert dst_ptrs[idx] == (
|
||||
remote_region_base
|
||||
+ rblk * remote_block_len
|
||||
+ expected_dst_off
|
||||
)
|
||||
assert lengths[idx] == expected_xfer_len
|
||||
local_region_base = 0x1000
|
||||
remote_region_base = 0x2000
|
||||
for blk_idx, (lblk, rblk) in enumerate(zip(flat_local, flat_remote)):
|
||||
assert src_ptrs[blk_idx] == (
|
||||
local_region_base + lblk * local_block_len + expected_src_off
|
||||
)
|
||||
assert dst_ptrs[blk_idx] == (
|
||||
remote_region_base + rblk * remote_block_len + expected_dst_off
|
||||
)
|
||||
assert lengths[blk_idx] == expected_xfer_len
|
||||
|
||||
# Verify successful response sent back to consumer
|
||||
mock_socket.send_multipart.assert_called_once()
|
||||
|
||||
@@ -1620,8 +1620,10 @@ def test_register_kv_caches(
|
||||
test_shape = backend_cls.get_kv_cache_shape(
|
||||
num_blocks=1, block_size=16, num_kv_heads=1, head_size=1
|
||||
)
|
||||
is_blocks_first = len(test_shape) == 5 and test_shape[0] == 1
|
||||
virtually_split = is_blocks_first and not connector.prefer_cross_layer_blocks
|
||||
is_blocks_first = len(test_shape) == 4 and test_shape[0] == 1
|
||||
# K and V are packed into the content dim for standard attention, so
|
||||
# blocks are not virtually split (only mamba caches are).
|
||||
virtually_split = False
|
||||
|
||||
if connector.prefer_cross_layer_blocks:
|
||||
with set_current_vllm_config(vllm_config):
|
||||
@@ -1681,6 +1683,7 @@ def test_register_kv_caches(
|
||||
unique_tensor.data_ptr(),
|
||||
]
|
||||
expected_num_entries = 2
|
||||
expected_blocks_count = kv_cache_config.num_blocks * 2
|
||||
else:
|
||||
expected_tensor_size = (
|
||||
shared_tensor[0].element_size() * shared_tensor[0].numel()
|
||||
@@ -1692,7 +1695,7 @@ def test_register_kv_caches(
|
||||
unique_tensor[1].data_ptr(),
|
||||
]
|
||||
expected_num_entries = 4
|
||||
expected_blocks_count = kv_cache_config.num_blocks * 4
|
||||
expected_blocks_count = kv_cache_config.num_blocks * 4
|
||||
|
||||
# Execute register_kv_caches
|
||||
connector.register_kv_caches(kv_caches)
|
||||
|
||||
Reference in New Issue
Block a user