[Rust Frontend] Add /version endpoint using engine-reported value (#43854)

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
This commit is contained in:
Bugen Zhao
2026-05-29 08:32:27 +08:00
committed by GitHub
parent b690b2bb67
commit 1521173c17
17 changed files with 159 additions and 195 deletions
+2 -2
View File
@@ -93,7 +93,7 @@ pub struct ChatLlm {
text: TextLlm,
backend: DynChatBackend,
/// Effective model dtype reported by the engine.
model_dtype: Option<ModelDtype>,
model_dtype: ModelDtype,
/// Tool-call parser selection.
tool_call_parser: ParserSelection,
/// Reasoning parser selection.
@@ -135,7 +135,7 @@ impl ChatLlm {
}
/// Override the effective model dtype used for multimodal tensor encoding.
pub fn with_model_dtype(mut self, model_dtype: Option<ModelDtype>) -> Self {
pub fn with_model_dtype(mut self, model_dtype: ModelDtype) -> Self {
self.model_dtype = model_dtype;
self
}
+2 -12
View File
@@ -12,7 +12,7 @@
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::Path;
use std::sync::{Arc, LazyLock, Once};
use std::sync::{Arc, LazyLock};
use itertools::izip;
use llm_multimodal::{
@@ -239,7 +239,7 @@ pub(crate) async fn finalize_rendered_prompt(
request: &ChatRequest,
rendered: RenderedPrompt,
info: Option<&MultimodalModelInfo>,
model_dtype: Option<ModelDtype>,
model_dtype: ModelDtype,
) -> Result<(Prompt, Option<MmFeatures>)> {
if !request.has_multimodal() {
return Ok((rendered.prompt, None));
@@ -249,16 +249,6 @@ pub(crate) async fn finalize_rendered_prompt(
bail_multimodal!("multimodal chat renderer must return a text prompt before expansion");
};
let media_parts = extract_media_parts(request)?;
let model_dtype = model_dtype.unwrap_or_else(|| {
static WARN_ONCE: Once = Once::new();
WARN_ONCE.call_once(|| {
warn!(
"engine handshake did not report model dtype; \
falling back to float32 for multimodal tensor encoding"
);
});
ModelDtype::Float32
});
let mut prompt_token_ids = info
.context
+23 -21
View File
@@ -290,10 +290,8 @@ impl EngineCoreClient {
// If any engine reported a dp_stats_address in its ready response, use it
// as the external coordinator address.
let dp_stats_address: Option<String> = engines
.iter()
.filter_map(|e| e.ready_response.as_ref())
.find_map(|r| r.dp_stats_address.clone());
let dp_stats_address: Option<String> =
engines.iter().find_map(|engine| engine.ready_response.dp_stats_address.clone());
let (coordinator, coordinator_output_task, coordinator_task) =
if let Some(coordinator_transport) = connected.coordinator {
@@ -368,40 +366,44 @@ impl EngineCoreClient {
/// Return the ready responses received from all engines on the input
/// socket.
pub fn ready_responses(&self) -> Vec<&EngineCoreReadyResponse> {
self.engines
.iter()
.filter_map(|engine| engine.ready_response.as_ref())
.collect()
self.engines.iter().map(|engine| &engine.ready_response).collect()
}
/// Return the engine-reported effective model dtype, when available.
pub fn model_dtype(&self) -> Option<ModelDtype> {
/// Return the engine-reported effective model dtype.
pub fn model_dtype(&self) -> ModelDtype {
self.engines
.iter()
.filter_map(|engine| engine.ready_response.as_ref())
.find_map(|response| response.dtype)
.first()
.expect("engine core client requires at least one engine")
.ready_response
.dtype
}
/// Python vLLM version string taken from the first engine's ready response.
///
/// # Panics
///
/// Panics if the client holds no engines.
pub fn vllm_version(&self) -> &str {
    let first_engine = self
        .engines
        .first()
        .expect("engine core client requires at least one engine");
    &first_engine.ready_response.vllm_version
}
/// Return the total number of GPU blocks summed across all connected
/// engines.
pub fn total_num_gpu_blocks(&self) -> u64 {
self.engines
.iter()
.filter_map(|engine| engine.ready_response.as_ref())
.map(|r| r.num_gpu_blocks)
.sum()
self.engines.iter().map(|engine| engine.ready_response.num_gpu_blocks).sum()
}
/// Return the minimum engine-reported `max_model_len` across all engines.
///
/// This is the auto-fitted value after KV cache profiling and may differ
/// from the originally configured value.
pub fn max_model_len(&self) -> Option<u32> {
pub fn max_model_len(&self) -> u32 {
self.engines
.iter()
.filter_map(|e| e.ready_response.as_ref())
.map(|r| r.max_model_len as u32)
.map(|engine| engine.ready_response.max_model_len as u32)
.min()
.expect("engine core client requires at least one engine")
}
/// Get the model name associated with this client used for metrics
@@ -382,6 +382,7 @@ mod tests {
use zeromq::{RouterSocket, Socket};
use super::*;
use crate::mock_engine::default_ready_response;
async fn test_inner() -> ClientInner {
let mut socket = RouterSocket::new();
@@ -392,7 +393,7 @@ mod tests {
"test-model".to_string(),
&[ConnectedEngine {
engine_id: EngineId::from(b"engine-0"),
ready_response: None,
ready_response: default_ready_response(),
}],
)
}
+25 -68
View File
@@ -339,15 +339,20 @@ mod tests {
use super::{EngineRoutingState, RequestRegistry, UtilityRegistry};
use crate::EngineId;
use crate::client::state::EngineLoadSnapshot;
use crate::mock_engine::default_ready_response;
use crate::protocol::{EngineCoreFinishReason, EngineCoreOutput};
use crate::transport::ConnectedEngine;
/// Test helper: wrap `engine_id` in a [`ConnectedEngine`] whose ready response
/// is the one produced by `default_ready_response()`.
fn connected_engine(engine_id: EngineId) -> ConnectedEngine {
    let ready_response = default_ready_response();
    ConnectedEngine { engine_id, ready_response }
}
#[test]
fn registry_rejects_duplicate_request_ids() {
let mut registry = RequestRegistry::new(&[ConnectedEngine {
engine_id: EngineId::from(b"engine-0"),
ready_response: None,
}]);
let mut registry = RequestRegistry::new(&[connected_engine(EngineId::from(b"engine-0"))]);
registry.register("req-1".to_string(), None).unwrap();
let error = registry.register("req-1".to_string(), None).unwrap_err();
assert!(matches!(
@@ -358,10 +363,7 @@ mod tests {
#[test]
fn registry_removes_finished_request_on_output() {
let mut registry = RequestRegistry::new(&[ConnectedEngine {
engine_id: EngineId::from(b"engine-0"),
ready_response: None,
}]);
let mut registry = RequestRegistry::new(&[connected_engine(EngineId::from(b"engine-0"))]);
registry.register("req-1".to_string(), None).unwrap();
let sender = registry.sender_for_output(&EngineCoreOutput {
@@ -376,10 +378,7 @@ mod tests {
#[test]
fn registry_closes_all_requests_on_failure() {
let mut registry = RequestRegistry::new(&[ConnectedEngine {
engine_id: EngineId::from(b"engine-0"),
ready_response: None,
}]);
let mut registry = RequestRegistry::new(&[connected_engine(EngineId::from(b"engine-0"))]);
registry.register("req-1".to_string(), None).unwrap();
registry.register("req-2".to_string(), None).unwrap();
@@ -394,14 +393,8 @@ mod tests {
let engine_0 = EngineId::from_engine_index(0);
let engine_1 = EngineId::from_engine_index(1);
let mut registry = RequestRegistry::new(&[
ConnectedEngine {
engine_id: engine_0.clone(),
ready_response: None,
},
ConnectedEngine {
engine_id: engine_1.clone(),
ready_response: None,
},
connected_engine(engine_0.clone()),
connected_engine(engine_1.clone()),
]);
let (chosen_0, _) = registry.register("req-1".to_string(), None).unwrap();
let (chosen_1, _) = registry.register("req-2".to_string(), None).unwrap();
@@ -428,14 +421,8 @@ mod tests {
let engine_0 = EngineId::from_engine_index(0);
let engine_1 = EngineId::from_engine_index(1);
let mut registry = RequestRegistry::new(&[
ConnectedEngine {
engine_id: engine_0.clone(),
ready_response: None,
},
ConnectedEngine {
engine_id: engine_1.clone(),
ready_response: None,
},
connected_engine(engine_0.clone()),
connected_engine(engine_1.clone()),
]);
let (chosen_0, _) = registry.register("req-1".to_string(), None).unwrap();
@@ -488,14 +475,8 @@ mod tests {
let engine_0 = EngineId::from_engine_index(0);
let engine_1 = EngineId::from_engine_index(1);
let mut registry = RequestRegistry::new(&[
ConnectedEngine {
engine_id: engine_0.clone(),
ready_response: None,
},
ConnectedEngine {
engine_id: engine_1.clone(),
ready_response: None,
},
connected_engine(engine_0.clone()),
connected_engine(engine_1.clone()),
]);
assert!(registry.apply_scheduler_counts(
@@ -523,18 +504,9 @@ mod tests {
let engine_1 = EngineId::from_engine_index(1);
let engine_2 = EngineId::from_engine_index(2);
let mut registry = RequestRegistry::new(&[
ConnectedEngine {
engine_id: engine_0.clone(),
ready_response: None,
},
ConnectedEngine {
engine_id: engine_1.clone(),
ready_response: None,
},
ConnectedEngine {
engine_id: engine_2.clone(),
ready_response: None,
},
connected_engine(engine_0.clone()),
connected_engine(engine_1.clone()),
connected_engine(engine_2.clone()),
]);
// Explicitly target rank 2 (third engine).
@@ -555,14 +527,8 @@ mod tests {
let engine_0 = EngineId::from_engine_index(0);
let engine_1 = EngineId::from_engine_index(1);
let mut registry = RequestRegistry::new(&[
ConnectedEngine {
engine_id: engine_0.clone(),
ready_response: None,
},
ConnectedEngine {
engine_id: engine_1.clone(),
ready_response: None,
},
connected_engine(engine_0.clone()),
connected_engine(engine_1.clone()),
]);
// Load-balance: first two go to engine_0 and engine_1.
@@ -577,14 +543,8 @@ mod tests {
#[test]
fn register_with_out_of_range_rank_returns_error() {
let mut registry = RequestRegistry::new(&[
ConnectedEngine {
engine_id: EngineId::from_engine_index(0),
ready_response: None,
},
ConnectedEngine {
engine_id: EngineId::from_engine_index(1),
ready_response: None,
},
connected_engine(EngineId::from_engine_index(0)),
connected_engine(EngineId::from_engine_index(1)),
]);
let error = registry.register("req-1".to_string(), Some(2)).unwrap_err();
@@ -600,10 +560,7 @@ mod tests {
#[test]
fn register_with_rank_on_single_engine_only_accepts_zero() {
let engine_0 = EngineId::from_engine_index(0);
let mut registry = RequestRegistry::new(&[ConnectedEngine {
engine_id: engine_0.clone(),
ready_response: None,
}]);
let mut registry = RequestRegistry::new(&[connected_engine(engine_0.clone())]);
let (chosen, _) = registry.register("req-ok".to_string(), Some(0)).unwrap();
assert_eq!(chosen, engine_0);
@@ -47,7 +47,8 @@ pub fn default_ready_response() -> EngineCoreReadyResponse {
max_model_len: DEFAULT_MOCK_MAX_MODEL_LEN,
num_gpu_blocks: DEFAULT_MOCK_NUM_GPU_BLOCKS,
dp_stats_address: None,
dtype: Some(ModelDtype::Float32),
dtype: ModelDtype::Float32,
vllm_version: "test-vllm-version".to_string(),
}
}
@@ -39,10 +39,9 @@ pub struct EngineCoreReadyResponse {
/// DP coordinator stats publish address, if applicable.
pub dp_stats_address: Option<String>,
/// Effective model dtype after Python vLLM resolves `--dtype`.
// TODO: This is currently not wired up on the engine side. After it's added, remove `Option`
// and `serde(default)`.
#[serde(default)]
pub dtype: Option<ModelDtype>,
pub dtype: ModelDtype,
/// Python vLLM version reported by the engine process.
pub vllm_version: String,
}
/// Frontend-owned ZMQ addresses that are sent to the engine during startup
@@ -69,22 +68,3 @@ pub struct HandshakeInitMessage {
pub addresses: HandshakeAddresses,
pub parallel_config: BTreeMap<String, OpaqueValue>,
}
#[cfg(test)]
mod tests {
use super::EngineCoreReadyResponse;
use crate::protocol::ModelDtype;
#[test]
fn ready_response_accepts_effective_dtype() {
let response: EngineCoreReadyResponse = serde_json::from_value(serde_json::json!({
"max_model_len": 4096,
"num_gpu_blocks": 2,
"dp_stats_address": null,
"dtype": "bfloat16"
}))
.unwrap();
assert_eq!(response.dtype, Some(ModelDtype::BFloat16));
}
}
@@ -10,9 +10,9 @@ use crate::EngineId;
pub use crate::mock_engine::{MockCoordinatorSockets, MockEngineSockets};
use crate::mock_engine::{
MockEngineConfig, MockEngineDataSockets, connect_to_bootstrapped_frontend, connect_to_frontend,
default_ready_response,
};
use crate::protocol::ModelDtype;
use crate::protocol::handshake::{EngineCoreReadyResponse, HandshakeInitMessage};
use crate::protocol::handshake::HandshakeInitMessage;
/// Per-test IPC endpoint namespace backed by a unique temporary directory.
///
@@ -57,12 +57,7 @@ fn test_mock_engine_config() -> MockEngineConfig {
MockEngineConfig {
local: true,
headless: true,
ready_response: EngineCoreReadyResponse {
max_model_len: 4096,
num_gpu_blocks: 0,
dp_stats_address: None,
dtype: Some(ModelDtype::Float32),
},
ready_response: default_ready_response(),
..Default::default()
}
}
@@ -925,6 +925,7 @@ async fn client_fail_closes_when_main_output_path_receives_dp_control() {
.await;
assert_eq!(client.engine_identities()[0], b"engine-0");
assert!(client.ready_responses()[0].max_model_len > 0);
assert_eq!(client.vllm_version(), "test-vllm-version");
let mut stream_1 = client.call(sample_request_with_id("req-1")).await.unwrap();
let mut stream_2 = client.call(sample_request_with_id("req-2")).await.unwrap();
+40 -50
View File
@@ -104,8 +104,8 @@ pub struct ConnectedEngine {
/// The identity of the connected engine.
pub engine_id: EngineId,
/// Post-initialization configuration received from the engine on the input
/// socket registration message. `None` until the registration is received.
pub ready_response: Option<EngineCoreReadyResponse>,
/// socket registration message.
pub ready_response: EngineCoreReadyResponse,
}
/// Represents the connected shared transport plus all registered engines after
@@ -295,18 +295,9 @@ pub async fn connect_handshake(
}
}
// 4. Wait for every engine to connect to the shared input socket and register itself. The
// `ready_response` is a placeholder; it is populated for each engine by
// `wait_for_input_registrations` below.
let mut engines: Vec<_> = engines
.into_keys()
.map(|engine_id| ConnectedEngine {
engine_id,
ready_response: None,
})
.collect();
wait_for_input_registrations(&mut input_socket, &mut engines, ready_timeout).await?;
// 6. Wait for every engine to connect to the shared input socket and register itself.
let engines =
wait_for_input_registrations(&mut input_socket, engines.into_keys(), ready_timeout).await?;
debug!(
engine_count = engines.len(),
"all engines registered on shared input socket"
@@ -349,15 +340,13 @@ pub async fn connect_bootstrapped(
let mut output_socket = PullSocket::new();
let output_address = output_socket.bind(output_address).await?.to_string();
// TODO: follow start rank
let mut engines = (0..engine_count)
.map(|index| ConnectedEngine {
engine_id: EngineId::from((index as u16).to_le_bytes().to_vec()),
ready_response: None,
})
.collect::<Vec<_>>();
wait_for_input_registrations(&mut input_socket, &mut engines, ready_timeout).await?;
let engines = wait_for_input_registrations(
&mut input_socket,
// TODO: follow start rank
(0..engine_count).map(|index| EngineId::from((index as u16).to_le_bytes().to_vec())),
ready_timeout,
)
.await?;
info!(
engine_count = engines.len(),
"bootstrapped engines connected"
@@ -455,17 +444,14 @@ async fn send_init_message(
/// Simplify API server handshake"), the payload is a msgpack-encoded
/// [`EngineCoreReadyResponse`] carrying post-initialization values such as
/// `max_model_len`.
///
/// Older engines sent an empty second frame here just to establish the
/// ROUTER/DEALER backchannel, with no structured payload on the input socket.
/// We continue to tolerate that legacy shape so the frontend can still connect
/// to slightly older local engine checkouts.
async fn wait_for_input_registrations(
input_socket: &mut RouterSocket,
engines: &mut [ConnectedEngine],
expected_engines: impl IntoIterator<Item = EngineId>,
ready_timeout: Duration,
) -> Result<()> {
let mut pending = engines.iter().map(|e| e.engine_id.clone()).collect::<BTreeSet<_>>();
) -> Result<Vec<ConnectedEngine>> {
let expected_engines = expected_engines.into_iter().collect::<Vec<_>>();
let mut pending = expected_engines.iter().cloned().collect::<BTreeSet<_>>();
let mut ready_responses = BTreeMap::new();
while !pending.is_empty() {
let registration = timeout(ready_timeout, input_socket.recv()).await.map_err(|_| {
@@ -489,29 +475,33 @@ async fn wait_for_input_registrations(
);
}
let ready_response = if frames[1].is_empty() {
debug!(
?actual_id,
"received legacy empty input registration from engine"
if frames[1].is_empty() {
bail_unexpected_handshake_message!(
"expected msgpack EngineCoreReadyResponse for engine input registration, got empty payload from engine id {actual_id:?}"
);
None
} else {
let ready_response: EngineCoreReadyResponse = decode_msgpack(&frames[1])?;
debug!(
?actual_id,
?ready_response,
"received input registration from engine"
);
Some(ready_response)
};
// Store the ready response in the corresponding engine entry.
if let Some(engine) = engines.iter_mut().find(|e| e.engine_id == actual_id) {
engine.ready_response = ready_response;
}
let ready_response: EngineCoreReadyResponse = decode_msgpack(&frames[1])?;
debug!(
?actual_id,
?ready_response,
"received input registration from engine"
);
ready_responses.insert(actual_id, ready_response);
}
Ok(())
Ok(expected_engines
.into_iter()
.map(|engine_id| {
let ready_response = ready_responses
.remove(&engine_id)
.expect("every expected engine id has a decoded ready response");
ConnectedEngine {
engine_id,
ready_response,
}
})
.collect())
}
/// Send an encoded message to the engine through the input socket.
+2 -1
View File
@@ -98,7 +98,8 @@ async fn mock_engine_connects_over_tcp() {
let (client, shutdown, task) = connect_with_mock(handshake_address, 1, 1).await;
assert_eq!(client.engine_count(), 1);
assert_eq!(client.engine_identities()[0], &[0, 0]);
assert_eq!(client.max_model_len(), Some(1024 * 1024));
assert_eq!(client.max_model_len(), 1024 * 1024);
assert_eq!(client.vllm_version(), "test-vllm-version");
shutdown_mock(client, shutdown, task).await;
}
+2
View File
@@ -6,6 +6,7 @@ mod load;
mod metrics;
pub(crate) mod openai;
mod sleep;
mod version;
use std::sync::Arc;
@@ -35,6 +36,7 @@ fn build_router_with_dev_mode(state: Arc<AppState>, dev_mode_enabled: bool) -> R
.route("/health", get(health::health))
.route("/metrics", get(metrics::scrape))
.route("/load", get(load::load))
.route("/version", get(version::version))
// OpenAI-compatible endpoints
.route("/v1/models", get(openai::list_models))
.route("/v1/completions", post(openai::completions))
+21
View File
@@ -994,6 +994,27 @@ async fn list_models_returns_configured_model() {
assert_eq!(json["data"][0]["id"], "Qwen/Qwen1.5-0.5B-Chat");
}
/// `GET /version` must answer 200 with a JSON body whose `version` is
/// `"test-vllm-version"` and whose `rust_frontend_version` is this crate's
/// `CARGO_PKG_VERSION`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn version_returns_engine_vllm_version() {
    let mut app = test_app().await;

    let request = Request::builder().uri("/version").body(Body::empty()).expect("build request");
    let response = app.call(request).await.expect("call app");
    assert_eq!(response.status(), StatusCode::OK);

    let raw_body = to_bytes(response.into_body(), usize::MAX).await.expect("read body");
    let actual: serde_json::Value = serde_json::from_slice(&raw_body).expect("decode json");
    let expected = json!({
        "version": "test-vllm-version",
        "rust_frontend_version": env!("CARGO_PKG_VERSION"),
    });
    assert_eq!(actual, expected);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn http_metrics_record_list_models_requests() {
+23
View File
@@ -0,0 +1,23 @@
use std::sync::Arc;
use axum::Json;
use axum::extract::State;
use serde::Serialize;
use crate::state::AppState;
/// JSON body returned by the `/version` handler.
#[derive(Serialize)]
pub(crate) struct VersionResponse {
/// vLLM version string obtained from the engine core client.
version: String,
/// Version of this crate, captured at compile time from `CARGO_PKG_VERSION`.
rust_frontend_version: &'static str,
}
/// Report the engine's vLLM version alongside this frontend crate's version.
///
/// The engine version is read from the engine core client held by the shared
/// application state; the frontend version is the compile-time
/// `CARGO_PKG_VERSION`.
pub async fn version(State(state): State<Arc<AppState>>) -> Json<VersionResponse> {
    let response = VersionResponse {
        version: state.engine_core_client().vllm_version().to_owned(),
        rust_frontend_version: env!("CARGO_PKG_VERSION"),
    };
    Json(response)
}
+5 -7
View File
@@ -45,9 +45,9 @@ pub struct TextLlm {
/// Tokenizer/model metadata backend responsible for prompt encode/decode
/// and sampling hints.
backend: DynTextBackend,
/// Context window size derived by the backend or from engine startup
/// handshake, with optional override from config.
max_model_len: Option<u32>,
/// Context window size reported by the engine startup handshake, with
/// optional override from config.
max_model_len: u32,
}
impl TextLlm {
@@ -71,7 +71,7 @@ impl TextLlm {
/// This takes priority over both the engine-reported default and any
/// tokenizer/model metadata exposed by the backend.
pub fn with_max_model_len(mut self, max_model_len: u32) -> Self {
self.max_model_len = Some(max_model_len);
self.max_model_len = max_model_len;
self
}
@@ -129,9 +129,7 @@ impl TextLlm {
};
let mut sampling_hints = self.backend.sampling_hints()?;
if let Some(max_model_len) = self.max_model_len {
sampling_hints.max_model_len = Some(max_model_len);
}
sampling_hints.max_model_len = Some(self.max_model_len);
let PreparedTextRequest {
text_request,
generate_request,
+2 -1
View File
@@ -75,7 +75,8 @@ class EngineCoreReadyResponse:
max_model_len: int
num_gpu_blocks: int
dp_stats_address: str | None
dtype: str | None = None
dtype: str
vllm_version: str
class EngineCoreRequest(
+1
View File
@@ -1464,6 +1464,7 @@ class EngineCoreProc(EngineCore):
num_gpu_blocks=self.vllm_config.cache_config.num_gpu_blocks or 0,
dp_stats_address=self.frontend_stats_publish_address,
dtype=str(self.vllm_config.model_config.dtype).removeprefix("torch."),
vllm_version=VLLM_VERSION,
)
ready_payload = msgspec.msgpack.encode(ready_response)
for input_socket in input_sockets: