diff --git a/.github/workflows/nexus.yml b/.github/workflows/nexus.yml index 6c803da1c..4aea9db3f 100644 --- a/.github/workflows/nexus.yml +++ b/.github/workflows/nexus.yml @@ -106,3 +106,30 @@ jobs: - name: Run TREL Tests run: | cd build/nexus && ctest -L trel --output-on-failure + + nexus-grpc-tests: + name: nexus-grpc-tests + runs-on: ubuntu-24.04 + steps: + - name: Harden Runner + uses: step-security/harden-runner@5ef0c079ce82195b2a36a210272d6b661572d83e # v2.14.2 + with: + egress-policy: audit + + - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 + with: + submodules: recursive + + - name: Bootstrap + run: | + sudo apt-get update + sudo apt-get --no-install-recommends install -y ninja-build libgrpc++-dev libprotobuf-dev protobuf-compiler-grpc + + - name: Build Nexus + run: | + mkdir -p build/nexus + OT_NEXUS_GRPC=ON top_builddir=build/nexus ./tests/nexus/build.sh + + - name: Run GRPC Tests + run: | + cd build/nexus && ./tests/nexus/nexus_grpc diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 485d7fb44..367975648 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -28,11 +28,13 @@ option(OT_BUILD_GTEST "enable gtest") -if(OT_FTD AND BUILD_TESTING AND (NOT OT_PLATFORM STREQUAL "nexus")) +if(OT_FTD AND BUILD_TESTING) target_compile_definitions(ot-config INTERFACE "OPENTHREAD_CONFIG_USE_STD_NEW=1") - add_subdirectory(unit) - if(OT_BUILD_GTEST) - add_subdirectory(gtest) + if(NOT OT_PLATFORM STREQUAL "nexus") + add_subdirectory(unit) + if(OT_BUILD_GTEST) + add_subdirectory(gtest) + endif() endif() endif() diff --git a/tests/fuzz/CMakeLists.txt b/tests/fuzz/CMakeLists.txt index 645f6715f..ba2e72657 100644 --- a/tests/fuzz/CMakeLists.txt +++ b/tests/fuzz/CMakeLists.txt @@ -39,6 +39,7 @@ set(COMMON_COMPILE_OPTIONS -DOPENTHREAD_FTD=1 -DOPENTHREAD_MTD=0 -DOPENTHREAD_RADIO=0 + -DOPENTHREAD_CONFIG_USE_STD_NEW=1 ) set(COMMON_LIBS diff --git a/tests/nexus/CMakeLists.txt b/tests/nexus/CMakeLists.txt index 75ca19eca..1e421c198 100644 --- a/tests/nexus/CMakeLists.txt +++ b/tests/nexus/CMakeLists.txt @@ -26,6 +26,10 @@ # POSSIBILITY OF SUCH DAMAGE. # +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) + set(COMMON_INCLUDES ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/src @@ -33,15 +37,57 @@ set(COMMON_INCLUDES ${PROJECT_SOURCE_DIR}/tests/nexus ${PROJECT_SOURCE_DIR}/tests/nexus/platform ${PROJECT_SOURCE_DIR}/examples/platforms/utils + ${CMAKE_CURRENT_BINARY_DIR} ) set(COMMON_COMPILE_OPTIONS -DOPENTHREAD_FTD=1 -DOPENTHREAD_MTD=0 -DOPENTHREAD_RADIO=0 + -DOPENTHREAD_CONFIG_USE_STD_NEW=1 ) -add_library(ot-nexus-platform +option(OT_NEXUS_GRPC "Enable Nexus gRPC" OFF) + +if(OT_NEXUS_GRPC) + find_package(gRPC CONFIG REQUIRED) + find_package(Protobuf REQUIRED) + + # Find gRPC C++ plugin + find_program(GRPC_CPP_PLUGIN grpc_cpp_plugin) + if(NOT GRPC_CPP_PLUGIN) + message(FATAL_ERROR "grpc_cpp_plugin not found! Please install protobuf-compiler-grpc") + endif() + + # Generate simulation.pb.cc/h and simulation.grpc.pb.cc/h + set(PROTO_FILE platform/simulation.proto) + get_filename_component(PROTO_ABS_FILE ${PROTO_FILE} ABSOLUTE) + get_filename_component(PROTO_PATH ${PROTO_ABS_FILE} PATH) + + set(SIMULATION_PROTO_SRCS + ${CMAKE_CURRENT_BINARY_DIR}/simulation.pb.cc + ${CMAKE_CURRENT_BINARY_DIR}/simulation.grpc.pb.cc + ) + set(SIMULATION_PROTO_HDRS + ${CMAKE_CURRENT_BINARY_DIR}/simulation.pb.h + ${CMAKE_CURRENT_BINARY_DIR}/simulation.grpc.pb.h + ) + + add_custom_command( + OUTPUT ${SIMULATION_PROTO_SRCS} ${SIMULATION_PROTO_HDRS} + COMMAND ${Protobuf_PROTOC_EXECUTABLE} + ARGS --cpp_out ${CMAKE_CURRENT_BINARY_DIR} + --grpc_out ${CMAKE_CURRENT_BINARY_DIR} + --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} + -I ${PROTO_PATH} + ${PROTO_ABS_FILE} + DEPENDS ${PROTO_ABS_FILE} + COMMENT "Generating simulation.pb.cc/h and simulation.grpc.pb.cc/h from ${PROTO_FILE}" + VERBATIM + ) +endif() + +set(NEXUS_PLATFORM_SOURCES platform/nexus_alarm.cpp platform/nexus_core.cpp platform/nexus_dns.cpp @@ -54,11 +100,26 @@ add_library(ot-nexus-platform platform/nexus_radio.cpp platform/nexus_radio_model.cpp platform/nexus_settings.cpp + platform/nexus_sim.cpp platform/nexus_trel.cpp platform/nexus_udp.cpp ../../examples/platforms/utils/mac_frame.cpp ) +if(OT_NEXUS_GRPC) + list(APPEND NEXUS_PLATFORM_SOURCES ${SIMULATION_PROTO_SRCS} platform/nexus_grpc.cpp) +endif() + +add_library(ot-nexus-platform + ${NEXUS_PLATFORM_SOURCES} +) + +if(OT_NEXUS_GRPC) + set_source_files_properties(${SIMULATION_PROTO_SRCS} PROPERTIES + COMPILE_FLAGS "-Wno-pedantic -Wno-error" + ) +endif() + target_include_directories(ot-nexus-platform PRIVATE ${COMMON_INCLUDES} @@ -70,12 +131,27 @@ target_compile_options(ot-nexus-platform ${OT_CFLAGS} ) +if(OT_NEXUS_GRPC) + target_compile_definitions(ot-nexus-platform + PRIVATE + OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE=1 + ) +endif() + target_link_libraries(ot-nexus-platform PRIVATE ot-config ${OT_MBEDTLS} ) +if(OT_NEXUS_GRPC) + target_link_libraries(ot-nexus-platform + PUBLIC + protobuf::libprotobuf + gRPC::grpc++ + ) +endif() + set(COMMON_LIBS ot-nexus-platform openthread-ftd @@ -316,6 +392,25 @@ ot_nexus_test(srp_lease "core;nexus") # Trel ot_nexus_test(trel "trel;nexus") +# Grpc +if(OT_NEXUS_GRPC) + ot_nexus_test(grpc "core;nexus") + target_compile_definitions(nexus_grpc PRIVATE OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE=1) +endif() + # Large network ot_nexus_test(full_network_reset "core;large_network;nexus") ot_nexus_test(large_network "core;large_network;nexus") + +# Live Demo Persistent Server +if(OT_NEXUS_GRPC) + add_executable(nexus_live_demo + platform/nexus_native.cpp + ) + + target_include_directories(nexus_live_demo PRIVATE ${COMMON_INCLUDES}) + target_link_libraries(nexus_live_demo PRIVATE ${COMMON_LIBS}) + + target_compile_options(nexus_live_demo PRIVATE ${COMMON_COMPILE_OPTIONS} ${OT_CFLAGS}) + target_compile_definitions(nexus_live_demo PRIVATE OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE=1) +endif() diff --git a/tests/nexus/build.sh b/tests/nexus/build.sh index c9eacde63..cd9df45fa 100755 --- a/tests/nexus/build.sh +++ b/tests/nexus/build.sh @@ -60,6 +60,7 @@ cd "${top_builddir}" || die "cd failed" cmake -GNinja -DOT_PLATFORM=nexus -DOT_COMPILE_WARNING_AS_ERROR=ON \ -DOT_THREAD_VERSION=1.4 -DOT_APP_CLI=OFF -DOT_APP_NCP=OFF -DOT_APP_RCP=OFF \ -DOT_15_4=${fifteenfour} \ + -DOT_NEXUS_GRPC="${OT_NEXUS_GRPC:-OFF}" \ -DOT_PROJECT_CONFIG="${top_srcdir}/tests/nexus/openthread-core-nexus-config.h" \ "${top_srcdir}" || die ninja || die diff --git a/tests/nexus/openthread-core-nexus-config.h b/tests/nexus/openthread-core-nexus-config.h index 9231c26b6..e5975ab2b 100644 --- a/tests/nexus/openthread-core-nexus-config.h +++ b/tests/nexus/openthread-core-nexus-config.h @@ -156,8 +156,19 @@ #define OPENTHREAD_CONFIG_TREL_MANAGE_DNSSD_ENABLE 1 #define OPENTHREAD_CONFIG_TREL_USE_HEAP_ENABLE 1 #define OPENTHREAD_CONFIG_UPTIME_ENABLE 1 +#define OPENTHREAD_CONFIG_USE_STD_NEW 1 #define OPENTHREAD_CONFIG_VERHOEFF_CHECKSUM_ENABLE 1 +/** + * @def OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE + * + * Define to 1 to enable gRPC support in Nexus simulation. + * + */ +#ifndef OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE +#define OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE 0 +#endif + // CLI configs #define OPENTHREAD_CONFIG_CLI_MAX_LINE_LENGTH 800 #define OPENTHREAD_CONFIG_CLI_LOG_INPUT_OUTPUT_ENABLE 1 diff --git a/tests/nexus/platform/nexus_core.cpp b/tests/nexus/platform/nexus_core.cpp index 0023bd61d..0ecaa7b89 100644 --- a/tests/nexus/platform/nexus_core.cpp +++ b/tests/nexus/platform/nexus_core.cpp @@ -27,6 +27,7 @@ */ #include "nexus_core.hpp" +#include "nexus_sim.hpp" #include #include @@ -49,8 +50,6 @@ Core::Core(void) , mPendingAction(false) , mSaveNodeLogs(false) , mNow(0) - , mObserver(nullptr) - { const char *pcapFile; const char *saveLogs; @@ -337,6 +336,7 @@ void Core::AddOmrPrefixTestVar(const char *aName, Node &aNode) OT_UNUSED_VARIABLE(aNode); #endif } + Core::~Core(void) { while (!mNodes.IsEmpty()) @@ -351,9 +351,25 @@ Core::~Core(void) sInUse = false; } +void Core::NotifyHeartbeat(void) +{ + for (Observer &observer : mObservers) + { + observer.OnHeartbeat(mNow); + } +} + +void Core::NotifyDumpState(void) +{ + for (Observer &observer : mObservers) + { + observer.DumpState(); + } +} + void Core::SetNodeEnabled(uint32_t aNodeId, bool aEnabled) { - Node *node = GetNodes().FindMatching(aNodeId); + Node *node = FindNodeById(aNodeId); if (node != nullptr) { @@ -372,6 +388,8 @@ void Core::SetNodeEnabled(uint32_t aNodeId, bool aEnabled) } } +Node *Core::FindNodeById(uint32_t aNodeId) { return mNodes.FindMatching(aNodeId); } + Node &Core::CreateNode(void) { Node *node; @@ -399,11 +417,12 @@ Node &Core::CreateNode(void) node->Get().SetReceiveCallback(Node::HandleIp6Receive, node); - if (mObserver) + node->Get().RegisterCallback(&Core::HandleNeighborTableChanged); + SuccessOrQuit(node->Get().RegisterCallback(&Core::HandleStateChanged, node)); + + for (Observer &observer : mObservers) { - node->Get().RegisterCallback(&Core::HandleNeighborTableChanged); - SuccessOrQuit(node->Get().RegisterCallback(&Core::HandleStateChanged, node)); - mObserver->OnNodeStateChanged(node); + observer.OnNodeStateChanged(node); } return *node; @@ -422,7 +441,7 @@ void Core::HandleNeighborTableChanged(otNeighborTableEvent aEvent, const otNeigh const Mac::ExtAddress *extAddr = nullptr; - VerifyOrExit(core.mObserver); + VerifyOrExit(!core.mObservers.IsEmpty()); Log("HandleNeighborTableChanged: Event %d", event); @@ -476,7 +495,10 @@ void Core::HandleNeighborTableChanged(otNeighborTableEvent aEvent, const otNeigh if (foundSrc && foundDst) { - core.mObserver->OnLinkUpdate(srcId, dstId, isActive); + for (Observer &observer : core.mObservers) + { + observer.OnLinkUpdate(srcId, dstId, isActive); + } } else { @@ -492,12 +514,15 @@ void Core::HandleStateChanged(otChangedFlags aFlags, void *aContext) { OT_UNUSED_VARIABLE(aFlags); - Observer *observer = Core::Get().GetObserver(); - Node *node = static_cast(aContext); + Core &core = Core::Get(); + Node *node = static_cast(aContext); - VerifyOrExit(observer != nullptr && node != nullptr); + VerifyOrExit(!core.mObservers.IsEmpty() && node != nullptr); - observer->OnNodeStateChanged(node); + for (Observer &observer : core.mObservers) + { + observer.OnNodeStateChanged(node); + } // Decoupled from flags to capture SED parent changes switch (node->Get().GetRole()) @@ -508,10 +533,9 @@ void Core::HandleStateChanged(otChangedFlags aFlags, void *aContext) if (node->Get().GetParentInfo(parentInfo) == kErrorNone) { - uint32_t srcId = node->GetInstance().GetId(); - uint32_t dstId = 0xffff; - Node *rxNode = - Core::Get().mNodes.FindMatching(static_cast(parentInfo.mExtAddress)); + uint32_t srcId = node->GetInstance().GetId(); + uint32_t dstId = 0xffff; + Node *rxNode = core.mNodes.FindMatching(AsCoreType(&parentInfo.mExtAddress)); if (rxNode != nullptr) { @@ -522,10 +546,16 @@ void Core::HandleStateChanged(otChangedFlags aFlags, void *aContext) { if (node->GetLastParentId() != 0xffff) { - observer->OnLinkUpdate(srcId, node->GetLastParentId(), false); + for (Observer &observer : core.mObservers) + { + observer.OnLinkUpdate(srcId, node->GetLastParentId(), false); + } } node->SetLastParentId(dstId); - observer->OnLinkUpdate(srcId, dstId, true); + for (Observer &observer : core.mObservers) + { + observer.OnLinkUpdate(srcId, dstId, true); + } } } break; @@ -534,7 +564,7 @@ void Core::HandleStateChanged(otChangedFlags aFlags, void *aContext) { uint32_t srcId = node->GetInstance().GetId(); - for (Node &rxNode : Core::Get().mNodes) + for (Node &rxNode : core.mNodes) { if (&rxNode == node) { @@ -546,14 +576,20 @@ void Core::HandleStateChanged(otChangedFlags aFlags, void *aContext) { uint32_t dstId = rxNode.GetInstance().GetId(); - observer->OnLinkUpdate(srcId, dstId, false); - observer->OnLinkUpdate(dstId, srcId, false); + for (Observer &observer : core.mObservers) + { + observer.OnLinkUpdate(srcId, dstId, false); + observer.OnLinkUpdate(dstId, srcId, false); + } } } if (node->GetLastParentId() != 0xffff) { - observer->OnLinkUpdate(srcId, node->GetLastParentId(), false); + for (Observer &observer : core.mObservers) + { + observer.OnLinkUpdate(srcId, node->GetLastParentId(), false); + } node->SetLastParentId(0xffff); } break; @@ -604,7 +640,21 @@ void Core::UpdateNextAlarmMicro(const Alarm &aAlarm) } } -bool Core::IsUiConnected(void) const { return mObserver && mObserver->IsConnected(); } +bool Core::IsUiConnected(void) const +{ + bool connected = false; + + for (const Observer &observer : mObservers) + { + if (observer.IsConnected()) + { + connected = true; + break; + } + } + + return connected; +} void Core::Reset(void) { @@ -612,9 +662,10 @@ void Core::Reset(void) mCurNodeId = 0; mNow = 0; mNextAlarmTime = NumericLimits::kMax; - if (mObserver) + + for (Observer &observer : mObservers) { - mObserver->OnClearEvents(); + observer.OnClearEvents(); } } @@ -691,7 +742,7 @@ void Core::ProcessRadio(Node &aNode) mPcap.WriteFrame(aNode.mRadio.mTxFrame, mNow); - if (mObserver) + if (!mObservers.IsEmpty()) { uint32_t dstNodeId = 0xffff; // Default to broadcast / unknown @@ -718,8 +769,11 @@ void Core::ProcessRadio(Node &aNode) dstAddr.ToString().AsCString()); } - mObserver->OnPacketEvent(aNode.GetInstance().GetId(), dstNodeId, aNode.mRadio.mTxFrame.GetPsdu(), - aNode.mRadio.mTxFrame.GetLength()); + for (Observer &observer : mObservers) + { + observer.OnPacketEvent(aNode.GetInstance().GetId(), dstNodeId, aNode.mRadio.mTxFrame.GetPsdu(), + aNode.mRadio.mTxFrame.GetLength()); + } } otPlatRadioTxStarted(&aNode.GetInstance(), &aNode.mRadio.mTxFrame); diff --git a/tests/nexus/platform/nexus_core.hpp b/tests/nexus/platform/nexus_core.hpp index 6106380ee..6b1ac6b64 100644 --- a/tests/nexus/platform/nexus_core.hpp +++ b/tests/nexus/platform/nexus_core.hpp @@ -29,6 +29,8 @@ #ifndef OT_NEXUS_PLATFORM_NEXUS_CORE_HPP_ #define OT_NEXUS_PLATFORM_NEXUS_CORE_HPP_ +#include "openthread-core-config.h" + #include #include "nexus_alarm.hpp" @@ -54,19 +56,25 @@ public: static Core &Get(void) { return *sCore; } - void SetObserver(Observer *aObserver) { mObserver = aObserver; } - Observer *GetObserver(void) { return mObserver; } + void AddObserver(Observer &aObserver) { mObservers.Push(aObserver); } + void RemoveObserver(Observer &aObserver) { IgnoreError(mObservers.Remove(aObserver)); } + void NotifyHeartbeat(void); + void NotifyDumpState(void); + + Node &CreateNode(void); + Node *FindNodeById(uint32_t aNodeId); - Node &CreateNode(void); LinkedList &GetNodes(void) { return mNodes; } TimeMilli GetNow(void) { return TimeMilli(static_cast(mNow / 1000u)); } TimeMicro GetNowMicro(void) { return TimeMicro(static_cast(mNow)); } uint64_t GetNowMicro64(void) const { return mNow; } void AdvanceTime(uint32_t aDuration); - bool IsUiConnected(void) const; - void Reset(void); - void SetNodeEnabled(uint32_t aNodeId, bool aEnabled); + + bool IsUiConnected(void) const; + + void Reset(void); + void SetNodeEnabled(uint32_t aNodeId, bool aEnabled); //- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // Test specific helper methods @@ -147,7 +155,7 @@ private: uint64_t mNow; uint64_t mNextAlarmTime; - Observer *mObserver; + LinkedList mObservers; }; } // namespace Nexus diff --git a/tests/nexus/platform/nexus_grpc.cpp b/tests/nexus/platform/nexus_grpc.cpp new file mode 100644 index 000000000..f3ed98481 --- /dev/null +++ b/tests/nexus/platform/nexus_grpc.cpp @@ -0,0 +1,582 @@ +/* + * Copyright (c) 2026, The OpenThread Authors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "nexus_grpc.hpp" +#include "nexus_sim.hpp" + +#if OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "nexus_logging.hpp" +#include "nexus_node.hpp" +#include "simulation.grpc.pb.h" +#include "common/clearable.hpp" +#include "common/code_utils.hpp" +#include "mac/mac_frame.hpp" +#include "thread/neighbor_table.hpp" + +using namespace std::chrono_literals; + +namespace ot { +namespace Nexus { + +void GrpcServer::InitEvent(nexus::SimulationEvent &aEvent) { aEvent.set_timestamp_us(Core::Get().GetNowMicro64()); } + +nexus::NodeRole GrpcServer::GetEnhancedRole(Node *aNode) +{ + const Mle::Mle &mle = aNode->Get(); + Mle::DeviceRole role = mle.GetRole(); + + switch (role) + { + case Mle::kRoleDisabled: + return nexus::ROLE_DISABLED; + case Mle::kRoleDetached: + return nexus::ROLE_DETACHED; + case Mle::kRoleLeader: + return nexus::ROLE_LEADER; + case Mle::kRoleRouter: + return nexus::ROLE_ROUTER; + case Mle::kRoleChild: + if (mle.IsFullThreadDevice()) + { + return mle.IsRouterRoleAllowed() ? nexus::ROLE_REED : nexus::ROLE_FED; + } + + return mle.IsRxOnWhenIdle() ? nexus::ROLE_MED : nexus::ROLE_SED; + + default: + break; + } + + return nexus::ROLE_UNKNOWN; +} + +class GrpcServer::ServiceImpl final : public nexus::NexusService::Service +{ +public: + ServiceImpl(GrpcServer *aServer) + : mServerPtr(aServer) + { + } + + grpc::Status StreamEvents(grpc::ServerContext *aContext, + const nexus::StreamRequest * /* aRequest */, + grpc::ServerWriter *aWriter) override + { + static constexpr uint32_t kPopEventTimeoutMs = 1000; + grpc::Status status = grpc::Status::OK; + auto *queue = mServerPtr->AddEventQueue(); + + mServerPtr->ClientConnected(); + aWriter->SendInitialMetadata(); + mServerPtr->DumpState(queue); + + while (!aContext->IsCancelled()) + { + nexus::SimulationEvent event; + + if (mServerPtr->PopEvent(queue, event, kPopEventTimeoutMs)) + { + if (!aWriter->Write(event)) + { + break; + } + } + else + { + if (!mServerPtr->IsRunning()) + { + break; + } + + { + std::lock_guard lock(Simulation::Get()); + mServerPtr->InitEvent(event); + } + event.mutable_heartbeat(); + + if (!aWriter->Write(event)) + { + break; + } + } + } + + mServerPtr->RemoveEventQueue(queue); + mServerPtr->ClientDisconnected(); + return status; + } + + grpc::Status Reset(grpc::ServerContext * /* aContext */, + const nexus::ResetRequest * /* aRequest */, + nexus::ResetResponse * /* aResponse */) override + { + Log("Reset requested. Signaling simulation loop to restart..."); + + Simulation::Get().RequestRestart(); + + return grpc::Status::OK; + } + + grpc::Status SetSpeed(grpc::ServerContext * /* aContext */, + const nexus::SetSpeedRequest *aRequest, + nexus::SetSpeedResponse * /* aResponse */) override + { + float speedFactor = aRequest->speed_factor(); + + if (speedFactor < 0.0f) + { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Speed factor cannot be negative"); + } + + { + std::lock_guard lock(Simulation::Get()); + Simulation::Get().SetSpeedFactor(speedFactor); + } + + return grpc::Status::OK; + } + + grpc::Status SetNodeState(grpc::ServerContext * /* aContext */, + const nexus::SetNodeStateRequest *aRequest, + nexus::SetNodeStateResponse * /* aResponse */) override + { + std::lock_guard lock(Simulation::Get()); + + Log("SetNodeState: Node %d -> %s", aRequest->node_id(), aRequest->enabled() ? "ENABLE" : "DISABLE"); + Core::Get().SetNodeEnabled(aRequest->node_id(), aRequest->enabled()); + + return grpc::Status::OK; + } + + grpc::Status SetNodePosition(grpc::ServerContext * /* aContext */, + const nexus::SetNodePositionRequest *aRequest, + nexus::SetNodePositionResponse * /* aResponse */) override + { + grpc::Status status = grpc::Status::OK; + std::lock_guard lock(Simulation::Get()); + + Node *node = Core::Get().FindNodeById(aRequest->node_id()); + + VerifyOrExit(node != nullptr, status = grpc::Status(grpc::StatusCode::NOT_FOUND, "Node not found")); + + node->SetPosition(aRequest->x(), aRequest->y()); + mServerPtr->OnNodeStateChanged(node); + + exit: + return status; + } + + grpc::Status CreateNode(grpc::ServerContext * /* aContext */, + const nexus::CreateNodeRequest *aRequest, + nexus::CreateNodeResponse *aResponse) override + { + std::lock_guard lock(Simulation::Get()); + + Log("CreateNode: x=%f, y=%f", aRequest->x(), aRequest->y()); + Node &node = Core::Get().CreateNode(); + node.SetPosition(aRequest->x(), aRequest->y()); + mServerPtr->OnNodeStateChanged(&node); + aResponse->set_node_id(node.GetId()); + Log("CreateNode: done, id=%u", node.GetId()); + + return grpc::Status::OK; + } + + grpc::Status FormNetwork(grpc::ServerContext * /* aContext */, + const nexus::FormNetworkRequest *aRequest, + nexus::FormNetworkResponse * /* aResponse */) override + { + grpc::Status status = grpc::Status::OK; + std::lock_guard lock(Simulation::Get()); + + Node *node = Core::Get().FindNodeById(aRequest->node_id()); + + VerifyOrExit(node != nullptr, status = grpc::Status(grpc::StatusCode::NOT_FOUND, "Node not found")); + + node->Form(); + + exit: + return status; + } + + grpc::Status JoinNetwork(grpc::ServerContext * /* aContext */, + const nexus::JoinNetworkRequest *aRequest, + nexus::JoinNetworkResponse * /* aResponse */) override + { + grpc::Status status = grpc::Status::OK; + std::lock_guard lock(Simulation::Get()); + + Node *joiner = Core::Get().FindNodeById(aRequest->node_id()); + Node *target = Core::Get().FindNodeById(aRequest->target_node_id()); + + VerifyOrExit(joiner != nullptr && target != nullptr, + status = grpc::Status(grpc::StatusCode::NOT_FOUND, "Node not found")); + + { + static constexpr Node::JoinMode kJoinModes[] = { + Node::kAsFtd, // JOIN_MODE_FTD + Node::kAsMed, // JOIN_MODE_MED + Node::kAsSed, // JOIN_MODE_SED + Node::kAsFed, // JOIN_MODE_FED + }; + + VerifyOrExit(static_cast(aRequest->mode()) < GetArrayLength(kJoinModes), + status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid join mode")); + + Node::JoinMode mode = kJoinModes[aRequest->mode()]; + + joiner->Join(*target, mode); + } + + exit: + return status; + } + +private: + GrpcServer *mServerPtr; +}; + +GrpcServer::GrpcServer(const char *aExecutablePath) + : mRunning(true) + , mClientCount(0) + , mExecutablePath(aExecutablePath != nullptr ? aExecutablePath : "") +{ + mService = std::make_unique(this); + + grpc::ServerBuilder builder; + builder.AddListeningPort(kDefaultGrpcServerAddress, grpc::InsecureServerCredentials()); + builder.RegisterService(mService.get()); + + mServer = builder.BuildAndStart(); + + if (mServer) + { + Log("gRPC Server started at %s", kDefaultGrpcServerAddress); + mThread = std::thread([this]() { mServer->Wait(); }); + { + std::lock_guard lock(Simulation::Get()); + Core::Get().AddObserver(*this); + } + } + else + { + Log("Failed to start gRPC server!"); + } +} + +GrpcServer::~GrpcServer() +{ + { + std::lock_guard lock(Simulation::Get()); + Core::Get().RemoveObserver(*this); + } + mRunning = false; + mCv.notify_all(); + if (mServer) + { + mServer->Shutdown(); + } + if (mThread.joinable()) + { + mThread.join(); + } +} + +void GrpcServer::ClientConnected(void) +{ + mClientCount++; + Log("UI connected!"); +} + +void GrpcServer::ClientDisconnected(void) +{ + if (mClientCount.fetch_sub(1) == 1) + { + Log("All UI clients disconnected."); + } +} + +void GrpcServer::PushEvent(const nexus::SimulationEvent &aEvent, std::queue *aTargetQueue) +{ + std::lock_guard lock(mMutex); + + if (aTargetQueue != nullptr) + { + if (aTargetQueue->size() >= kMaxQueueSize) + { + aTargetQueue->pop(); + } + aTargetQueue->push(aEvent); + } + else + { + for (auto &queue : mClientQueues) + { + if (queue.size() >= kMaxQueueSize) + { + queue.pop(); + } + queue.push(aEvent); + } + } + mCv.notify_all(); +} + +bool GrpcServer::PopEvent(std::queue *aQueue, + nexus::SimulationEvent &aEvent, + uint32_t aTimeoutMs) +{ + bool found = false; + std::unique_lock lock(mMutex); + + VerifyOrExit(mCv.wait_for(lock, std::chrono::milliseconds(aTimeoutMs), + [this, aQueue] { return !aQueue->empty() || !mRunning; })); + + VerifyOrExit(mRunning || !aQueue->empty()); + + aEvent = aQueue->front(); + aQueue->pop(); + found = true; + +exit: + return found; +} + +std::queue *GrpcServer::AddEventQueue(void) +{ + std::lock_guard lock(mMutex); + mClientQueues.emplace_back(); + return &mClientQueues.back(); +} + +void GrpcServer::RemoveEventQueue(std::queue *aQueue) +{ + std::lock_guard lock(mMutex); + + for (auto it = mClientQueues.begin(); it != mClientQueues.end(); ++it) + { + if (&(*it) == aQueue) + { + mClientQueues.erase(it); + break; + } + } +} + +void GrpcServer::ClearQueue(void) +{ + std::lock_guard lock(mMutex); + + for (auto &queue : mClientQueues) + { + std::queue empty; + std::swap(queue, empty); + } +} + +GrpcServer::PacketInfo GrpcServer::GetPacketInfo(const Mac::Frame &aFrame) +{ + PacketInfo info; + + if (aFrame.IsAck()) + { + info.mProtocol = "IEEE 802.15.4 ACK"; + info.mSummary = "ACK"; + } + else if (aFrame.GetType() == Mac::Frame::kTypeData) + { + info.mProtocol = "IEEE 802.15.4 Data"; + info.mSummary = "Data"; + } + else if (aFrame.IsMacCommand()) + { + info.mProtocol = "IEEE 802.15.4 Command"; + info.mSummary = "Command"; + } + else + { + info.mProtocol = "IEEE 802.15.4"; + info.mSummary = "Other"; + } + + if (aFrame.IsSequencePresent()) + { + char buf[32]; + + snprintf(buf, sizeof(buf), " (seq=%u)", aFrame.GetSequence()); + info.mSummary += buf; + } + + return info; +} + +void GrpcServer::PushPacketEvent(uint32_t aSenderId, + uint32_t aDestinationId, + const uint8_t *aData, + uint16_t aLen, + std::queue *aTargetQueue) +{ + nexus::SimulationEvent event; + otRadioFrame radioFrame; + + InitEvent(event); + + auto *packetEvent = event.mutable_packet_captured(); + packetEvent->set_source_node_id(aSenderId); + packetEvent->set_destination_node_id(aDestinationId); + packetEvent->set_raw_payload(aData, aLen); + + ClearAllBytes(radioFrame); + radioFrame.mPsdu = const_cast(aData); + radioFrame.mLength = aLen; + + { + const Mac::Frame &frame = static_cast(radioFrame); + PacketInfo info = GetPacketInfo(frame); + + packetEvent->set_protocol(info.mProtocol); + packetEvent->set_summary(info.mSummary); + } + + PushEvent(event, aTargetQueue); +} + +void GrpcServer::PushNodeEvent(Node *aNode, std::queue *aTargetQueue) +{ + nexus::SimulationEvent event; + InitEvent(event); + + auto *nodeEvent = event.mutable_node_update(); + nodeEvent->set_node_id(aNode->GetId()); + nodeEvent->set_role(GetEnhancedRole(aNode)); + nodeEvent->set_rloc16(aNode->Get().GetRloc16()); + nodeEvent->set_x(aNode->GetPositionX()); + nodeEvent->set_y(aNode->GetPositionY()); + + Log("PushNodeEvent: id=%u, role=%d, x=%f, y=%f", nodeEvent->node_id(), static_cast(nodeEvent->role()), + nodeEvent->x(), nodeEvent->y()); + + PushEvent(event, aTargetQueue); +} + +void GrpcServer::PushLinkUpdate(uint32_t aSrcId, + uint32_t aDstId, + bool aIsActive, + std::queue *aTargetQueue) +{ + Log("PushLinkUpdate: src=%u, dst=%u, active=%d", aSrcId, aDstId, aIsActive); + + nexus::SimulationEvent event; + InitEvent(event); + + auto *linkEvent = event.mutable_link_update(); + linkEvent->set_source_node_id(aSrcId); + linkEvent->set_destination_node_id(aDstId); + linkEvent->set_is_active(aIsActive); + + PushEvent(event, aTargetQueue); +} + +void GrpcServer::OnHeartbeat(uint64_t aTimestampUs) +{ + nexus::SimulationEvent event; + event.set_timestamp_us(aTimestampUs); + event.mutable_heartbeat(); + PushEvent(event); +} + +void GrpcServer::DumpState(std::queue *aQueue) +{ + std::lock_guard lock(Simulation::Get()); + + { + nexus::SimulationEvent event; + event.set_timestamp_us(Core::Get().GetNowMicro64()); + event.mutable_heartbeat(); + PushEvent(event, aQueue); + } + + Log("DumpState: Starting sync..."); + + // 1. Sync Nodes + for (Node &node : Core::Get().GetNodes()) + { + PushNodeEvent(&node, aQueue); + } + + // 2. Sync Links + for (Node &node : Core::Get().GetNodes()) + { + otNeighborInfoIterator iterator = OT_NEIGHBOR_INFO_ITERATOR_INIT; + Neighbor::Info neighborInfo; + + while (node.Get().GetNextNeighborInfo(iterator, neighborInfo) == kErrorNone) + { + Node *neighborNode = + Core::Get().GetNodes().FindMatching(static_cast(neighborInfo.mExtAddress)); + + if (neighborNode != nullptr) + { + PushLinkUpdate(node.GetId(), neighborNode->GetId(), true, aQueue); + } + } + } +} + +void GrpcServer::OnNodeStateChanged(Node *aNode) { PushNodeEvent(aNode); } + +void GrpcServer::OnLinkUpdate(uint32_t aSrcId, uint32_t aDstId, bool aIsActive) +{ + PushLinkUpdate(aSrcId, aDstId, aIsActive); +} + +void GrpcServer::OnPacketEvent(uint32_t aSenderId, uint32_t aDestinationId, const uint8_t *aData, uint16_t aLen) +{ + PushPacketEvent(aSenderId, aDestinationId, aData, aLen); +} + +void GrpcServer::OnClearEvents(void) { ClearQueue(); } + +bool GrpcServer::IsConnected(void) const { return mClientCount > 0; } + +} // namespace Nexus +} // namespace ot + +#endif // OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE diff --git a/tests/nexus/platform/nexus_grpc.hpp b/tests/nexus/platform/nexus_grpc.hpp new file mode 100644 index 000000000..d466ca0c9 --- /dev/null +++ b/tests/nexus/platform/nexus_grpc.hpp @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2026, The OpenThread Authors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef OT_NEXUS_PLATFORM_NEXUS_GRPC_HPP_ +#define OT_NEXUS_PLATFORM_NEXUS_GRPC_HPP_ + +#include "openthread-core-config.h" + +#if OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nexus_core.hpp" +#include "simulation.pb.h" + +namespace grpc { +class Server; +} // namespace grpc + +namespace ot { +namespace Nexus { + +constexpr char kDefaultGrpcServerAddress[] = "127.0.0.1:50052"; + +class GrpcServer : public Observer +{ +public: + /** + * Initializes the gRPC server. + * + * @param[in] aExecutablePath The path to the executable (used for Reset RPC). + */ + GrpcServer(const char *aExecutablePath = nullptr); + + /** + * Destructor for the gRPC server. + */ + ~GrpcServer() override; + + // Observer implementation + void OnNodeStateChanged(Node *aNode) override; + void OnLinkUpdate(uint32_t aSrcId, uint32_t aDstId, bool aIsActive) override; + void OnPacketEvent(uint32_t aSenderId, uint32_t aDestinationId, const uint8_t *aData, uint16_t aLen) override; + void OnClearEvents(void) override; + void OnHeartbeat(uint64_t aTimestampUs) override; + void DumpState(void) override { DumpState(nullptr); } + void DumpState(std::queue *aQueue); + bool IsConnected(void) const override; + + /** + * Pushes a simulation event to the connected clients. + * + * @param[in] aEvent The event to push. + * @param[in] aTargetQueue An optional pointer to a specific client's event queue. + */ + void PushEvent(const nexus::SimulationEvent &aEvent, std::queue *aTargetQueue = nullptr); + + /** + * Gets the executable path. + * + * @returns The executable path. + */ + const std::string &GetExecutablePath(void) const { return mExecutablePath; } + + /** + * Indicates whether or not the server is running. + * + * @retval TRUE If the server is running. + * @retval FALSE If the server is not running. + */ + bool IsRunning(void) const { return mRunning; } + +private: + static constexpr size_t kMaxQueueSize = 1000; + + class ServiceImpl; + + struct PacketInfo + { + const char *mProtocol; + std::string mSummary; + }; + + static nexus::NodeRole GetEnhancedRole(Node *aNode); + static PacketInfo GetPacketInfo(const Mac::Frame &aFrame); + + /** + * Handles a new client connection. + */ + void ClientConnected(void); + + /** + * Handles a client disconnection. + */ + void ClientDisconnected(void); + + /** + * Pops an event from the event queue for a specific client. + * + * @param[in] aQueue A pointer to the client's event queue. + * @param[out] aEvent A reference to store the popped event. + * @param[in] aTimeoutMs The timeout in milliseconds to wait for an event. + * + * @returns TRUE if an event was popped, FALSE otherwise. + */ + bool PopEvent(std::queue *aQueue, nexus::SimulationEvent &aEvent, uint32_t aTimeoutMs); + + /** + * Adds an event queue for a new client. + * + * @returns A pointer to the newly created event queue. + */ + std::queue *AddEventQueue(void); + + /** + * Removes an event queue for a disconnected client. + * + * @param[in] aQueue A pointer to the event queue to remove. + */ + void RemoveEventQueue(std::queue *aQueue); + + /** + * Initializes a simulation event with common fields (like timestamp). + * + * @param[out] aEvent A reference to the event to initialize. + */ + void InitEvent(nexus::SimulationEvent &aEvent); + + /** + * Pushes a node update event. + * + * @param[in] aNode The node that was updated. + * @param[in] aTargetQueue An optional pointer to a specific client's event queue. + */ + void PushNodeEvent(Node *aNode, std::queue *aTargetQueue = nullptr); + + /** + * Pushes a link update event. + * + * @param[in] aSrcId The source node ID. + * @param[in] aDstId The destination node ID. + * @param[in] aIsActive TRUE if the link is active, FALSE otherwise. + * @param[in] aTargetQueue An optional pointer to a specific client's event queue. + */ + void PushLinkUpdate(uint32_t aSrcId, + uint32_t aDstId, + bool aIsActive, + std::queue *aTargetQueue = nullptr); + + /** + * Pushes a packet capture event. + * + * @param[in] aSenderId The sender node ID. + * @param[in] aDestinationId The destination node ID. + * @param[in] aData A pointer to the packet data. + * @param[in] aLen The length of the packet data. + * @param[in] aTargetQueue An optional pointer to a specific client's event queue. + */ + void PushPacketEvent(uint32_t aSenderId, + uint32_t aDestinationId, + const uint8_t *aData, + uint16_t aLen, + std::queue *aTargetQueue = nullptr); + + /** + * Clears the event queue. + */ + void ClearQueue(void); + + std::unique_ptr mService; + std::unique_ptr mServer; + std::thread mThread; + std::mutex mMutex; + std::condition_variable mCv; + std::list> mClientQueues; + std::atomic mRunning; + std::atomic mClientCount; + std::string mExecutablePath; +}; + +} // namespace Nexus +} // namespace ot + +#endif // OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE + +#endif // OT_NEXUS_PLATFORM_NEXUS_GRPC_HPP_ diff --git a/tests/nexus/platform/nexus_native.cpp b/tests/nexus/platform/nexus_native.cpp new file mode 100644 index 000000000..1164033ff --- /dev/null +++ b/tests/nexus/platform/nexus_native.cpp @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2026, The OpenThread Authors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "openthread-core-config.h" + +#include +#include +#include +#include +#include + +#include + +#include "platform/nexus_core.hpp" +#include "platform/nexus_sim.hpp" +#if OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE +#include "platform/nexus_grpc.hpp" +#endif +#include "platform/nexus_node.hpp" + +namespace ot { +namespace Nexus { + +static std::atomic sRunning(true); + +static void HandleSignal(int aSignal) +{ + OT_UNUSED_VARIABLE(aSignal); + sRunning = false; +} + +static void WaitForPlay(Simulation &aSim) +{ + static constexpr uint32_t kWaitTimeoutMs = 1000; + static constexpr float kPlaySpeedThreshold = 0.1f; + + Log("Waiting for Play command to start simulation..."); + + while (sRunning) + { + float speedFactor; + + { + std::lock_guard lock(aSim); + speedFactor = aSim.GetSpeedFactor(); + } + + if (speedFactor >= kPlaySpeedThreshold) + { + break; + } + + aSim.WaitSpeedChange(kWaitTimeoutMs); + } + + if (sRunning) + { + Log("Play command received! Running simulation loop..."); + } +} + +static void HandleSimulationStep(Core &aNexus, + Simulation &aSim, + uint32_t aAdvanceTimeMs, + uint32_t &aHeartbeatTimerRealTimeUs) +{ + std::lock_guard lock(aSim); + float speedFactor; + + aNexus.AdvanceTime(aAdvanceTimeMs); + speedFactor = aSim.GetSpeedFactor(); + +#if OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE + if (speedFactor > 0.0f) + { + static constexpr uint32_t kHeartbeatSyncIntervalUs = 100000; // 100ms real-time + + uint64_t increment = static_cast((static_cast(aAdvanceTimeMs) * 1000.0) / speedFactor); + uint64_t newTotal = static_cast(aHeartbeatTimerRealTimeUs) + increment; + + if (newTotal >= kHeartbeatSyncIntervalUs) + { + aNexus.NotifyHeartbeat(); + aHeartbeatTimerRealTimeUs = static_cast(newTotal % kHeartbeatSyncIntervalUs); + } + else + { + aHeartbeatTimerRealTimeUs = static_cast(newTotal); + } + } +#else + OT_UNUSED_VARIABLE(aHeartbeatTimerRealTimeUs); + OT_UNUSED_VARIABLE(speedFactor); +#endif +} + +static void SleepUntilIntendedTime(std::chrono::steady_clock::time_point aStartTime, + uint32_t aAdvanceTimeMs, + float aSpeedFactor) +{ + std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now(); + auto elapsedUs = std::chrono::duration_cast(endTime - aStartTime).count(); + uint64_t intendedSleep = static_cast((aAdvanceTimeMs * 1000.0f) / aSpeedFactor); + + if (intendedSleep > static_cast(elapsedUs)) + { + std::this_thread::sleep_for(std::chrono::microseconds(intendedSleep - elapsedUs)); + } +} + +void LiveDemo(int argc, char *argv[]) +{ + static constexpr uint32_t kAdvanceTimeMs = 100; + static constexpr uint32_t kIdleSleepMs = 100; + + Core nexus; + Simulation sim; +#if OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE + auto grpcServer = std::make_unique(argv[0]); +#endif + + signal(SIGINT, HandleSignal); + signal(SIGTERM, HandleSignal); + + Log("================================================================================"); + Log("Starting OpenThread Nexus Simulator - Dynamic Topology"); + Log("================================================================================"); + + WaitForPlay(sim); + + uint32_t heartbeatTimerRealTimeUs = 0; + + while (sRunning) + { + float speedFactor; + bool isUiConnected; + std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now(); + + if (sim.IsRestartRequested()) + { + break; + } + + { + std::lock_guard lock(sim); + isUiConnected = nexus.IsUiConnected(); + speedFactor = sim.GetSpeedFactor(); + } + + if (speedFactor <= 0.0f) + { + sim.WaitSpeedChange(kAdvanceTimeMs); + continue; + } + + if (isUiConnected) + { + HandleSimulationStep(nexus, sim, kAdvanceTimeMs, heartbeatTimerRealTimeUs); + SleepUntilIntendedTime(startTime, kAdvanceTimeMs, speedFactor); + } + else + { + sim.WaitSpeedChange(kIdleSleepMs); + } + } + + if (sim.IsRestartRequested()) + { + Log("Restarting process..."); + +#if OPENTHREAD_NEXUS_CONFIG_GRPC_ENABLE + std::string execPath = grpcServer->GetExecutablePath(); +#else + std::string execPath = argv[0]; +#endif + if (execPath.empty()) + { + execPath = argv[0]; + } + + std::vector args; + args.push_back(const_cast(execPath.data())); + for (int i = 1; i < argc; i++) + { + args.push_back(argv[i]); + } + args.push_back(nullptr); + execvp(args[0], args.data()); + + // If execvp returns, it failed + Log("execvp failed: %s", strerror(errno)); + _exit(1); + } + + Log("Simulation terminated gracefully."); +} + +} // namespace Nexus +} // namespace ot + +int main(int argc, char *argv[]) +{ + ot::Nexus::LiveDemo(argc, argv); + return 0; +} diff --git a/tests/nexus/platform/nexus_observer.hpp b/tests/nexus/platform/nexus_observer.hpp index b8c61dbcd..431433b27 100644 --- a/tests/nexus/platform/nexus_observer.hpp +++ b/tests/nexus/platform/nexus_observer.hpp @@ -36,6 +36,8 @@ #include +#include "common/linked_list.hpp" + namespace ot { namespace Nexus { @@ -46,7 +48,7 @@ class Node; * * This class defines the interface for receiving simulation events from the Nexus core. */ -class Observer +class Observer : public LinkedListEntry { public: virtual ~Observer(void) = default; @@ -82,6 +84,18 @@ public: */ virtual void OnClearEvents(void) = 0; + /** + * This method is called to notify a heartbeat with current simulation time. + * + * @param[in] aTimestampUs The current simulation time in microseconds. + */ + virtual void OnHeartbeat(uint64_t aTimestampUs) = 0; + + /** + * This method is called to dump the current simulation state. + */ + virtual void DumpState(void) = 0; + /** * This method indicates whether or not the observer is connected. * @@ -89,6 +103,10 @@ public: * @retval FALSE If the observer is not connected. */ virtual bool IsConnected(void) const = 0; + +private: + friend class LinkedListEntry; + Observer *mNext; }; } // namespace Nexus diff --git a/tests/nexus/platform/nexus_sim.cpp b/tests/nexus/platform/nexus_sim.cpp new file mode 100644 index 000000000..7424049b0 --- /dev/null +++ b/tests/nexus/platform/nexus_sim.cpp @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2026, The OpenThread Authors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "nexus_sim.hpp" + +namespace ot { +namespace Nexus { + +Simulation *Simulation::sSimulation = nullptr; + +Simulation::Simulation(void) + : mSpeedFactor(0.0f) + , mRestartRequested(false) +{ + sSimulation = this; +} + +void Simulation::SetSpeedFactor(float aFactor) +{ + mSpeedFactor = aFactor; + mCv.notify_all(); +} + +void Simulation::RequestRestart(void) +{ + mRestartRequested = true; + mCv.notify_all(); +} + +void Simulation::WaitSpeedChange(uint32_t aTimeoutMs) +{ + std::unique_lock lock(mMutex); + WaitSpeedChange(lock, aTimeoutMs); +} + +void Simulation::Lock(void) { mMutex.lock(); } + +void Simulation::Unlock(void) { mMutex.unlock(); } + +} // namespace Nexus +} // namespace ot diff --git a/tests/nexus/platform/nexus_sim.hpp b/tests/nexus/platform/nexus_sim.hpp new file mode 100644 index 000000000..1592aa57e --- /dev/null +++ b/tests/nexus/platform/nexus_sim.hpp @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2026, The OpenThread Authors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef OT_NEXUS_PLATFORM_NEXUS_SIM_HPP_ +#define OT_NEXUS_PLATFORM_NEXUS_SIM_HPP_ + +#include +#include +#include + +namespace ot { +namespace Nexus { + +class Simulation +{ +public: + Simulation(void); + + /** + * Sets the simulation speed factor. + * + * The caller must hold the simulation lock. + * + * @param[in] aFactor The speed factor (e.g., 1.0 for real-time, 0.0 for paused). + */ + void SetSpeedFactor(float aFactor); + + /** + * Gets the simulation speed factor. + * + * This method is thread-safe and does not require holding the simulation lock. + * + * @returns The simulation speed factor. + */ + float GetSpeedFactor(void) const { return mSpeedFactor; } + + /** + * Waits for the speed factor to change or a timeout. + * + * The caller must NOT hold the simulation lock. + * + * @param[in] aTimeoutMs The timeout in milliseconds. + */ + void WaitSpeedChange(uint32_t aTimeoutMs); + + /** + * Waits for the speed factor to change or a timeout. + * + * The caller must hold the simulation lock and pass it as @p aLock. + * The @p aLock must be a Lockable type (e.g., std::unique_lock). + * + * @param[in] aLock The simulation lock. + * @param[in] aTimeoutMs The timeout in milliseconds. + */ + template void WaitSpeedChange(LockType &aLock, uint32_t aTimeoutMs) + { + float initialFactor = mSpeedFactor; + mCv.wait_for(aLock, std::chrono::milliseconds(aTimeoutMs), + [this, initialFactor] { return mSpeedFactor != initialFactor || mRestartRequested; }); + } + + /** + * Requests a restart of the simulation. + */ + void RequestRestart(void); + + /** + * Indicates whether or not a restart has been requested. + * + * @retval TRUE If a restart has been requested. + * @retval FALSE If a restart has not been requested. + */ + bool IsRestartRequested(void) const { return mRestartRequested; } + + /** + * Locks the simulation. + */ + void Lock(void); + + /** + * Unlocks the simulation. + */ + void Unlock(void); + + // Methods to support `Lockable` concept and `std::lock_guard` + void lock(void) { Lock(); } + void unlock(void) { Unlock(); } + + static Simulation &Get(void) { return *sSimulation; } + +private: + std::atomic mSpeedFactor; + std::atomic mRestartRequested; + mutable std::mutex mMutex; + std::condition_variable_any mCv; + + static Simulation *sSimulation; +}; + +} // namespace Nexus +} // namespace ot + +#endif // OT_NEXUS_PLATFORM_NEXUS_SIM_HPP_ diff --git a/tests/nexus/platform/simulation.proto b/tests/nexus/platform/simulation.proto new file mode 100644 index 000000000..cd1975d61 --- /dev/null +++ b/tests/nexus/platform/simulation.proto @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2026, The OpenThread Authors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +syntax = "proto3"; + +package nexus; + +/** + * Represents an event occurring within the simulation. + */ +message SimulationEvent { + uint64 timestamp_us = 1; // Simulation time in microseconds + oneof event { + NodeUpdate node_update = 2; // Node state or position update + LinkUpdate link_update = 3; // Link connectivity change + PacketCaptured packet_captured = 4; // Radio packet capture + Heartbeat heartbeat = 5; // Periodic keep-alive and clock sync + } +} + +/** + * Periodic event sent to maintain the gRPC stream and sync simulation time. + */ +message Heartbeat {} + +/** + * Detailed Thread device roles. + */ +enum NodeRole { + ROLE_UNKNOWN = 0; + ROLE_DISABLED = 1; + ROLE_DETACHED = 2; + ROLE_CHILD = 3; + ROLE_ROUTER = 4; + ROLE_LEADER = 5; + ROLE_REED = 6; // Router Eligible End Device + ROLE_FED = 7; // Full End Device + ROLE_MED = 8; // Minimal End Device + ROLE_SED = 9; // Sleepy End Device +} + +/** + * Information about a node's current state and position. + */ +message NodeUpdate { + uint32 node_id = 1; + NodeRole role = 2; + uint32 rloc16 = 3; + float x = 4; + float y = 5; +} + +/** + * Information about a radio link between two nodes. + */ +message LinkUpdate { + uint32 source_node_id = 1; + uint32 destination_node_id = 2; + bool is_active = 3; + float link_quality = 4; // e.g., RSSI or link quality index +} + +/** + * Captured radio packet data. + */ +message PacketCaptured { + uint32 source_node_id = 1; + uint32 destination_node_id = 2; + string protocol = 3; // Human-readable protocol name (e.g., "IEEE 802.15.4 ACK") + string summary = 4; // Short summary of packet content + bytes raw_payload = 5; // Raw PSDU data + string decoded_info = 6; // Reserved for future use +} + +message ResetRequest {} +message ResetResponse {} + +/** + * Request to change simulation speed. + */ +message SetSpeedRequest { + float speed_factor = 1; // 1.0 for real-time, 0.0 for paused +} +message SetSpeedResponse {} + +/** + * Request to enable or disable a node's radio. + */ +message SetNodeStateRequest { + uint32 node_id = 1; + bool enabled = 2; +} +message SetNodeStateResponse {} + +/** + * Request to move a node in the simulation space. + */ +message SetNodePositionRequest { + uint32 node_id = 1; + float x = 2; + float y = 3; +} +message SetNodePositionResponse {} + +/** + * Request to create a new node at a given position. + */ +message CreateNodeRequest { + float x = 1; + float y = 2; +} + +message CreateNodeResponse { + uint32 node_id = 1; +} + +/** + * Request to form a new Thread network on the specified node. + */ +message FormNetworkRequest { + uint32 node_id = 1; +} + +message FormNetworkResponse {} + +/** + * Thread device types for joining. + */ +enum JoinMode { + JOIN_MODE_FTD = 0; + JOIN_MODE_MED = 1; + JOIN_MODE_SED = 2; + JOIN_MODE_FED = 3; +} + +/** + * Request for a node to join an existing network. + */ +message JoinNetworkRequest { + uint32 node_id = 1; + uint32 target_node_id = 2; + JoinMode mode = 3; +} + +message JoinNetworkResponse {} + +/** + * Nexus simulation control and monitoring service. + */ +service NexusService { + /** + * Streams simulation events to the client. + */ + rpc StreamEvents(StreamRequest) returns (stream SimulationEvent); + + /** + * Resets the simulation to the initial state. + */ + rpc Reset(ResetRequest) returns (ResetResponse); + + /** + * Sets the simulation speed factor. + */ + rpc SetSpeed(SetSpeedRequest) returns (SetSpeedResponse); + + /** + * Enables or disables a node. + */ + rpc SetNodeState(SetNodeStateRequest) returns (SetNodeStateResponse); + + /** + * Moves a node to a new position. + */ + rpc SetNodePosition(SetNodePositionRequest) returns (SetNodePositionResponse); + + /** + * Adds a new node to the simulation. + */ + rpc CreateNode(CreateNodeRequest) returns (CreateNodeResponse); + + /** + * Forms a network on a node. + */ + rpc FormNetwork(FormNetworkRequest) returns (FormNetworkResponse); + + /** + * Joins a node to a network. + */ + rpc JoinNetwork(JoinNetworkRequest) returns (JoinNetworkResponse); +} + +message StreamRequest { + bool live = 1; // Reserved for future use +} diff --git a/tests/nexus/test_grpc.cpp b/tests/nexus/test_grpc.cpp new file mode 100644 index 000000000..0c478542a --- /dev/null +++ b/tests/nexus/test_grpc.cpp @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2026, The OpenThread Authors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "openthread-core-config.h" + +#include + +#include "platform/nexus_core.hpp" +#include "platform/nexus_grpc.hpp" +#include "platform/nexus_node.hpp" +#include "platform/nexus_sim.hpp" + +#include "simulation.grpc.pb.h" + +namespace ot { +namespace Nexus { + +void TestGrpc(void) +{ + Core nexus; + Simulation sim; + GrpcServer server; + + auto channel = grpc::CreateChannel(kDefaultGrpcServerAddress, grpc::InsecureChannelCredentials()); + + // Wait for the channel to be ready with a timeout + VerifyOrQuit(channel->WaitForConnected(std::chrono::system_clock::now() + std::chrono::seconds(5)), + "gRPC channel failed to connect within timeout"); + + auto stub = nexus::NexusService::NewStub(channel); + + uint32_t leaderId; + + Log("---------------------------------------------------------------------------------------"); + Log("Test CreateNode RPC"); + { + nexus::CreateNodeRequest request; + request.set_x(10.0); + request.set_y(20.0); + nexus::CreateNodeResponse response; + grpc::ClientContext context; + grpc::Status status = stub->CreateNode(&context, request, &response); + + VerifyOrQuit(status.ok(), "CreateNode RPC failed"); + leaderId = response.node_id(); + VerifyOrQuit(leaderId == 0, "Invalid node ID returned"); + + std::lock_guard lock(sim); + Node *node = nexus.FindNodeById(leaderId); + VerifyOrQuit(node != nullptr, "Node not found in Core"); + VerifyOrQuit(node->GetPositionX() == 10.0, "Wrong X position"); + VerifyOrQuit(node->GetPositionY() == 20.0, "Wrong Y position"); + } + + Log("---------------------------------------------------------------------------------------"); + Log("Test SetSpeed RPC"); + { + nexus::SetSpeedRequest request; + request.set_speed_factor(2.5); + nexus::SetSpeedResponse response; + grpc::ClientContext context; + grpc::Status status = stub->SetSpeed(&context, request, &response); + + VerifyOrQuit(status.ok(), "SetSpeed RPC failed"); + + { + std::lock_guard lock(sim); + VerifyOrQuit(sim.GetSpeedFactor() == 2.5, "Speed factor not set correctly"); + } + } + + Log("---------------------------------------------------------------------------------------"); + Log("Test StreamEvents"); + { + grpc::ClientContext context; + nexus::StreamRequest request; + auto reader = stub->StreamEvents(&context, request); + + // The StreamEvents call should trigger DumpState + nexus::SimulationEvent event; + VerifyOrQuit(reader->Read(&event), "Failed to read event from stream"); + + // Push an event manually to verify it comes through + nexus::SimulationEvent manualEvent; + manualEvent.set_timestamp_us(987654); + server.PushEvent(manualEvent); + + bool foundManualEvent = false; + // We might get multiple events (DumpState pushes many) + for (int i = 0; i < 100; i++) + { + if (!reader->Read(&event)) + break; + if (event.timestamp_us() == 987654) + { + foundManualEvent = true; + break; + } + } + VerifyOrQuit(foundManualEvent, "Failed to receive manual event via stream"); + } + + Log("---------------------------------------------------------------------------------------"); + Log("Test SetNodePosition RPC"); + { + nexus::SetNodePositionRequest request; + request.set_node_id(leaderId); + request.set_x(30.0); + request.set_y(40.0); + nexus::SetNodePositionResponse response; + grpc::ClientContext context; + grpc::Status status = stub->SetNodePosition(&context, request, &response); + + VerifyOrQuit(status.ok(), "SetNodePosition RPC failed"); + + std::lock_guard lock(sim); + Node *node = nexus.FindNodeById(leaderId); + VerifyOrQuit(node != nullptr, "Leader node not found"); + VerifyOrQuit(node->GetPositionX() == 30.0, "Wrong X position after SetNodePosition"); + VerifyOrQuit(node->GetPositionY() == 40.0, "Wrong Y position after SetNodePosition"); + } + + Log("---------------------------------------------------------------------------------------"); + Log("Test FormNetwork RPC"); + { + nexus::FormNetworkRequest request; + request.set_node_id(leaderId); + nexus::FormNetworkResponse response; + grpc::ClientContext context; + grpc::Status status = stub->FormNetwork(&context, request, &response); + + VerifyOrQuit(status.ok(), "FormNetwork RPC failed"); + + // Advancing time to allow forming + { + std::lock_guard lock(sim); + nexus.AdvanceTime(13000); + } + + std::lock_guard lock(sim); + Node *node = nexus.FindNodeById(leaderId); + VerifyOrQuit(node != nullptr, "Leader node not found"); + VerifyOrQuit(node->Get().IsLeader(), "Node failed to become leader"); + } + + Log("---------------------------------------------------------------------------------------"); + Log("Test JoinNetwork RPC"); + { + nexus::CreateNodeRequest createRequest; + createRequest.set_x(50.0); + createRequest.set_y(50.0); + nexus::CreateNodeResponse createResponse; + grpc::ClientContext createCtx; + stub->CreateNode(&createCtx, createRequest, &createResponse); + uint32_t joinerId = createResponse.node_id(); + + nexus::JoinNetworkRequest joinRequest; + joinRequest.set_node_id(joinerId); + joinRequest.set_target_node_id(leaderId); + joinRequest.set_mode(nexus::JOIN_MODE_FTD); + nexus::JoinNetworkResponse joinResponse; + grpc::ClientContext joinCtx; + grpc::Status status = stub->JoinNetwork(&joinCtx, joinRequest, &joinResponse); + + VerifyOrQuit(status.ok(), "JoinNetwork RPC failed"); + + { + std::lock_guard lock(sim); + nexus.AdvanceTime(10000); + } + + std::lock_guard lock(sim); + Node *joiner = nexus.FindNodeById(joinerId); + VerifyOrQuit(joiner != nullptr, "Joiner node not found"); + VerifyOrQuit(joiner->Get().IsChild() || joiner->Get().IsRouter(), + "Node failed to join network"); + } + + Log("---------------------------------------------------------------------------------------"); + Log("Test SetNodeState RPC (Disable)"); + { + nexus::SetNodeStateRequest request; + request.set_node_id(leaderId); + request.set_enabled(false); + nexus::SetNodeStateResponse response; + grpc::ClientContext context; + grpc::Status status = stub->SetNodeState(&context, request, &response); + + VerifyOrQuit(status.ok(), "SetNodeState (Disable) failed"); + + std::lock_guard lock(sim); + Node *node = nexus.FindNodeById(leaderId); + VerifyOrQuit(node != nullptr, "Leader node not found"); + VerifyOrQuit(node->Get().GetRole() == ot::Mle::kRoleDisabled, "Node role not disabled"); + } + + Log("---------------------------------------------------------------------------------------"); + Log("Test SetNodeState RPC (Enable)"); + { + nexus::SetNodeStateRequest request; + request.set_node_id(leaderId); + request.set_enabled(true); + nexus::SetNodeStateResponse response; + grpc::ClientContext context; + grpc::Status status = stub->SetNodeState(&context, request, &response); + + VerifyOrQuit(status.ok(), "SetNodeState (Enable) failed"); + + { + std::lock_guard lock(sim); + nexus.AdvanceTime(1000); + } + + std::lock_guard lock(sim); + Node *node = nexus.FindNodeById(leaderId); + VerifyOrQuit(node != nullptr, "Leader node not found"); + VerifyOrQuit(node->Get().GetRole() != ot::Mle::kRoleDisabled, "Node role still disabled"); + } +} + +} // namespace Nexus +} // namespace ot + +int main(void) +{ + ot::Nexus::TestGrpc(); + ot::Nexus::Log("All tests passed"); + return 0; +}