TensorRT-LLMs/cpp/tensorrt_llm/batch_manager/dataTransceiverImpl.h
Chuang Zhu 44cfd757b2
Agent interface impl for NIXL (#4125)
* agentConnection

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

recv

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

agentState

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

NIXL interfaces

Signed-off-by: Shixiaowei02 <39303645+Shixiaowei02@users.noreply.github.com>

update cmakelists

Signed-off-by: Shixiaowei02 <39303645+Shixiaowei02@users.noreply.github.com>

nixl improve

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

remove cppzmq

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

fix

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

transferAgent remove register

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

work for cache Test

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

reduce sleep time

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

fix test

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

integrate

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

nixl env

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

fix rebase error

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

cpp test

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

stash for send metaData

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

loadRemoteMD after fetchRemoteMD

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

workaround for mixed gen and context

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

test_env

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

avoid port conflict in test

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

* format

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

* use std::string

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

* typo

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

* fix transferAgentTest

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>

---------

Signed-off-by: Chuang Zhu <111838961+chuangz0@users.noreply.github.com>
2025-05-22 09:09:41 +08:00

108 lines
3.7 KiB
C++

/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "tensorrt_llm/batch_manager/cacheTransBuffer.h"
#include "tensorrt_llm/batch_manager/dataTransceiver.h"
#include "tensorrt_llm/common/envUtils.h"
#include "tensorrt_llm/executor/cache_transmission/cacheConcatenate.h"
namespace tensorrt_llm::batch_manager
{
/// @brief Constants shared by the data transceiver classes: message identifiers and integer tags.
///
/// DataSenderImpl and DataReceiverImpl both derive from this struct, so the names declared here
/// are usable unqualified inside either class.
struct TransceiverTag
{
    /// Message identifier; the underlying type is fixed to a 64-bit unsigned integer.
    enum class Id : uint64_t
    {
        REQUEST_SEND = 1,
        TERMINATION = 2
    };

    // Integer tag values.
    // NOTE(review): presumably these distinguish the id, info-size and info messages exchanged
    // over a connection -- confirm against the implementation file.
    static constexpr int32_t kID_TAG = 19;
    static constexpr int32_t kINFO_SIZE_TAG = 22;
    static constexpr int32_t kINFO_TAG = 32;
};
/// @brief Implementation of the DataSender interface.
///
/// Inherits TransceiverTag as well, which brings the Id enum and the k*_TAG constants into
/// class scope. Only declarations live in this header; the member functions are defined in the
/// corresponding implementation file.
class DataSenderImpl : public DataSender, public TransceiverTag
{
public:
    using SizeType32 = tensorrt_llm::runtime::SizeType32;

    /// List of (connection, transceiver state) pairs; used as the mapped type of mRequestToComms.
    using RequestMapInfo
        = std::vector<std::pair<executor::kv_cache::Connection const*, executor::DataTransceiverState>>;

    /// @param manager         Connection manager, kept as a raw pointer in mManager.
    ///                        NOTE(review): presumably non-owning, so it must outlive this
    ///                        object -- confirm with the caller.
    /// @param selfCacheState  Cache state of this instance, taken by value.
    /// @param selfIndex       Index of this instance.
    /// @param formatter       IOFormatter whose ownership is transferred to this object.
    DataSenderImpl(executor::kv_cache::ConnectionManager* manager, executor::kv_cache::CacheState selfCacheState,
        SizeType32 selfIndex, std::unique_ptr<IOFormatter> formatter);

    /// @return The received RequestInfo; the result must not be discarded.
    [[nodiscard]] RequestInfo recvRequestInfo() override;

    /// Send operation for the given request (see the DataSender interface for the contract).
    void sendSync(LlmRequest const& llmRequest) override;

    /// @return A const reference to the communication state.
    [[nodiscard]] executor::kv_cache::CommState const& getCommState() const override;

    /// @param commState New communication state, taken by value.
    void setCommState(executor::kv_cache::CommState commState) override;

    /// @return A count associated with @p requestId.
    ///         NOTE(review): presumably the number of entries recorded for the request in
    ///         mRequestToComms -- confirm in the implementation file.
    [[nodiscard]] size_t getCounterpartsCount(LlmRequest::RequestIdType requestId) const override;

    /// Release operation for @p requestId (see the DataSender interface for the contract).
    void release(LlmRequest::RequestIdType requestId) override;

private:
    // Member order is significant: it fixes construction and destruction order.
    executor::kv_cache::ConnectionManager* mManager;
    // Per-request list of (connection, state) pairs, keyed by request id.
    std::map<LlmRequest::RequestIdType, RequestMapInfo> mRequestToComms;
    executor::DataTransceiverState mSelfState;
    std::unique_ptr<IOFormatter> mFormatter;
    // NOTE(review): presumably guards mRequestToComms -- confirm in the implementation file.
    std::mutex mMtxForMap;
    runtime::BufferManager mBufferManager;
};
/// @brief Implementation of the DataReceiver interface.
///
/// Inherits TransceiverTag as well, which brings the Id enum and the k*_TAG constants into
/// class scope. Only declarations live in this header (apart from the small nested
/// ReceiveCacheResource constructor); the member functions are defined in the corresponding
/// implementation file.
class DataReceiverImpl : public DataReceiver, public TransceiverTag
{
public:
    using SizeType32 = tensorrt_llm::runtime::SizeType32;

    /// @param manager         Connection manager, kept as a raw pointer in mManager.
    ///                        NOTE(review): presumably non-owning, so it must outlive this
    ///                        object -- confirm with the caller.
    /// @param selfCacheState  Cache state of this instance, taken by value.
    /// @param selfIndex       Index of this instance.
    /// @param formatter       IOFormatter whose ownership is transferred to this object.
    DataReceiverImpl(executor::kv_cache::ConnectionManager* manager, executor::kv_cache::CacheState selfCacheState,
        SizeType32 selfIndex, std::unique_ptr<IOFormatter> formatter);

    /// Request-info send operation for the given request (see DataReceiver for the contract).
    void sendRequestInfo(LlmRequest const& llmRequest) override;

    /// Receive operation for the given request (see DataReceiver for the contract).
    void receiveSync(LlmRequest const& llmRequest) override;

private:
    /// Bundles a BufferManager with a CudaEvent; instances are stored in mProcessToResources.
    struct ReceiveCacheResource
    {
        runtime::BufferManager mBufferManager;
        runtime::CudaEvent mCudaEvent;

        /// Takes both arguments as rvalue references and forwards each into its member.
        ReceiveCacheResource(runtime::BufferManager&& bufferManager, runtime::CudaEvent&& cudaEvent)
            // A named rvalue-reference parameter is an lvalue, so it has to be cast back with
            // std::move; without the cast the member would be copy-constructed.
            : mBufferManager(std::move(bufferManager))
            , mCudaEvent(std::move(cudaEvent))
        {
        }
    };

    /// Static overload that sends @p info over one specific connection.
    static void sendRequestInfo(executor::kv_cache::Connection const* connection, RequestInfo const& info);

    /// @return A const reference to the owning pointer of the resource selected for
    ///         @p llmRequest. NOTE(review): presumably looked up (or created) in
    ///         mProcessToResources -- confirm in the implementation file.
    [[nodiscard]] std::unique_ptr<ReceiveCacheResource> const& getReceiveCacheResource(LlmRequest const& llmRequest);

    // Member order is significant: it fixes construction and destruction order.
    executor::kv_cache::ConnectionManager* mManager;
    executor::DataTransceiverState mSelfState;
    std::unique_ptr<IOFormatter> mFormatter;
    // Receive resources keyed by a string.
    // NOTE(review): the key presumably identifies a process -- confirm in the implementation file.
    std::unordered_map<std::string, std::unique_ptr<ReceiveCacheResource>> mProcessToResources;
    // NOTE(review): presumably guards mProcessToResources -- confirm in the implementation file.
    std::mutex mProcessIoResouceMutex;
};
} // namespace tensorrt_llm::batch_manager