mirror of
https://github.com/vllm-project/vllm.git
synced 2026-06-06 00:16:14 +00:00
[gRPC] Add standard gRPC health checking (grpc.health.v1) for Kubernetes native probes (#38016)
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
This commit is contained in:
@@ -4,6 +4,7 @@ Deploying vLLM on Kubernetes is a scalable and efficient way to serve machine le
|
||||
|
||||
- [Deployment with CPUs](#deployment-with-cpus)
|
||||
- [Deployment with GPUs](#deployment-with-gpus)
|
||||
- [Serving with gRPC](#serving-with-grpc)
|
||||
- [Troubleshooting](#troubleshooting)
|
||||
- [Startup Probe or Readiness Probe Failure, container log contains "KeyboardInterrupt: terminated"](#startup-probe-or-readiness-probe-failure-container-log-contains-keyboardinterrupt-terminated)
|
||||
- [Conclusion](#conclusion)
|
||||
@@ -387,6 +388,49 @@ INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
|
||||
|
||||
If the service is correctly deployed, you should receive a response from the vLLM model.
|
||||
|
||||
## Serving with gRPC
|
||||
|
||||
vLLM can serve models over gRPC instead of HTTP by passing the `--grpc` flag. This requires the optional gRPC dependencies:
|
||||
|
||||
```bash
|
||||
pip install vllm[grpc]
|
||||
```
|
||||
|
||||
When using `--grpc`, the server exposes the standard [gRPC Health Checking Protocol](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) (`grpc.health.v1.Health`), which integrates with Kubernetes [native gRPC probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-grpc-liveness-probe) (available since Kubernetes 1.24).
|
||||
|
||||
To deploy with gRPC, change the `vllm serve` command to include `--grpc` and replace `httpGet` probes with `grpc` probes:
|
||||
|
||||
```yaml
|
||||
containers:
|
||||
- name: mistral-7b
|
||||
image: vllm/vllm-openai:latest
|
||||
command: ["/bin/sh", "-c"]
|
||||
args: [
|
||||
"pip install vllm[grpc] && vllm serve mistralai/Mistral-7B-Instruct-v0.3 --grpc --port 50051 --trust-remote-code"
|
||||
]
|
||||
ports:
|
||||
- containerPort: 50051
|
||||
livenessProbe:
|
||||
grpc:
|
||||
port: 50051
|
||||
initialDelaySeconds: 120
|
||||
periodSeconds: 10
|
||||
readinessProbe:
|
||||
grpc:
|
||||
port: 50051
|
||||
initialDelaySeconds: 120
|
||||
periodSeconds: 5
|
||||
```
|
||||
|
||||
!!! note
|
||||
The gRPC health service checks the engine status on every probe. If the engine is unhealthy or the server is shutting down, the probe returns `NOT_SERVING`.
|
||||
|
||||
You can also verify the health service manually with `grpcurl`:
|
||||
|
||||
```bash
|
||||
grpcurl -plaintext localhost:50051 grpc.health.v1.Health/Check
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Startup Probe or Readiness Probe Failure, container log contains "KeyboardInterrupt: terminated"
|
||||
|
||||
@@ -1107,7 +1107,7 @@ setup(
|
||||
# - .buildkite/test-amd.yaml
|
||||
"helion": ["helion==1.0.0"],
|
||||
# Optional deps for gRPC server (vllm serve --grpc)
|
||||
"grpc": ["smg-grpc-servicer[vllm] >= 0.5.0"],
|
||||
"grpc": ["smg-grpc-servicer[vllm] >= 0.5.2"],
|
||||
# Optional deps for OpenTelemetry tracing
|
||||
"otel": [
|
||||
"opentelemetry-sdk>=1.26.0",
|
||||
|
||||
@@ -0,0 +1,143 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
grpc = pytest.importorskip("grpc")
|
||||
health_pb2 = pytest.importorskip("grpc_health.v1.health_pb2")
|
||||
VllmHealthServicer = pytest.importorskip(
|
||||
"smg_grpc_servicer.vllm.health_servicer"
|
||||
).VllmHealthServicer
|
||||
|
||||
SERVING = health_pb2.HealthCheckResponse.SERVING
|
||||
NOT_SERVING = health_pb2.HealthCheckResponse.NOT_SERVING
|
||||
SERVICE_UNKNOWN = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def async_llm():
|
||||
mock = MagicMock()
|
||||
mock.check_health = AsyncMock()
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def context():
|
||||
return MagicMock(spec=grpc.aio.ServicerContext)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def servicer(async_llm):
|
||||
return VllmHealthServicer(async_llm)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def request_msg():
|
||||
msg = MagicMock()
|
||||
msg.service = ""
|
||||
return msg
|
||||
|
||||
|
||||
# -- Check() tests --
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_serving_overall(servicer, request_msg, context, async_llm):
|
||||
request_msg.service = ""
|
||||
response = await servicer.Check(request_msg, context)
|
||||
assert response.status == SERVING
|
||||
async_llm.check_health.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_serving_vllm_service(servicer, request_msg, context, async_llm):
|
||||
request_msg.service = "vllm.grpc.engine.VllmEngine"
|
||||
response = await servicer.Check(request_msg, context)
|
||||
assert response.status == SERVING
|
||||
async_llm.check_health.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_not_serving_engine_errored(
|
||||
servicer, request_msg, context, async_llm
|
||||
):
|
||||
async_llm.check_health = AsyncMock(side_effect=Exception("engine dead"))
|
||||
request_msg.service = ""
|
||||
response = await servicer.Check(request_msg, context)
|
||||
assert response.status == NOT_SERVING
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_not_serving_shutting_down(
|
||||
servicer, request_msg, context, async_llm
|
||||
):
|
||||
servicer.set_not_serving()
|
||||
request_msg.service = ""
|
||||
response = await servicer.Check(request_msg, context)
|
||||
assert response.status == NOT_SERVING
|
||||
async_llm.check_health.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_unknown_service_status(servicer, request_msg, context):
|
||||
request_msg.service = "nonexistent.Service"
|
||||
response = await servicer.Check(request_msg, context)
|
||||
assert response.status == SERVICE_UNKNOWN
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_unknown_service_grpc_code(servicer, request_msg, context):
|
||||
request_msg.service = "fake.Svc"
|
||||
await servicer.Check(request_msg, context)
|
||||
context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
|
||||
context.set_details.assert_called_once()
|
||||
details_arg = context.set_details.call_args[0][0]
|
||||
assert "fake.Svc" in details_arg
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@patch("smg_grpc_servicer.vllm.health_servicer.logger")
|
||||
async def test_check_logs_exception_on_error(
|
||||
mock_logger, servicer, request_msg, context, async_llm
|
||||
):
|
||||
async_llm.check_health = AsyncMock(side_effect=Exception("engine exploded"))
|
||||
request_msg.service = ""
|
||||
await servicer.Check(request_msg, context)
|
||||
mock_logger.exception.assert_called_once()
|
||||
log_args = mock_logger.exception.call_args
|
||||
assert "service" in str(log_args).lower()
|
||||
|
||||
|
||||
# -- Watch() tests --
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watch_yields_serving(servicer, request_msg, context, async_llm):
|
||||
request_msg.service = ""
|
||||
watch_iter = servicer.Watch(request_msg, context)
|
||||
first = await anext(watch_iter.__aiter__())
|
||||
assert first.status == SERVING
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watch_yields_not_serving(servicer, request_msg, context, async_llm):
|
||||
async_llm.check_health = AsyncMock(side_effect=Exception("engine down"))
|
||||
request_msg.service = ""
|
||||
watch_iter = servicer.Watch(request_msg, context)
|
||||
first = await anext(watch_iter.__aiter__())
|
||||
assert first.status == NOT_SERVING
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watch_unknown_service(servicer, request_msg, context):
|
||||
request_msg.service = "fake.Service"
|
||||
results = []
|
||||
async for response in servicer.Watch(request_msg, context):
|
||||
results.append(response)
|
||||
assert len(results) == 1
|
||||
assert results[0].status == SERVICE_UNKNOWN
|
||||
# Watch returns SERVICE_UNKNOWN in the response body (not as a gRPC error
|
||||
# code) so the stream terminates normally -- unlike Check, which sets
|
||||
# NOT_FOUND on the gRPC context for unknown services.
|
||||
context.set_code.assert_not_called()
|
||||
@@ -26,8 +26,10 @@ import time
|
||||
|
||||
try:
|
||||
import grpc
|
||||
from grpc_health.v1 import health_pb2_grpc
|
||||
from grpc_reflection.v1alpha import reflection
|
||||
from smg_grpc_proto import vllm_engine_pb2, vllm_engine_pb2_grpc
|
||||
from smg_grpc_servicer.vllm.health_servicer import VllmHealthServicer
|
||||
from smg_grpc_servicer.vllm.servicer import VllmEngineServicer
|
||||
except ImportError as e:
|
||||
raise ImportError(
|
||||
@@ -98,9 +100,14 @@ async def serve_grpc(args: argparse.Namespace):
|
||||
# Add servicer to server
|
||||
vllm_engine_pb2_grpc.add_VllmEngineServicer_to_server(servicer, server)
|
||||
|
||||
# Add standard gRPC health service for Kubernetes probes
|
||||
health_servicer = VllmHealthServicer(async_llm)
|
||||
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
|
||||
|
||||
# Enable reflection for grpcurl and other tools
|
||||
service_names = (
|
||||
vllm_engine_pb2.DESCRIPTOR.services_by_name["VllmEngine"].full_name,
|
||||
"grpc.health.v1.Health",
|
||||
reflection.SERVICE_NAME,
|
||||
)
|
||||
reflection.enable_server_reflection(service_names, server)
|
||||
@@ -147,6 +154,10 @@ async def serve_grpc(args: argparse.Namespace):
|
||||
logger.info("Shutting down vLLM gRPC server...")
|
||||
if stats_task is not None:
|
||||
stats_task.cancel()
|
||||
try:
|
||||
health_servicer.set_not_serving()
|
||||
except Exception: # broad: must not prevent server.stop() / shutdown()
|
||||
logger.warning("Failed to set health status to NOT_SERVING", exc_info=True)
|
||||
await server.stop(grace=5.0)
|
||||
logger.info("gRPC server stopped")
|
||||
async_llm.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user