# A Lightweight RPC

This is a pure-Python, lightweight RPC library we built to simplify the existing IPC code in the orchestrator. It provides multiple call modes (synchronous, asynchronous, future-based, and streaming) and supports both IPC and TCP transports.

## Examples

### Create Server and Client

```python
from tensorrt_llm.executor.rpc import RPCServer, RPCClient

# Define your application
class App:
    def add(self, a: int, b: int) -> int:
        return a + b

    async def async_multiply(self, x: int, y: int) -> int:
        return x * y

# Create and start server
app = App()
with RPCServer(app) as server:
    server.bind("ipc:///tmp/my_rpc")  # or "tcp://127.0.0.1:5555"
    server.start()

    # Create client and make calls
    with RPCClient("ipc:///tmp/my_rpc") as client:
        result = client.add(5, 3).remote()
        print(result)  # Output: 8
```
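
In practice the server and the client usually live in different processes. Below is a minimal sketch of that setup using only the APIs shown above; it assumes `server.start()` returns immediately and serves in the background, as the example above implies, and the address `ipc:///tmp/my_rpc_demo` is just a placeholder.

```python
import multiprocessing
import time

from tensorrt_llm.executor.rpc import RPCClient, RPCServer


class App:
    def add(self, a: int, b: int) -> int:
        return a + b


def serve(address: str) -> None:
    with RPCServer(App()) as server:
        server.bind(address)
        server.start()
        # Keep the server process alive long enough for the client to call in.
        time.sleep(10)


if __name__ == "__main__":
    address = "ipc:///tmp/my_rpc_demo"
    proc = multiprocessing.Process(target=serve, args=(address,))
    proc.start()
    time.sleep(1)  # give the server a moment to bind

    with RPCClient(address) as client:
        print(client.add(5, 3).remote())  # 8

    proc.join()
```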

### Different Remote Calls

#### Synchronous Call

```python
# Blocking call that waits for result
result = client.add(10, 20).remote()
# or with timeout
result = client.add(10, 20).remote(timeout=5.0)
```

#### Asynchronous Call

```python
# Async call that returns a coroutine
result = await client.async_multiply(3, 4).remote_async()
```
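
Since `remote_async()` returns an awaitable, several calls can be issued concurrently with standard `asyncio` tooling. A sketch, assuming `async_multiply` is defined on the server app as in the first example:

```python
import asyncio

async def multiply_many(client):
    # Launch both calls before awaiting either, so they run concurrently.
    results = await asyncio.gather(
        client.async_multiply(2, 3).remote_async(),
        client.async_multiply(4, 5).remote_async(),
    )
    print(results)  # [6, 20]

asyncio.run(multiply_many(client))
```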

#### Future-based Call

```python
# Returns a concurrent.futures.Future
future = client.add(1, 2).remote_future()
# Get result later
result = future.result()
```
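
Because the returned object is a plain `concurrent.futures.Future`, the standard `Future` API applies; for example, a completion callback can be attached (a sketch, assuming no extra wrapper semantics):

```python
def on_done(fut):
    # Runs once the RPC result (or exception) is available.
    print(f"Result ready: {fut.result()}")

future = client.add(1, 2).remote_future()
future.add_done_callback(on_done)
```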

#### Fire-and-Forget Call

```python
# Send request without waiting for response
client.submit_task(task_id=123).remote(need_response=False)
```

#### Streaming Call

```python
# For async generator methods
async for value in client.stream_data(n=10).remote_streaming():
    print(f"Received: {value}")
```

### Error Handling

```python
from tensorrt_llm.executor.rpc import RPCError, RPCTimeout

try:
    result = client.risky_operation().remote(timeout=1.0)
except RPCTimeout:
    print("Operation timed out")
except RPCError as e:
    print(f"RPC Error: {e}")
    print(f"Original cause: {e.cause}")
    print(f"Traceback: {e.traceback}")
```

### Graceful Shutdown

```python
# Shutdown server from client
client.shutdown_server()
```