diff --git a/cpp/include/tensorrt_llm/batch_manager/llmRequest.h b/cpp/include/tensorrt_llm/batch_manager/llmRequest.h
index 9b1e2b2f0c..276b0a6483 100644
--- a/cpp/include/tensorrt_llm/batch_manager/llmRequest.h
+++ b/cpp/include/tensorrt_llm/batch_manager/llmRequest.h
@@ -1691,22 +1691,22 @@ public:
         mDecodingIter = iter;
     }

-    void setKvCacheTransferStart(TimePoint const& time)
+    void setKvCacheTransferStart(TimePoint time) const
    {
        mPerfMetrics.timingMetrics.kvCacheTransferStart = maybeToGlobalSteadyClock(time);
    }

-    void setKvCacheTransferEnd(TimePoint const& time)
+    void setKvCacheTransferEnd(TimePoint time) const
    {
        mPerfMetrics.timingMetrics.kvCacheTransferEnd = maybeToGlobalSteadyClock(time);
    }

-    TimePoint getKvCacheTransferStart()
+    TimePoint getKvCacheTransferStart() const
    {
        return mPerfMetrics.timingMetrics.kvCacheTransferStart;
    }

-    TimePoint getKvCacheTransferEnd()
+    TimePoint getKvCacheTransferEnd() const
    {
        return mPerfMetrics.timingMetrics.kvCacheTransferEnd;
    }
@@ -1865,13 +1865,11 @@ public:
         return mUseDraftModel;
     }

-    // If mGlobalSteadyClockOffset is set, return a global steady clock time point, otherwise return local steady clock
+    // If sGlobalSteadyClockOffset is set, return a global steady clock time point, otherwise return local steady clock
     // time point
-    [[nodiscard]] TimePoint getSteadyClockNow() const
+    [[nodiscard]] static TimePoint getSteadyClockNow()
    {
-        const TimePoint time_point = std::chrono::steady_clock::now();
-
-        return maybeToGlobalSteadyClock(time_point);
+        return maybeToGlobalSteadyClock(std::chrono::steady_clock::now());
    }

     RequestIdType mRequestId;
@@ -1894,7 +1892,7 @@ public:
     SizeType32 mPtableCurrentPosition{0};

     // The offset between local steady clock and global steady clock (at rank 0)
-    inline static std::optional<std::chrono::steady_clock::duration> mGlobalSteadyClockOffset{std::nullopt};
+    inline static std::optional<std::chrono::steady_clock::duration> sGlobalSteadyClockOffset{std::nullopt};

 protected:
     bool mIsStreaming;
@@ -2028,9 +2026,9 @@ protected:

     std::optional<TensorPtr> mSkipCrossAttnBlocks{std::nullopt};

-    // Performance metrics.
+    // Performance metrics. Should be updatable even from a const LlmRequest reference.
     bool mReturnPerfMetrics{false};
-    executor::RequestPerfMetrics mPerfMetrics;
+    mutable executor::RequestPerfMetrics mPerfMetrics;

     // Guided decoding params.
     std::optional<executor::GuidedDecodingParams> mGuidedDecodingParams{std::nullopt};
@@ -2183,16 +2181,13 @@ private:
         return tensor;
     }

-    TimePoint maybeToGlobalSteadyClock(TimePoint const& time_point) const
+    static TimePoint maybeToGlobalSteadyClock(TimePoint const& time_point)
    {
-        if (mGlobalSteadyClockOffset.has_value())
+        if (sGlobalSteadyClockOffset.has_value())
        {
-            return time_point + *mGlobalSteadyClockOffset;
-        }
-        else
-        {
-            return time_point;
+            return time_point + *sGlobalSteadyClockOffset;
        }
+        return time_point;
    }
 };
diff --git a/cpp/include/tensorrt_llm/executor/types.h b/cpp/include/tensorrt_llm/executor/types.h
index 41df1c9c7a..e248cb1c3c 100644
--- a/cpp/include/tensorrt_llm/executor/types.h
+++ b/cpp/include/tensorrt_llm/executor/types.h
@@ -451,7 +451,7 @@ struct RequestPerfMetrics
     /// @brief End time of the KV cache transfer for disaggregated serving
     TimePoint kvCacheTransferEnd;
     /// @brief KV Cache size transfer for disaggregated serving
-    mutable size_t kvCacheSize = 0;
+    size_t kvCacheSize = 0;
 };

 struct KvCacheMetrics
diff --git a/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp b/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp
index 4b457ea2a6..e7520d47fc 100644
--- a/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp
+++ b/cpp/tensorrt_llm/batch_manager/cacheFormatter.cpp
@@ -227,6 +227,7 @@ std::vector<size_t> CacheFormatter::pickRecvConnections(
 void CacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& session)
 {
     NVTX3_SCOPED_RANGE(CacheFormatter_format);
+    session.setTime(TransferSession::kTimeFormatter);
     auto const& llmRequest = session.getLlmRequest();
     TLLM_LOG_DEBUG(
         mpi::MpiComm::world().getRank(), "Start sending KV cache for request ID: %ld.", llmRequest.mRequestId);
@@ -249,9 +250,6 @@ void CacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& sessio
     auto const numPools = blockManager.getNumPools();
     // TODO(oargov): are we sure the other side has the same number of pools? this might not hold for pp_size>1...

-    auto lastTokenTime = llmRequest.getPerfMetrics().timingMetrics.lastTokenTime;
-    bool recordDelay = lastTokenTime != std::chrono::steady_clock::time_point();
-
     bool layerWise = common::getEnvDisaggLayerwise() && numPools == 1;
     if (layerWise)
     {
@@ -420,6 +418,7 @@ void CacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& sessio
             inputKvCacheBlocksPerWindow, outputSplitCaches, destConfig, selfConfig, selfIdx, bufferManager);
         bufferManager.getStream().synchronize();
+        session.setTime(TransferSession::kTimePreprocess);

         auto preAllocSendBuffer = mCacheTransBufferManager->getSendBuffer(cacheBufferId);
         if (preAllocSendBuffer != nullptr)
@@ -434,7 +433,7 @@ void CacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& sessio
             TLLM_CUDA_CHECK(cudaSetDevice(deviceId));
             TLLM_CHECK(connections.size() > (processIdx / peerDuplicateHeadFactor));
             TLLM_CHECK(outputSplitCaches.size() > (processIdx / peerDuplicateHeadFactor));
-            auto startTime = llmRequest.getSteadyClockNow();
+            auto startTime = LlmRequest::getSteadyClockNow();
             size_t ppDomainSize = targetInfo.mDomainPPSize;
             size_t bufferTpRank = (processIdx / ppDomainSize) / peerDuplicateHeadFactor;
@@ -481,15 +480,8 @@ void CacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& sessio
                 }
             }
-            auto endTime = llmRequest.getSteadyClockNow();
-            double delay = 0.0;
-            if (recordDelay)
-            {
-                delay = std::chrono::duration<double, std::milli>(startTime - lastTokenTime).count();
-            }
-            double cacheTransferTime
-                = std::max(0.0, std::chrono::duration<double, std::milli>(endTime - startTime).count());
-            session.appendMeasure(delay, cacheTransferTime, size);
+            auto endTime = LlmRequest::getSteadyClockNow();
+            session.appendMeasure(startTime, endTime, size);
        };

        if (connections.size() > 1)
@@ -534,8 +526,10 @@ void CacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& sessio
         {
             sendBufferFun(deviceId, 0);
         }
+        session.setTime(TransferSession::kTimeTransmissions);

         mCacheTransBufferManager->freeBufferIndexForSend(cacheBufferId);
+        session.setTime(TransferSession::kTimePostprocess);
     }
     TLLM_LOG_DEBUG(
         mpi::MpiComm::world().getRank(), "End the sending of KV cache for the request ID:%ld ", llmRequest.mRequestId);
@@ -544,6 +538,7 @@
 void CacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& session)
 {
     NVTX3_SCOPED_RANGE(CacheFormatter_unformat);
+    session.setTime(TransferSession::kTimeFormatter);
     auto const& llmRequest = session.getLlmRequest();
     auto const ctxReqId = llmRequest.getContextPhaseParams().value().getReqId();
     TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(),
@@ -555,9 +550,6 @@ void CacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& sess
     auto& bufferManager = session.getBufferManager();
     auto blockRange = getBlockRangeForReceiving(mCacheManager, llmRequest, destConfig.getEnableBlockReuse());

-    auto arrivalTime = llmRequest.getPerfMetrics().timingMetrics.arrivalTime;
-    bool recordDelay = arrivalTime != std::chrono::steady_clock::time_point();
-
     auto pickUpConnections = pickRecvConnections(connections.size(), selfConfig, selfIdx, destConfig);
     TLLM_LOG_DEBUG("pickUpConnections size: %d connections size: %d", pickUpConnections.size(), connections.size());
@@ -779,6 +771,7 @@ void CacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& sess
             // sync to alloc buffer
             bufferManager.getStream().synchronize();
         }
+        session.setTime(TransferSession::kTimePreprocess);
         runtime::ITensor::SharedPtr preAllocRecvBuffer = nullptr;
         if (cacheBufferId.has_value())
@@ -794,7 +787,7 @@ void CacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& sess
             TLLM_CUDA_CHECK(cudaSetDevice(deviceId));
             TLLM_CHECK(pickUpConnections.size() > processIdx);
             TLLM_CHECK(recvSplitCaches.size() > processIdx);
-            auto startTime = llmRequest.getSteadyClockNow();
+            auto startTime = LlmRequest::getSteadyClockNow();
             size_t size = 0;
             if (processIdx >= remainNoCoverTargetNum)
@@ -835,15 +828,8 @@ void CacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& sess
                 }
             }
-            auto endTime = llmRequest.getSteadyClockNow();
-            double delay = 0.0;
-            if (recordDelay)
-            {
-                delay = std::chrono::duration<double, std::milli>(startTime - arrivalTime).count();
-            }
-            double cacheTransferTime
-                = std::max(0.0, std::chrono::duration<double, std::milli>(endTime - startTime).count());
-            session.appendMeasure(delay, cacheTransferTime, size);
+            auto endTime = LlmRequest::getSteadyClockNow();
+            session.appendMeasure(startTime, endTime, size);
        };

        if (pickUpConnections.size() > 1)
        {
@@ -891,6 +877,7 @@ void CacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& sess
         {
             recvBufferFun(deviceId, 0);
         }
+        session.setTime(TransferSession::kTimeTransmissions);

         {
             NVTX3_SCOPED_RANGE(formatInputConcatenate);
@@ -904,6 +891,7 @@ void CacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& sess
             mCacheTransBufferManager->freeBufferIndexForRecv(cacheBufferId);
         }
     }
+    session.setTime(TransferSession::kTimePostprocess);
 }
 }
diff --git a/cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp b/cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp
index 27676fb656..b96d47dfbd 100644
--- a/cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp
+++ b/cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp
@@ -603,7 +603,7 @@ void CacheTransceiver::checkGenTransferStatus(std::optional<int> const& atLeastR
             it->first->setState(LlmRequestState::kDISAGG_GENERATION_TRANS_COMPLETE);
             // Gather the kv cache transfer time from all workers and update to leader rank
-            if (!common::getEnvKVCacheTransferOutputPath().empty())
+            if (!common::getEnvKVCacheTimeOutputPath().empty())
             {
                 auto syncComm = mCacheState->getParallelConfig().mEnableAttentionDP ? mGroupDataComm : mGroupComm;
                 updateKVCacheTransferBW(syncComm, it->first);
diff --git a/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp b/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp
index 943e18a112..d5cfc607c7 100644
--- a/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp
+++ b/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp
@@ -28,6 +28,7 @@
 #include "tensorrt_llm/executor/cache_transmission/agent_utils/connection.h"
 #include "tensorrt_llm/runtime/common.h"
 #include "tensorrt_llm/runtime/utils/mpiUtils.h"
+#include <array>
 #include <chrono>
 #include <cstdint>
 #include <future>
@@ -105,39 +106,65 @@ void TransferSession::setLlmRequest(LlmRequest const& llmRequest)
     mRequest = &llmRequest;
 }

-void TransferSession::appendMeasure(double delay, double duration, size_t size)
+void TransferSession::setTime(TimeNames name)
 {
-    if (!mRecordMeasure)
+    if (mTimes)
    {
-        return;
+        mTimes->times.at(name) = LlmRequest::getSteadyClockNow();
+    }
+}
+
+void TransferSession::appendMeasure(LlmRequest::TimePoint start, LlmRequest::TimePoint end, size_t size)
+{
+    if (mTimes)
+    {
+        mTimes->measures.emplace_back(Measure{start, end, size});
    }
-    auto bandwidth = size * 8 / (duration / 1000) / 1e9; // byte, ms => Gbps
-    mMeasures.emplace_back(Measure{delay, duration, bandwidth});
 }

 void TransferSession::exportMeasure(std::ofstream& outFile, bool isContext) const
 {
-    if (mMeasures.empty())
+    if (!mTimes || mTimes->measures.empty())
    {
        return;
    }
    // write header if not exist
    if (outFile.tellp() == 0)
    {
-        outFile << "RequestID";
-        for (size_t i = 0; i < mMeasures.size(); i++)
+        outFile << "RequestID,RequestInfo,Preparation,Preprocess,Transmissions,Postprocess";
+        for (size_t i = 0; i < mTimes->measures.size(); i++)
        {
-            outFile << ",Delay(ms),Duration(ms),Bandwidth(Gbps)";
+            outFile << ",Delay,Duration,Bandwidth(Gbps)";
        }
        outFile << '\n';
    }

-    // write measures
+    auto transferStart = mRequest->getPerfMetrics().timingMetrics.kvCacheTransferStart;
+    using Milliseconds = std::chrono::duration<double, std::milli>;
+
+    // write measures, time is in milliseconds
    TLLM_CHECK(isContext || mRequest->getContextPhaseParams().has_value());
    auto reqId = isContext ? mRequest->mRequestId : mRequest->getContextPhaseParams().value().getReqId();
    outFile << reqId;
-    for (auto const& measure : mMeasures)
+    auto previousTime = transferStart;
+    for (auto time : mTimes->times)
    {
-        outFile << "," << measure.delay << "," << measure.duration << "," << measure.bandwidth;
+        if (time == LlmRequest::TimePoint())
+        {
+            // timepoint is unset, skip
+            outFile << ",0.0";
+            continue;
+        }
+        double delay = Milliseconds(time - previousTime).count();
+        previousTime = time;
+        outFile << "," << delay;
+    }
+    previousTime = mTimes->times[kTimePreprocess];
+    for (auto const& measure : mTimes->measures)
+    {
+        double delay = Milliseconds(measure.start - previousTime).count();
+        double duration = Milliseconds(measure.end - measure.start).count();
+        double bandwidth = static_cast<double>(measure.size) * 8.0 / duration / 1e6; // byte, ms => Gbps
+        outFile << "," << delay << "," << duration << "," << bandwidth;
    }
    outFile << '\n' << std::flush;
 }
@@ -158,7 +185,7 @@ int32_t tagFromRequestId(LlmRequest::RequestIdType requestId)
 std::filesystem::path getTransferOutputPath(char const* tag)
 {
     namespace fs = std::filesystem;
-    auto outputPath = common::getEnvKVCacheTransferOutputPath();
+    auto outputPath = common::getEnvKVCacheTimeOutputPath();
     if (!outputPath.empty())
     {
         auto rank = mpi::MpiComm::world().getRank();
@@ -273,6 +300,7 @@ public:
     {
         std::promise<void> promise;
         auto future = promise.get_future();
+        llmRequest.setKvCacheTransferStart(LlmRequest::getSteadyClockNow());
         {
             {
                 std::scoped_lock lkResp(mSenderMutex);
@@ -309,7 +337,7 @@ public:
         std::unique_lock lk(mMtxForMap);
         auto it = mRequestToSession.find(requestId);
         TLLM_CHECK(it != mRequestToSession.end());
-        if (!common::getEnvKVCacheTransferOutputPath().empty())
+        if (!common::getEnvKVCacheTimeOutputPath().empty())
         {
             if (!mMeasuresFile.is_open())
             {
@@ -363,7 +391,8 @@ public:
             auto session
                 = TransferSession(std::vector<executor::kv_cache::Connection const*>(peerRelativeRanks.size(), nullptr),
                     DataContext{tagFromRequestId(requestId)}, mSelfState, info.getTransState(), mBufferManager,
                     info.getIndexFromEnd(), info.getLastBlockKey(), nullptr,
-                    !common::getEnvKVCacheTransferOutputPath().empty());
+                    !common::getEnvKVCacheTimeOutputPath().empty());
+            session.setTime(TransferSession::kTimeRequestInfo);
             it = mRequestToSession.emplace(requestId, std::move(session)).first;
         }
         it->second.setConnection(peerIdx, connection);
@@ -382,6 +411,7 @@ public:
         }
         session->setLlmRequest(llmRequest);
         mFormatter->format(*session);
+        llmRequest.setKvCacheTransferEnd(LlmRequest::getSteadyClockNow());
     }

     bool cancelRequest(LlmRequest const& llmRequest)
@@ -751,7 +781,7 @@ public:
     void receiveSync(TransferSession& session)
     {
         mFormatter->unformat(session);
-        if (!common::getEnvKVCacheTransferOutputPath().empty())
+        if (!common::getEnvKVCacheTimeOutputPath().empty())
         {
             std::unique_lock lock(mMeasuresFileMutex);
             if (!mMeasuresFile.is_open())
             {
@@ -846,7 +876,7 @@ public:
         auto const& resource = getReceiveCacheResource(llmRequest);
         return TransferSession(std::move(counterPartConnections), DataContext{tagFromRequestId(requestId)},
             mSelfState, contextState, resource->mBufferManager, requestInfo.getIndexFromEnd(),
             requestInfo.getLastBlockKey(),
-            &llmRequest, !common::getEnvKVCacheTransferOutputPath().empty());
+            &llmRequest, !common::getEnvKVCacheTimeOutputPath().empty());
     }

     std::unique_ptr<ReceiveCacheResource> const& getReceiveCacheResource(LlmRequest const& llmRequest)
@@ -957,6 +987,7 @@ private:
         llmRequest.setKvCacheTransferStart(std::chrono::steady_clock::now());
         TLLM_CUDA_CHECK(cudaSetDevice(mDeviceId));
         auto session = sendRequestInfo(llmRequest);
+        session.setTime(TransferSession::kTimeRequestInfo);
         bool isReady = receiveReadySignal(session);
         if (!isReady)
         {
diff --git a/cpp/tensorrt_llm/batch_manager/dataTransceiver.h b/cpp/tensorrt_llm/batch_manager/dataTransceiver.h
index dc77401cb2..c97633ba96 100644
--- a/cpp/tensorrt_llm/batch_manager/dataTransceiver.h
+++ b/cpp/tensorrt_llm/batch_manager/dataTransceiver.h
@@ -56,29 +56,48 @@ using UniqueToken = tensorrt_llm::runtime::UniqueToken;
 class TransferSession
 {
 public:
+    // measures for each single transmission
     struct Measure
     {
-        double delay;     // from last token (ctx) or arrival time (gen), in ms
-        double duration;  // in ms
-        double bandwidth; // in Gbps
+        LlmRequest::TimePoint start;
+        LlmRequest::TimePoint end;
+        size_t size = 0;
+    };
+
+    enum TimeNames : uint8_t
+    {
+        kTimeRequestInfo = 0,
+        kTimeFormatter,
+        kTimePreprocess,
+        kTimeTransmissions,
+        kTimePostprocess,
+        kTimeCounts
+    };
+
+    struct KVCacheTimes
+    {
+        std::array<LlmRequest::TimePoint, kTimeCounts> times;
+        std::vector<Measure> measures;
    };

    TransferSession(std::vector<executor::kv_cache::Connection const*> connections, DataContext dataContext,
        executor::DataTransceiverState const& selfState, executor::DataTransceiverState otherState,
        runtime::BufferManager const& bufferManager, int32_t indexFromEnd, BlockKey const& lastBlockKey,
-        LlmRequest const* llmRequest = nullptr, bool recordMeasure = false)
+        LlmRequest const* llmRequest = nullptr, bool recordTiming = false)
        : mConnections(std::move(connections))
        , mDataContext(std::move(dataContext))
        , mSelfState(&selfState)
        , mOtherState(std::move(otherState))
        , mBufferManager(&bufferManager)
        , mRequest(llmRequest)
-        , mMeasures()
-        , mRecordMeasure(recordMeasure)
        , mIndexFromEnd(indexFromEnd)
        , mLastBlockKey(lastBlockKey)
    {
        TLLM_CHECK(!mConnections.empty());
+        if (recordTiming)
+        {
+            mTimes = std::make_unique<KVCacheTimes>();
+        }
    }

    [[nodiscard]] std::vector<executor::kv_cache::Connection const*> const& getConnections() const;
@@ -103,7 +122,9 @@ public:
     // in CacheSender, the LlmRequest is not available until the sendSync is called
     void setLlmRequest(LlmRequest const& llmRequest);

-    void appendMeasure(double delay, double duration, size_t size);
+    void setTime(TimeNames name);
+
+    void appendMeasure(LlmRequest::TimePoint start, LlmRequest::TimePoint end, size_t size);

     // TODO: 1. use global id instead of context request id; 2. export to llm metrics instead of file
     void exportMeasure(std::ofstream& outFile, bool isContext) const;
@@ -125,8 +146,7 @@ private:
     executor::DataTransceiverState mOtherState;
     runtime::BufferManager const* mBufferManager;
     LlmRequest const* mRequest;
-    std::vector<Measure> mMeasures;
-    bool mRecordMeasure{false};
+    std::unique_ptr<KVCacheTimes> mTimes;
     int32_t mIndexFromEnd{0};
     BlockKey mLastBlockKey{};
 };
diff --git a/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp b/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp
index 432a541c3c..8e50ebb1a1 100644
--- a/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp
+++ b/cpp/tensorrt_llm/batch_manager/mlaCacheFormatter.cpp
@@ -122,6 +122,7 @@ bool MLACacheFormatter::needSendCache(
 void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& session)
 {
     NVTX3_SCOPED_RANGE(MLACacheFormatter_format);
+    session.setTime(TransferSession::kTimeFormatter);
     auto const& llmRequest = session.getLlmRequest();
     TLLM_LOG_DEBUG(
         mpi::MpiComm::world().getRank(), "Start sending KV cache for request ID: %ld.", llmRequest.mRequestId);
@@ -141,9 +142,6 @@ void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& ses
     // diff end

-    auto lastTokenTime = llmRequest.getPerfMetrics().timingMetrics.lastTokenTime;
-    bool recordDelay = lastTokenTime != std::chrono::steady_clock::time_point();
-
     int blockNum = 0;
     std::vector<runtime::ITensor::SharedPtr> inputKvCacheBlocks;
     auto const numPools = mCacheManager->getBlockManager().getNumPools();
@@ -235,6 +233,7 @@ void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& ses
         inputKvCacheBlocksPerWindow, outputSplitCaches, destConfig, selfConfig, selfIdx, bufferManager);
     bufferManager.getStream().synchronize();
+    session.setTime(TransferSession::kTimePreprocess);

     auto preAllocSendBuffer = mCacheTransBufferManager->getSendBuffer(cacheBufferId);
     if (preAllocSendBuffer != nullptr)
@@ -246,7 +245,7 @@ void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& ses
         NVTX3_SCOPED_RANGE(sendBufferFun);

         TLLM_CUDA_CHECK(cudaSetDevice(deviceId));
-        auto startTime = llmRequest.getSteadyClockNow();
+        auto startTime = LlmRequest::getSteadyClockNow();
         auto cacheIdx = processIdx % (pPDomainSize * cPDomainSize);
         if (cacheIdx < bufferCoverTargetNum)
         {
@@ -279,15 +278,8 @@ void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& ses
                 remainSendSize -= sendSize;
             }
         }
-        auto endTime = llmRequest.getSteadyClockNow();
-        double delay = 0.0;
-        if (recordDelay)
-        {
-            delay = std::chrono::duration<double, std::milli>(startTime - lastTokenTime).count();
-        }
-        double cacheTransferTime
-            = std::max(0.0, std::chrono::duration<double, std::milli>(endTime - startTime).count());
-        session.appendMeasure(delay, cacheTransferTime, outputSplitCaches.at(cacheIdx)->getSizeInBytes());
+        auto endTime = LlmRequest::getSteadyClockNow();
+        session.appendMeasure(startTime, endTime, outputSplitCaches.at(cacheIdx)->getSizeInBytes());
    };

    if (connections.size() > 1)
@@ -331,7 +323,9 @@ void MLACacheFormatter::format(tensorrt_llm::batch_manager::TransferSession& ses
     {
         sendBufferFun(deviceId, 0);
     }
+    session.setTime(TransferSession::kTimeTransmissions);

     mCacheTransBufferManager->freeBufferIndexForSend(cacheBufferId);
+    session.setTime(TransferSession::kTimePostprocess);

     TLLM_LOG_DEBUG(
         mpi::MpiComm::world().getRank(), "End the sending of KV cache for the request ID: %ld.", llmRequest.mRequestId);
@@ -340,6 +334,7 @@
 void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& session)
 {
     NVTX3_SCOPED_RANGE(MLACacheFormatter_unformat);
+    session.setTime(TransferSession::kTimeFormatter);
     auto const& llmRequest = session.getLlmRequest();
     TLLM_CHECK_WITH_INFO(llmRequest.mSamplingConfig.beamWidth == 1, "Currently only supports beam width 1.");
     auto const ctxReqId = llmRequest.getContextPhaseParams().value().getReqId();
@@ -350,8 +345,6 @@ void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& s
     auto const selfIdx = session.getSelfState().getCommState().value().getSelfIdx();
     auto const& connections = session.getConnections();
     auto& bufferManager = session.getBufferManager();
-    auto arrivalTime = llmRequest.getPerfMetrics().timingMetrics.arrivalTime;
-    bool recordDelay = arrivalTime != std::chrono::steady_clock::time_point();
     auto pickUpConnections = pickRecvConnections(connections.size(), selfConfig, selfIdx, destConfig);
     auto blockRange = getBlockRangeForReceiving(mCacheManager, llmRequest, destConfig.getEnableBlockReuse());
     std::vector<runtime::ITensor::SharedPtr> recvBufferTmps;
@@ -445,6 +438,7 @@ void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& s
         TLLM_CHECK(onlyUseDynamicBuffer == false);
     }
     bufferManager.getStream().synchronize();
+    session.setTime(TransferSession::kTimePreprocess);

     auto preAllocRecvBuffer = mCacheTransBufferManager->getRecvBuffer(cacheBufferId);
     if (preAllocRecvBuffer != nullptr)
@@ -456,7 +450,7 @@ void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& s
     {
         NVTX3_SCOPED_RANGE(recvBufferFun);
         TLLM_CUDA_CHECK(cudaSetDevice(deviceId));
-        auto startTime = llmRequest.getSteadyClockNow();
+        auto startTime = LlmRequest::getSteadyClockNow();
         size_t size = 0;
         if (processIdx >= remainNoCoverTargetNum)
         {
@@ -489,15 +483,8 @@ void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& s
                 remainRecvSize -= recvSize;
             }
         }
-        auto endTime = llmRequest.getSteadyClockNow();
-        double delay = 0.0;
-        if (recordDelay)
-        {
-            delay = std::chrono::duration<double, std::milli>(startTime - arrivalTime).count();
-        }
-        double cacheTransferTime
-            = std::max(0.0, std::chrono::duration<double, std::milli>(endTime - startTime).count());
-        session.appendMeasure(delay, cacheTransferTime, size);
+        auto endTime = LlmRequest::getSteadyClockNow();
+        session.appendMeasure(startTime, endTime, size);
    };

    if (pickUpConnections.size() > 1)
@@ -546,6 +533,7 @@ void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& s
     {
         recvBufferFun(deviceId, 0);
     }
+    session.setTime(TransferSession::kTimeTransmissions);

     {
         std::map<SizeType32, std::vector<runtime::ITensor::SharedPtr>> outputCachesPerWindow;
@@ -564,6 +552,7 @@ void MLACacheFormatter::unformat(tensorrt_llm::batch_manager::TransferSession& s
     {
         mCacheTransBufferManager->freeBufferIndexForRecv(cacheBufferId);
     }
+    session.setTime(TransferSession::kTimePostprocess);

     TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(),
         "End receiving KV cache for request ID: %ld, context request ID: %ld.", llmRequest.mRequestId,
diff --git a/cpp/tensorrt_llm/common/envUtils.cpp b/cpp/tensorrt_llm/common/envUtils.cpp
index 329ef39958..7c3b9ca015 100644
--- a/cpp/tensorrt_llm/common/envUtils.cpp
+++ b/cpp/tensorrt_llm/common/envUtils.cpp
@@ -380,7 +380,7 @@ size_t getEnvAllReduceWorkspaceSize()
     return workspaceSize;
 }

-std::string const& getEnvKVCacheTransferOutputPath()
+std::string const& getEnvKVCacheTimeOutputPath()
 {
     static std::string outputPath = getStrEnv("TRTLLM_KVCACHE_TIME_OUTPUT_PATH").value_or("");
     return outputPath;
diff --git a/cpp/tensorrt_llm/common/envUtils.h b/cpp/tensorrt_llm/common/envUtils.h
index b51b7350da..af9b085fa5 100644
--- a/cpp/tensorrt_llm/common/envUtils.h
+++ b/cpp/tensorrt_llm/common/envUtils.h
@@ -96,7 +96,7 @@ bool getEnvDisableKVCacheTransferOverlap();

 bool getEnvEnableReceiveKVCacheParallel();

-std::string const& getEnvKVCacheTransferOutputPath();
+std::string const& getEnvKVCacheTimeOutputPath();

 bool getEnvTryZCopyForKVCacheTransfer();
diff --git a/cpp/tensorrt_llm/nanobind/batch_manager/bindings.cpp b/cpp/tensorrt_llm/nanobind/batch_manager/bindings.cpp
index 2f144f3abc..d6149755e3 100644
--- a/cpp/tensorrt_llm/nanobind/batch_manager/bindings.cpp
+++ b/cpp/tensorrt_llm/nanobind/batch_manager/bindings.cpp
@@ -383,7 +383,7 @@ void initBindings(nb::module_& m)
         .def("set_first_scheduled_time", &tb::LlmRequest::setFirstScheduledTime)
         .def("update_perf_metrics", &tb::LlmRequest::updatePerfMetrics, nb::arg("iter_counter"))
         .def("remove_lora_tensors", &tb::LlmRequest::removeLoraTensors)
-        .def_rw_static("global_steady_clock_offset", &tb::LlmRequest::mGlobalSteadyClockOffset);
+        .def_rw_static("global_steady_clock_offset", &tb::LlmRequest::sGlobalSteadyClockOffset);

     nb::class_<tb::SequenceSlotManager>(m, "SequenceSlotManager")
         .def(nb::init<tb::SequenceSlotManager::SlotIdType, uint64_t>(), nb::arg("max_num_slots"),
diff --git a/cpp/tensorrt_llm/pybind/batch_manager/bindings.cpp b/cpp/tensorrt_llm/pybind/batch_manager/bindings.cpp
index 2e628e7299..ecaffdda6a 100644
--- a/cpp/tensorrt_llm/pybind/batch_manager/bindings.cpp
+++ b/cpp/tensorrt_llm/pybind/batch_manager/bindings.cpp
@@ -389,7 +389,7 @@ void initBindings(pybind11::module_& m)
         .def("set_first_scheduled_time", &tb::LlmRequest::setFirstScheduledTime)
         .def("update_perf_metrics", &tb::LlmRequest::updatePerfMetrics, py::arg("iter_counter"))
         .def("remove_lora_tensors", &tb::LlmRequest::removeLoraTensors)
-        .def_readwrite_static("global_steady_clock_offset", &tb::LlmRequest::mGlobalSteadyClockOffset);
+        .def_readwrite_static("global_steady_clock_offset", &tb::LlmRequest::sGlobalSteadyClockOffset);

     py::classh<tb::SequenceSlotManager>(m, "SequenceSlotManager")
         .def(py::init<tb::SequenceSlotManager::SlotIdType, uint64_t>(), py::arg("max_num_slots"),
diff --git a/tests/integration/defs/disaggregated/test_disaggregated.py b/tests/integration/defs/disaggregated/test_disaggregated.py
index ffc84143ae..720da1acbd 100644
--- a/tests/integration/defs/disaggregated/test_disaggregated.py
+++ b/tests/integration/defs/disaggregated/test_disaggregated.py
@@ -853,10 +853,12 @@ def test_disaggregated_kv_cache_time_output(disaggregated_test_root, llm_venv,
         lines = f.readlines()
         assert len(lines) > 1
         assert lines[0].startswith(
-            "RequestID,Delay(ms),Duration(ms),Bandwidth(Gbps)")
+            "RequestID,RequestInfo,Preparation,Preprocess,Transmissions,Postprocess"
+        )
+        assert ",Delay,Duration,Bandwidth(Gbps)" in lines[0]
         # get a send sample and match the recv
         sample = lines[1].split(',')
-        assert len(sample) >= 4
+        assert len(sample) >= 9
     with open(recv_file, "r") as f:
         lines = f.readlines()
         assert len(lines) > 1
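
Note on the new CSV schema (active when TRTLLM_KVCACHE_TIME_OUTPUT_PATH is set): each row now begins with five phase columns (RequestInfo, Preparation, Preprocess, Transmissions, Postprocess), each the delay in milliseconds since the previously recorded time point, starting from kvCacheTransferStart; an unset time point is written as 0.0. These are followed by one Delay,Duration,Bandwidth(Gbps) triple per transmission, with Delay measured from the end of preprocessing. The standalone sketch below mirrors that arithmetic for sanity-checking; every name in it is local to the example and not part of the patch.

    // Standalone sketch of the exportMeasure arithmetic above (illustrative only).
    #include <array>
    #include <chrono>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;
    using Milliseconds = std::chrono::duration<double, std::milli>;

    struct Measure
    {
        TimePoint start;
        TimePoint end;
        std::size_t size; // bytes
    };

    int main()
    {
        // Stand-ins for kvCacheTransferStart and the five recorded phase points.
        TimePoint t0 = std::chrono::steady_clock::now();
        std::array<TimePoint, 5> phases{};
        for (std::size_t i = 0; i < phases.size(); ++i)
        {
            phases[i] = t0 + std::chrono::milliseconds(2 * (i + 1));
        }

        // One simulated transmission: 1 MiB sent in 8 ms, starting at preprocess end.
        std::vector<Measure> measures{{phases[2], phases[2] + std::chrono::milliseconds(8), 1u << 20}};

        // Phase columns: delay from the previously recorded time point, in ms.
        TimePoint previous = t0;
        for (TimePoint tp : phases)
        {
            std::cout << ',' << Milliseconds(tp - previous).count();
            previous = tp;
        }

        // Transmission columns: delay from preprocess end, duration, and bandwidth.
        for (Measure const& m : measures)
        {
            double duration = Milliseconds(m.end - m.start).count();
            double bandwidth = static_cast<double>(m.size) * 8.0 / duration / 1e6; // bytes, ms => Gbps
            std::cout << ',' << Milliseconds(m.start - phases[2]).count() << ',' << duration << ',' << bandwidth;
        }
        std::cout << '\n';
        return 0;
    }

With these numbers the transmission triple comes out to an 8 ms duration at roughly 1.05 Gbps for the 1 MiB payload, which is exactly the bytes * 8 / ms / 1e6 conversion exportMeasure performs.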
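Why getSteadyClockNow() and sGlobalSteadyClockOffset became static: kvCacheTransferStart may be stamped on one rank and compared against time points taken on another, while each rank's steady clock has an arbitrary local epoch. The minimal sketch below isolates that normalization idea. It assumes the std::optional<std::chrono::steady_clock::duration> offset type reconstructed above, and the offset-measurement handshake (done elsewhere, e.g. over MPI at startup) is only hinted at; treat both as assumptions rather than a description of the original header.

    // Minimal sketch of maybeToGlobalSteadyClock-style normalization (illustrative only).
    #include <chrono>
    #include <iostream>
    #include <optional>

    using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;

    // Offset of this rank's steady clock relative to rank 0, measured once at startup;
    // nullopt means "leave timestamps in the local clock domain".
    std::optional<std::chrono::steady_clock::duration> gGlobalSteadyClockOffset;

    TimePoint maybeToGlobal(TimePoint local)
    {
        return gGlobalSteadyClockOffset ? local + *gGlobalSteadyClockOffset : local;
    }

    int main()
    {
        TimePoint local = std::chrono::steady_clock::now();
        gGlobalSteadyClockOffset = std::chrono::milliseconds(250); // hypothetical measured skew
        TimePoint global = maybeToGlobal(local);
        std::cout << std::chrono::duration<double, std::milli>(global - local).count() << " ms shift\n";
        return 0;
    }

Making the helper static (and mPerfMetrics mutable) lets timing be recorded through the const LlmRequest references the formatters hold, without threading a non-const request through the transfer path.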