[nexus] add test 9.2.19 and improve IPv6 verification (#12565)

This commit adds Nexus test case 9.2.19 which verifies that the DUT
can properly get Pending Operational Dataset parameters using the
MGMT_PENDING_GET.req command.

It also addresses review comments and improves the robustness of IPv6
address verification across several Nexus tests.

Implementation details:
- tests/nexus/test_9_2_19.cpp: Implemented the test procedure. Fixed
  incorrectly escaped newline characters.
- tests/nexus/test_9_2_3.cpp: Fixed incorrectly escaped newline
  characters.
- tests/nexus/verify_utils.py: Added a robust helper function
  'is_leader_aloc_or_rloc' using the 'ipaddress' module to identify
  Leader ALOC and RLOC addresses by their IID.
- tests/nexus/verify_9_2_19.py, tests/nexus/verify_9_2_3.py,
  tests/nexus/verify_9_2_5.py: Replaced fragile string-slicing and
  duplicated code with the shared helper in 'verify_utils.py'.
- tests/nexus/CMakeLists.txt: Added nexus_9_2_19 target.
- tests/nexus/run_nexus_tests.sh: Added 9_2_19 to the test list.

The tests have been verified to pass in the Nexus simulation
environment.
This commit is contained in:
Jonathan Hui
2026-02-26 01:08:53 -06:00
committed by GitHub
parent 4a275b365c
commit fd9fc518ea
8 changed files with 705 additions and 10 deletions
+1
View File
@@ -212,6 +212,7 @@ ot_nexus_test(9_2_15 "cert;nexus")
ot_nexus_test(9_2_16 "cert;nexus")
ot_nexus_test(9_2_17 "cert;nexus")
ot_nexus_test(9_2_18 "cert;nexus")
ot_nexus_test(9_2_19 "cert;nexus")
# Misc tests
ot_nexus_test(border_admitter "core;nexus")
+2 -1
View File
@@ -148,6 +148,7 @@ DEFAULT_TESTS=(
"9_2_16"
"9_2_17"
"9_2_18"
"9_2_19"
)
# Use provided arguments or the default test list
@@ -237,7 +238,7 @@ run_test()
expanded_tests=()
for t in "${TESTS_TO_RUN[@]}"; do
case "$t" in
6_1_1 | 6_1_2 | 6_1_3 | 6_1_6 | 6_2_1 | 6_2_2 | 6_3_1 | 6_3_2 | 6_4_1 | 6_4_2 | 6_5_1 | 6_5_2 | 6_5_3 | 6_6_1 | 6_6_2 | 9_2_1 | 9_2_3 | 9_2_4)
6_1_1 | 6_1_2 | 6_1_3 | 6_1_6 | 6_2_1 | 6_2_2 | 6_3_1 | 6_3_2 | 6_4_1 | 6_4_2 | 6_5_1 | 6_5_2 | 6_5_3 | 6_6_1 | 6_6_2 | 9_2_1 | 9_2_3 | 9_2_4 | 9_2_19)
expanded_tests+=("${t}_A" "${t}_B")
;;
*)
+400
View File
@@ -0,0 +1,400 @@
/*
* Copyright (c) 2026, The OpenThread Authors.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the copyright holder nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <string.h>
#include "meshcop/commissioner.hpp"
#include "meshcop/dataset_manager.hpp"
#include "platform/nexus_core.hpp"
#include "platform/nexus_node.hpp"
namespace ot {
namespace Nexus {
/**
* Time to advance for a node to form a network and become leader, in milliseconds.
*/
static constexpr uint32_t kFormNetworkTime = 13 * 1000;
/**
* Time to advance for a node to join a network, in milliseconds.
*/
static constexpr uint32_t kJoinTime = 10 * 1000;
/**
* Time to advance for a commissioner to become active, in milliseconds.
*/
static constexpr uint32_t kPetitionTime = 5 * 1000;
/**
* Time to wait for a response, in milliseconds.
*/
static constexpr uint32_t kResponseTime = 2000;
/**
* Delay timer value in milliseconds (1 minute).
*/
static constexpr uint32_t kDelayTimer = 60 * 1000;
/**
* Time to wait for pending data to become operational, in milliseconds.
*/
static constexpr uint32_t kWaitDelayTime = 120 * 1000;
/**
* New PAN ID value.
*/
static constexpr uint16_t kNewPanId = 0xAFCE;
/**
* Active Timestamp value.
*/
static constexpr uint64_t kActiveTimestamp = 60;
/**
* Pending Timestamp value.
*/
static constexpr uint64_t kPendingTimestamp = 30;
enum Topology
{
kTopologyA,
kTopologyB,
};
void RunTest9_2_19(Topology aTopology, const char *aJsonFile)
{
/**
* 9.2.19 Getting the Pending Operational Dataset
*
* 9.2.19.1 Topology
* - Topology A: DUT as Leader, Commissioner (Non-DUT)
* - Topology B: Leader (Non-DUT), DUT as Commissioner
*
* 9.2.19.2 Purpose & Description
* - DUT as Leader (Topology A): The purpose of this test case is to verify the DUTs behavior when receiving
* MGMT_PENDING_GET.req directly from the active Commissioner.
* - DUT as Commissioner (Topology B): The purpose of this test case is to verify that the DUT can read Pending
* Operational Dataset parameters direct from the Leader using the MGMT_PENDING_GET.req command.
*
* Spec Reference | V1.1 Section | V1.3.0 Section
* ------------------------------------------|--------------|---------------
* Updating the Pending Operational Dataset | 8.7.5 | 8.7.5
*/
Core nexus;
Node &leader = nexus.CreateNode();
Node &commissioner = nexus.CreateNode();
leader.SetName("LEADER");
commissioner.SetName("COMMISSIONER");
OT_UNUSED_VARIABLE(aTopology);
nexus.AdvanceTime(0);
Instance::SetLogLevel(kLogLevelNote);
/**
* Step 1: All
* - Description: Ensure topology is formed correctly.
* - Pass Criteria: N/A.
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 1: All");
leader.AllowList(commissioner);
commissioner.AllowList(leader);
leader.Form();
nexus.AdvanceTime(kFormNetworkTime);
VerifyOrQuit(leader.Get<Mle::Mle>().IsLeader());
commissioner.Join(leader);
nexus.AdvanceTime(kJoinTime);
VerifyOrQuit(commissioner.Get<Mle::Mle>().IsAttached());
SuccessOrQuit(commissioner.Get<MeshCoP::Commissioner>().Start(nullptr, nullptr, nullptr));
nexus.AdvanceTime(kPetitionTime);
VerifyOrQuit(commissioner.Get<MeshCoP::Commissioner>().IsActive());
// Wait for nodes to stabilize and potentially become routers
nexus.AdvanceTime(kFormNetworkTime);
/**
* Step 2: Topology B Commissioner DUT / Topology A Leader DUT
* - Description:
* - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
* - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUT Anycast or Routing Locator:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: <empty>
* - Pass Criteria:
* - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: <empty> (get all Pending Operational Dataset parameters)
* - The Destination Address of MGMT_PENDING_GET.req frame MUST be Leaders Anycast or Routing Locator (ALOC or
* RLOC):
* - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
* - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
* Router ID
* - Topology A: N/A.
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 2: Topology B Commissioner DUT / Topology A Leader DUT");
SuccessOrQuit(commissioner.Get<MeshCoP::PendingDatasetManager>().SendGetRequest(MeshCoP::Dataset::Components(),
nullptr, 0, nullptr));
nexus.AdvanceTime(kResponseTime);
/**
* Step 3: Leader
* - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to Commissioner.
* - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
* - CoAP Response Code: 2.04 Changed
* - CoAP Payload: <empty> (no Pending Operational Dataset)
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 3: Leader");
/**
* Step 4: Topology B Commissioner DUT / Topology A Leader DUT
* - Description:
* - Topology B: User instructs DUT to send MGMT_PENDING_SET.req to Leader.
* - Topology A: Harness instructs Commissioner to send MGMT_PENDING_SET.req to DUTs Anycast or Routing Locator:
* - CoAP Request URI: coap://[<L>]:MM/c/ps
* - CoAP Payload: Active Timestamp TLV: 60s, Commissioner Session ID TLV (valid), Delay Timer TLV: 1 minute,
* PAN ID TLV: 0xAFCE (new value), Pending Timestamp TLV: 30s.
* - Pass Criteria:
* - Topology B: The MGMT_PENDING_SET.req frame MUST have the following format:
* - CoAP Request URI: coap://[<L>]:MM/c/ps
* - CoAP Payload: Active Timestamp TLV: 60s, Commissioner Session ID TLV (valid), Delay Timer TLV: 1 minute,
* Pending Timestamp TLV: 30s, PAN ID TLV: 0xAFCE (new value).
* - The Destination Address of MGMT_PENDING_SET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
* or RLOC):
* - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
* - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
* Router ID.
* - Topology A: N/A.
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 4: Topology B Commissioner DUT / Topology A Leader DUT");
{
MeshCoP::Dataset::Info dataset;
MeshCoP::Timestamp timestamp;
dataset.Clear();
timestamp.Clear();
timestamp.SetSeconds(kActiveTimestamp);
dataset.Set<MeshCoP::Dataset::kActiveTimestamp>(timestamp);
timestamp.Clear();
timestamp.SetSeconds(kPendingTimestamp);
dataset.Set<MeshCoP::Dataset::kPendingTimestamp>(timestamp);
dataset.Set<MeshCoP::Dataset::kDelay>(kDelayTimer);
dataset.Set<MeshCoP::Dataset::kPanId>(kNewPanId);
SuccessOrQuit(
commissioner.Get<MeshCoP::PendingDatasetManager>().SendSetRequest(dataset, nullptr, 0, nullptr, nullptr));
}
nexus.AdvanceTime(kResponseTime);
/**
* Step 5: Leader
* - Description: Automatically responds to MGMT_PENDING_SET.req with a MGMT_PENDING_SET.rsp to Commissioner.
* - Pass Criteria: For DUT = Leader: The MGMT_PENDING_SET.rsp frame MUST have the following format:
* - CoAP Response Code: 2.04 Changed
* - CoAP Payload: State TLV (value = Accept (01))
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 5: Leader");
/**
* Step 6: Topology B Commissioner DUT / Topology A Leader DUT
* - Description:
* - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
* - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUTs Anycast or Routing Locator:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: <empty>
* - Pass Criteria:
* - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: <empty> (get all Pending Operational Dataset parameters)
* - The Destination Address of MGMT_PENDING_GET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
* or RLOC):
* - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
* - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
* Router ID
* - Topology A: N/A.
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 6: Topology B Commissioner DUT / Topology A Leader DUT");
SuccessOrQuit(commissioner.Get<MeshCoP::PendingDatasetManager>().SendGetRequest(MeshCoP::Dataset::Components(),
nullptr, 0, nullptr));
nexus.AdvanceTime(kResponseTime);
/**
* Step 7: Leader
* - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to the Commissioner.
* - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
* - CoAP Response Code: 2.04 Changed
* - CoAP Payload:
* - Active Timestamp TLV
* - Channel TLV
* - Channel Mask TLV
* - Delay Timer TLV
* - Extended PAN ID TLV
* - Mesh-Local Prefix TLV
* - Network Master Key TLV
* - Network Name TLV
* - PAN ID TLV
* - Pending Timestamp TLV
* - PSKc TLV
* - Security Policy TLV
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 7: Leader");
/**
* Step 8: Topology B Commissioner DUT / Topology A Leader DUT
* - Description:
* - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
* - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUTs Anycast or Routing Locator:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: Get TLV specifying: PAN ID TLV
* - Pass Criteria:
* - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: Get TLV specifying: PAN ID TLV
* - The Destination Address of MGMT_PENDING_GET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
* or RLOC):
* - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
* - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
* Router ID
* - Topology A: N/A.
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 8: Topology B Commissioner DUT / Topology A Leader DUT");
{
uint8_t tlvTypes[] = {MeshCoP::Tlv::kPanId};
SuccessOrQuit(commissioner.Get<MeshCoP::PendingDatasetManager>().SendGetRequest(
MeshCoP::Dataset::Components(), tlvTypes, sizeof(tlvTypes), nullptr));
}
nexus.AdvanceTime(kResponseTime);
/**
* Step 9: Leader
* - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to the Commissioner.
* - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
* - CoAP Response Code: 2.04 Changed
* - CoAP Payload: Delay Timer TLV, PAN ID TLV
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 9: Leader");
/**
* Step 10: Harness
* - Description: Wait for 92 seconds to allow pending data to become operational.
* - Pass Criteria: N/A.
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 10: Harness");
nexus.AdvanceTime(kWaitDelayTime);
/**
* Step 11: Topology B Commissioner DUT / Topology A Leader DUT
* - Description:
* - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
* - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUTs Anycast or Routing Locator:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: <empty>
* - Pass Criteria:
* - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
* - CoAP Request URI: coap://[<L>]:MM/c/pg
* - CoAP Payload: <empty> (get all Pending Operational Dataset parameters)
* - The Destination Address of MGMT_PENDING_GET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
* or RLOC):
* - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
* - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
* Router ID
* - Topology A: N/A.
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 11: Topology B Commissioner DUT / Topology A Leader DUT");
SuccessOrQuit(commissioner.Get<MeshCoP::PendingDatasetManager>().SendGetRequest(MeshCoP::Dataset::Components(),
nullptr, 0, nullptr));
nexus.AdvanceTime(kResponseTime);
/**
* Step 12: Leader
* - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to the Commissioner.
* - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
* - CoAP Response Code: 2.04 Changed
* - CoAP Payload: <empty> (no Pending Operational Dataset)
*/
Log("---------------------------------------------------------------------------------------");
Log("Step 12: Leader");
nexus.AdvanceTime(kResponseTime);
nexus.SaveTestInfo(aJsonFile);
}
} // namespace Nexus
} // namespace ot
int main(int argc, char *argv[])
{
if (argc < 2)
{
ot::Nexus::RunTest9_2_19(ot::Nexus::kTopologyA, "test_9_2_19_A.json");
ot::Nexus::RunTest9_2_19(ot::Nexus::kTopologyB, "test_9_2_19_B.json");
}
else if (strcmp(argv[1], "A") == 0)
{
ot::Nexus::RunTest9_2_19(ot::Nexus::kTopologyA, (argc > 2) ? argv[2] : "test_9_2_19_A.json");
}
else if (strcmp(argv[1], "B") == 0)
{
ot::Nexus::RunTest9_2_19(ot::Nexus::kTopologyB, (argc > 2) ? argv[2] : "test_9_2_19_B.json");
}
else
{
fprintf(stderr, "Error: Invalid topology '%s'. Must be 'A' or 'B'.\n", argv[1]);
return 1;
}
printf("All tests passed\n");
return 0;
}
+2 -2
View File
@@ -301,10 +301,10 @@ int main(int argc, char *argv[])
}
else
{
fprintf(stderr, "Error: Invalid topology '%s'. Must be 'A' or 'B'.\\n", argv[1]);
fprintf(stderr, "Error: Invalid topology '%s'. Must be 'A' or 'B'.\n", argv[1]);
return 1;
}
printf("All tests passed\\n");
printf("All tests passed\n");
return 0;
}
+281
View File
@@ -0,0 +1,281 @@
#!/usr/bin/env python3
#
# Copyright (c) 2026, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import sys
import os
# Add the current directory to sys.path to find verify_utils
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(CUR_DIR)
import verify_utils
from pktverify import consts
from pktverify.null_field import nullField
def verify(pv):
# 9.2.19 Getting the Pending Operational Dataset
#
# 9.2.19.1 Topology
# - Topology A: DUT as Leader, Commissioner (Non-DUT)
# - Topology B: Leader (Non-DUT), DUT as Commissioner
#
# 9.2.19.2 Purpose & Description
# - DUT as Leader (Topology A): The purpose of this test case is to verify the DUTs behavior when receiving
# MGMT_PENDING_GET.req directly from the active Commissioner.
# - DUT as Commissioner (Topology B): The purpose of this test case is to verify that the DUT can read Pending
# Operational Dataset parameters direct from the Leader using the MGMT_PENDING_GET.req command.
#
# Spec Reference | V1.1 Section | V1.3.0 Section
# ------------------------------------------|--------------|---------------
# Updating the Pending Operational Dataset | 8.7.5 | 8.7.5
pkts = pv.pkts
pv.summary.show()
IS_DUT_COMMISSIONER = pv.test_info.testcase == 'test_9_2_19_B'
# Step 1: All
# - Description: Ensure topology is formed correctly.
# - Pass Criteria: N/A.
print("Step 1: All")
# Step 2: Topology B Commissioner DUT / Topology A Leader DUT
# - Description:
# - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
# - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUT Anycast or Routing Locator:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: <empty>
# - Pass Criteria:
# - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: <empty> (get all Pending Operational Dataset parameters)
# - The Destination Address of MGMT_PENDING_GET.req frame MUST be Leaders Anycast or Routing Locator (ALOC or
# RLOC):
# - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
# - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
# Router ID
# - Topology A: N/A.
print("Step 2: Topology B Commissioner DUT / Topology A Leader DUT")
_pkt = pkts.filter_coap_request(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: p.coap.payload is nullField).\
must_next()
if IS_DUT_COMMISSIONER:
_pkt.must_verify(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst))
# Step 3: Leader
# - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to Commissioner.
# - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
# - CoAP Response Code: 2.04 Changed
# - CoAP Payload: <empty> (no Pending Operational Dataset)
print("Step 3: Leader")
pkts.filter_coap_ack(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: p.coap.payload is nullField).\
must_next()
# Step 4: Topology B Commissioner DUT / Topology A Leader DUT
# - Description:
# - Topology B: User instructs DUT to send MGMT_PENDING_SET.req to Leader.
# - Topology A: Harness instructs Commissioner to send MGMT_PENDING_SET.req to DUTs Anycast or Routing Locator:
# - CoAP Request URI: coap://[<L>]:MM/c/ps
# - CoAP Payload: Active Timestamp TLV: 60s, Commissioner Session ID TLV (valid), Delay Timer TLV: 1 minute,
# PAN ID TLV: 0xAFCE (new value), Pending Timestamp TLV: 30s.
# - Pass Criteria:
# - Topology B: The MGMT_PENDING_SET.req frame MUST have the following format:
# - CoAP Request URI: coap://[<L>]:MM/c/ps
# - CoAP Payload: Active Timestamp TLV: 60s, Commissioner Session ID TLV (valid), Delay Timer TLV: 1 minute,
# Pending Timestamp TLV: 30s, PAN ID TLV: 0xAFCE (new value).
# - The Destination Address of MGMT_PENDING_SET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
# or RLOC):
# - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
# - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
# Router ID.
# - Topology A: N/A.
print("Step 4: Topology B Commissioner DUT / Topology A Leader DUT")
_pkt = pkts.filter_coap_request(consts.MGMT_PENDING_SET_URI).\
filter(lambda p: p.coap.tlv.active_timestamp == 60).\
filter(lambda p: p.coap.tlv.delay_timer == 60000).\
filter(lambda p: p.coap.tlv.pending_timestamp == 30).\
filter(lambda p: p.coap.tlv.pan_id == 0xafce).\
filter(lambda p: consts.NM_COMMISSIONER_SESSION_ID_TLV in p.coap.tlv.type).\
must_next()
if IS_DUT_COMMISSIONER:
_pkt.must_verify(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst))
# Step 5: Leader
# - Description: Automatically responds to MGMT_PENDING_SET.req with a MGMT_PENDING_SET.rsp to Commissioner.
# - Pass Criteria: For DUT = Leader: The MGMT_PENDING_SET.rsp frame MUST have the following format:
# - CoAP Response Code: 2.04 Changed
# - CoAP Payload: State TLV (value = Accept (01))
print("Step 5: Leader")
pkts.filter_coap_ack(consts.MGMT_PENDING_SET_URI).\
filter(lambda p: p.coap.tlv.state == 1).\
must_next()
# Step 6: Topology B Commissioner DUT / Topology A Leader DUT
# - Description:
# - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
# - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUTs Anycast or Routing Locator:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: <empty>
# - Pass Criteria:
# - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: <empty> (get all Pending Operational Dataset parameters)
# - The Destination Address of MGMT_PENDING_GET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
# or RLOC):
# - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
# - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
# Router ID
# - Topology A: N/A.
print("Step 6: Topology B Commissioner DUT / Topology A Leader DUT")
_pkt = pkts.filter_coap_request(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: p.coap.payload is nullField).\
must_next()
if IS_DUT_COMMISSIONER:
_pkt.must_verify(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst))
# Step 7: Leader
# - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to the Commissioner.
# - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
# - CoAP Response Code: 2.04 Changed
# - CoAP Payload:
# - Active Timestamp TLV
# - Channel TLV
# - Channel Mask TLV
# - Delay Timer TLV
# - Extended PAN ID TLV
# - Mesh-Local Prefix TLV
# - Network Master Key TLV
# - Network Name TLV
# - PAN ID TLV
# - Pending Timestamp TLV
# - PSKc TLV
# - Security Policy TLV
print("Step 7: Leader")
pkts.filter_coap_ack(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: {
consts.NM_ACTIVE_TIMESTAMP_TLV,
consts.NM_CHANNEL_TLV,
consts.NM_CHANNEL_MASK_TLV,
consts.NM_DELAY_TIMER_TLV,
consts.NM_EXTENDED_PAN_ID_TLV,
consts.NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
consts.NM_NETWORK_KEY_TLV,
consts.NM_NETWORK_NAME_TLV,
consts.NM_PAN_ID_TLV,
consts.NM_PENDING_TIMESTAMP_TLV,
consts.NM_PSKC_TLV,
consts.NM_SECURITY_POLICY_TLV
} <= set(p.coap.tlv.type)).\
must_next()
# Step 8: Topology B Commissioner DUT / Topology A Leader DUT
# - Description:
# - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
# - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUTs Anycast or Routing Locator:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: Get TLV specifying: PAN ID TLV
# - Pass Criteria:
# - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: Get TLV specifying: PAN ID TLV
# - The Destination Address of MGMT_PENDING_GET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
# or RLOC):
# - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
# - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
# Router ID
# - Topology A: N/A.
print("Step 8: Topology B Commissioner DUT / Topology A Leader DUT")
_pkt = pkts.filter_coap_request(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: consts.TLV_REQUEST_TLV in p.coap.tlv.type and\
p.coap.tlv.tlv_request == bytes([consts.NM_PAN_ID_TLV]).hex()).\
must_next()
if IS_DUT_COMMISSIONER:
_pkt.must_verify(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst))
# Step 9: Leader
# - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to the Commissioner.
# - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
# - CoAP Response Code: 2.04 Changed
# - CoAP Payload: Delay Timer TLV, PAN ID TLV
print("Step 9: Leader")
pkts.filter_coap_ack(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: {
consts.NM_DELAY_TIMER_TLV,
consts.NM_PAN_ID_TLV
} <= set(p.coap.tlv.type)).\
must_next()
# Step 10: Harness
# - Description: Wait for 92 seconds to allow pending data to become operational.
# - Pass Criteria: N/A.
print("Step 10: Harness")
# Step 11: Topology B Commissioner DUT / Topology A Leader DUT
# - Description:
# - Topology B: User instructs DUT to send MGMT_PENDING_GET.req to Leader.
# - Topology A: Harness instructs Commissioner to send MGMT_PENDING_GET.req to DUTs Anycast or Routing Locator:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: <empty>
# - Pass Criteria:
# - Topology B: The MGMT_PENDING_GET.req frame MUST have the following format:
# - CoAP Request URI: coap://[<L>]:MM/c/pg
# - CoAP Payload: <empty> (get all Pending Operational Dataset parameters)
# - The Destination Address of MGMT_PENDING_GET.req frame MUST be the Leaders Anycast or Routing Locator (ALOC
# or RLOC):
# - ALOC: Mesh Local prefix with an IID of 0000:00FF:FE00:FC00
# - RLOC: Mesh Local prefix with and IID of 0000:00FF:FE00:xxxx where xxxx is a 16-bit value that embeds the
# Router ID
# - Topology A: N/A.
print("Step 11: Topology B Commissioner DUT / Topology A Leader DUT")
_pkt = pkts.filter_coap_request(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: p.coap.payload is nullField).\
must_next()
if IS_DUT_COMMISSIONER:
_pkt.must_verify(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst))
# Step 12: Leader
# - Description: Automatically responds to MGMT_PENDING_GET.req with a MGMT_PENDING_GET.rsp to the Commissioner.
# - Pass Criteria: For DUT = Leader: The MGMT_PENDING_GET.rsp frame MUST have the following format:
# - CoAP Response Code: 2.04 Changed
# - CoAP Payload: <empty> (no Pending Operational Dataset)
print("Step 12: Leader")
pkts.filter_coap_ack(consts.MGMT_PENDING_GET_URI).\
filter(lambda p: p.coap.payload is nullField).\
must_next()
if __name__ == '__main__':
verify_utils.run_main(verify)
+3 -3
View File
@@ -107,7 +107,7 @@ def verify(pv):
pkts.filter_wpan_src64(COMMISSIONER).\
filter_coap_request(consts.MGMT_ACTIVE_GET_URI).\
filter(lambda p: p.coap.payload is nullField or len(p.coap.payload) == 0).\
filter(lambda p: p.ipv6.dst[8:14] == Bytes("000000fffe00")).\
filter(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst)).\
must_next()
# Step 3: Leader
@@ -159,7 +159,7 @@ def verify(pv):
consts.NM_NETWORK_MESH_LOCAL_PREFIX_TLV,
consts.NM_NETWORK_NAME_TLV
}).\
filter(lambda p: p.ipv6.dst[8:14] == Bytes("000000fffe00")).\
filter(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst)).\
must_next()
# Step 5: Leader
@@ -207,7 +207,7 @@ def verify(pv):
consts.NM_SCAN_DURATION,
consts.NM_ENERGY_LIST_TLV
}).\
filter(lambda p: p.ipv6.dst[8:14] == Bytes("000000fffe00")).\
filter(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst)).\
must_next()
# Step 7: Leader
+4 -4
View File
@@ -99,7 +99,7 @@ def verify(pv):
} <= set(p.coap.tlv.type)).\
filter(lambda p: p.coap.tlv.active_timestamp == 100 and\
p.coap.tlv.network_name == 'TEST_1').\
filter(lambda p: p.ipv6.dst.endswith(bytes.fromhex("000000fffe00fc00"))).\
filter(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst)).\
must_next()
# Step 3: Leader (DUT)
@@ -219,7 +219,7 @@ def verify(pv):
} <= set(p.coap.tlv.type)).\
filter(lambda p: p.coap.tlv.active_timestamp == 99 and\
p.coap.tlv.network_name == 'TEST_2').\
filter(lambda p: p.ipv6.dst.endswith(bytes.fromhex("000000fffe00fc00"))).\
filter(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst)).\
must_next()
# Step 8: Leader (DUT)
@@ -275,7 +275,7 @@ def verify(pv):
filter(lambda p: p.coap.tlv.active_timestamp == 101 and\
p.coap.tlv.network_name == 'TEST_3').\
filter(lambda p: 130 in p.coap.tlv.type).\
filter(lambda p: p.ipv6.dst.endswith(bytes.fromhex("000000fffe00fc00"))).\
filter(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst)).\
must_next()
# Step 10: Leader (DUT)
@@ -389,7 +389,7 @@ def verify(pv):
} <= set(p.coap.tlv.type)).\
filter(lambda p: p.coap.tlv.active_timestamp == 102 and\
p.coap.tlv.channel == 63).\
filter(lambda p: p.ipv6.dst.endswith(bytes.fromhex("000000fffe00fc00"))).\
filter(lambda p: verify_utils.is_leader_aloc_or_rloc(p.ipv6.dst)).\
must_next()
# Step 15: Leader (DUT)
+12
View File
@@ -33,6 +33,7 @@ import json
import traceback
import struct
import logging
import ipaddress
# Add the thread-cert directory to sys.path to find pktverify
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -188,6 +189,17 @@ def thread_coap_tlv_parse(t, v, layer=None):
return kvs
def is_leader_aloc_or_rloc(addr_str: str) -> bool:
"""Checks if an IPv6 address is a Leader ALOC or an RLOC."""
addr = ipaddress.ip_address(addr_str)
iid = addr.packed[8:]
# Leader ALOC IID is 0000:00ff:fe00:fc00
is_aloc = (iid == b'\x00\x00\x00\xff\xfe\x00\xfc\x00')
# RLOC IID is 0000:00ff:fe00:xxxx
is_rloc = (iid[:6] == b'\x00\x00\x00\xff\xfe\x00')
return is_aloc or is_rloc
def apply_patches():
CoapTlvParser.parse = staticmethod(thread_coap_tlv_parse)