From 6c4e0c3dbea6e7e5b79929313779e804ef2db4d1 Mon Sep 17 00:00:00 2001 From: Lizhi Zhou <1432185+reasonsolo@users.noreply.github.com> Date: Mon, 26 Jan 2026 18:10:58 +0800 Subject: [PATCH] [https://nvbugs/5826689][fix] replace etcd3 with etcd-sdk-python (#10886) Signed-off-by: Lizhi Zhou <1432185+reasonsolo@users.noreply.github.com> Signed-off-by: Wangshanshan <30051912+dominicshanshan@users.noreply.github.com> --- requirements.txt | 2 +- tensorrt_llm/serve/cluster_storage.py | 64 +++++++++++++++++-- tensorrt_llm/serve/metadata_server.py | 7 +- .../disaggregated/test_remoteDictionary.py | 4 +- 4 files changed, 61 insertions(+), 16 deletions(-) diff --git a/requirements.txt b/requirements.txt index 8c44a008a2..1a854224e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -64,7 +64,6 @@ nvtx matplotlib # FIXME: this is added to make nvtx happy meson ninja -etcd3 @ git+https://github.com/kragniz/python-etcd3.git@e58a899579ba416449c4e225b61f039457c8072a blake3 soundfile triton==3.5.1 # NOTE: if you update this, you must also run scripts/vendor_triton_kernels.py to vendor the new version of triton_kernels @@ -83,3 +82,4 @@ cuda-core llist cuda-tile>=1.0.1 nvidia-cuda-tileiras>=13.1 +etcd-sdk-python==0.0.7 diff --git a/tensorrt_llm/serve/cluster_storage.py b/tensorrt_llm/serve/cluster_storage.py index e6fe14eda4..d2ba0d9117 100644 --- a/tensorrt_llm/serve/cluster_storage.py +++ b/tensorrt_llm/serve/cluster_storage.py @@ -1,13 +1,16 @@ import abc import asyncio +import importlib.metadata as importlib_metadata +import importlib.util +import sys import time from dataclasses import dataclass from enum import IntEnum from functools import wraps from typing import Callable, Dict, List, Optional +from urllib.parse import urlparse import aiohttp -import etcd3 from fastapi import FastAPI from fastapi.responses import JSONResponse from pydantic import BaseModel @@ -15,6 +18,45 @@ from pydantic import BaseModel from tensorrt_llm.logger import logger +def _find_module_file_in_distribution(dist, module_name: str): + module_path = module_name.replace(".", "/") + candidates = (f"{module_path}/__init__.py", f"{module_path}.py") + for dist_file in dist.files or []: + dist_file_str = str(dist_file) + if dist_file_str.endswith(candidates): + return dist.locate_file(dist_file) + return None + + +def load_module_from_distribution(dist_name: str, module_name: str): + dist = importlib_metadata.distribution(dist_name) + + module_file = _find_module_file_in_distribution(dist, module_name) + if not module_file: + raise ModuleNotFoundError( + f"{module_name} not found in distribution {dist_name}") + + load_name = module_name + spec = importlib.util.spec_from_file_location(load_name, module_file) + if spec is None or spec.loader is None: + raise ImportError( + f"Could not create a module spec for {module_name} in {dist_name}") + + module = importlib.util.module_from_spec(spec) + sys.modules[load_name] = module + try: + spec.loader.exec_module(module) + except Exception: + sys.modules.pop(load_name, None) + raise + return module + + +# pyectd and etcd-sdk-python both have package name "pyetcd", we need to find the correct one +# by distribution name +etcd3 = load_module_from_distribution("etcd-sdk-python", "pyetcd") + + class StorageItem(BaseModel): key: str value: Optional[str] = "" @@ -455,14 +497,25 @@ def handle_etcd_error(return_on_error: bool | None): class Etcd3ClusterStorage(ClusterStorage): + @staticmethod + def _connect(cluster_uri: str) -> etcd3.MultiEndpointEtcd3Client: + logger.info(f"Connecting to {cluster_uri}") + endpoints = [] + for url in cluster_uri.split(","): + parsed_url = urlparse(url) + endpoints.append( + etcd3.Endpoint(parsed_url.hostname, + parsed_url.port, + secure=False)) + return etcd3.MultiEndpointEtcd3Client(endpoints, timeout=10) + def __init__(self, cluster_uri: str, cluster_name: str, one_single_lease: bool = False, **kwargs): - cluster_uri = cluster_uri.replace("etcd://", "") - self._host, self._port = cluster_uri.rsplit(":", 1) - self._client = etcd3.client(self._host, self._port) + self._cluster_uri = cluster_uri + self._client = self._connect(self._cluster_uri) self._leases = {} self._instance_lease = None self._watch_handles = {} @@ -486,8 +539,7 @@ class Etcd3ClusterStorage(ClusterStorage): return self._client def reconnect(self): - logger.info(f"Reconnecting to {self._host}:{self._port}") - self._client = etcd3.client(self._host, self._port) + self._client = self._connect(self._cluster_uri) async def start(self): # nothing to do diff --git a/tensorrt_llm/serve/metadata_server.py b/tensorrt_llm/serve/metadata_server.py index c4c046938d..1453ca196e 100644 --- a/tensorrt_llm/serve/metadata_server.py +++ b/tensorrt_llm/serve/metadata_server.py @@ -3,12 +3,7 @@ from abc import ABC, abstractmethod from typing import Optional from tensorrt_llm.llmapi.disagg_utils import MetadataServerConfig -from tensorrt_llm.logger import logger - -try: - import etcd3 -except Exception as e: - logger.warning(f"etcd3 is not installed correctly: {e}") +from tensorrt_llm.serve.cluster_storage import etcd3 class RemoteDictionary(ABC): diff --git a/tests/unittest/disaggregated/test_remoteDictionary.py b/tests/unittest/disaggregated/test_remoteDictionary.py index c9909de88a..bdd465aabd 100644 --- a/tests/unittest/disaggregated/test_remoteDictionary.py +++ b/tests/unittest/disaggregated/test_remoteDictionary.py @@ -5,9 +5,7 @@ import subprocess import time import unittest -import etcd3 - -from tensorrt_llm.serve.metadata_server import EtcdDictionary +from tensorrt_llm.serve.metadata_server import EtcdDictionary, etcd3 def wait_for_port(host, port, timeout=15):