[style] apply google python style guide (#4501)

This commit applies and enforces Google's python style for tests.
This commit is contained in:
Yakun Xu
2020-02-05 02:27:51 +08:00
committed by GitHub
parent 8368d440dd
commit 33808ebfba
501 changed files with 6674 additions and 5515 deletions
+2 -2
View File
@@ -31,12 +31,12 @@ set -e -x -o pipefail
setup_python()
{
python -m pip install flake8
python3 -m pip install yapf
}
check_python()
{
flake8 --config=script/pystyle.cfg tests tools
python3 -m yapf --style google -dpr tests tools
}
check_clang()
+1 -1
View File
@@ -185,4 +185,4 @@
## Format and Style
- All code should adhere to [PEP 8](https://www.python.org/dev/peps/pep-0008/).
- All code should adhere to [Google Python Style Guide](http://google.github.io/styleguide/pyguide.html).
+7
View File
@@ -56,6 +56,10 @@ install_packages_apt()
# add clang-format for pretty
sudo apt-get -y install clang-format-6.0
# add yapf for pretty
python3 -m pip install yapf || echo 'Failed to install python code formatter yapf. Install it manually if you need.'
}
install_packages_opkg()
@@ -89,6 +93,9 @@ install_packages_brew()
brew install llvm@6
sudo ln -s "$(brew --prefix llvm@6)/bin/clang-format" /usr/local/bin/clang-format-6.0
}
# add yapf for pretty
python3 -m pip install yapf || echo 'Failed to install python code formatter yapf. Install it manually if you need.'
}
install_packages_source()
+1
View File
@@ -53,6 +53,7 @@ main()
{
trap at_exit INT TERM EXIT
make_pretty
python3 -m yapf --style google -ipr "${SRC_DIR}/tests" "${SRC_DIR}/tools"
}
main "$@"
-16
View File
@@ -1,16 +0,0 @@
[flake8]
# it's not a bug that we aren't using all of hacking, ignore:
# E203: White space before ':' (see #python#black#315).
# W503: Line break occurred before a binary operator (see #python#black#README.md).
ignore = E203, W503
# relax the length limitation for now.
max-line-length = 119
max-doc-length = 119
# Print the source code generating the error/warning in question.
show_source = True
show-source = True
# print the total number of errors.
count = True
@@ -39,6 +39,7 @@ ROUTER = 2
class Cert_5_1_01_RouterAttach(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -110,9 +111,7 @@ class Cert_5_1_01_RouterAttach(unittest.TestCase):
msg.assertMleMessageContainsTlv(mle.Version)
# 4 - Router
msg = router_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST
)
msg = router_messages.next_mle_message(mle.CommandType.CHILD_ID_REQUEST)
msg.assertSentToNode(self.nodes[LEADER])
msg.assertMleMessageContainsTlv(mle.Response)
msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
@@ -125,8 +124,7 @@ class Cert_5_1_01_RouterAttach(unittest.TestCase):
# 5 - Leader
msg = leader_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER])
msg.assertMleMessageContainsTlv(mle.SourceAddress)
msg.assertMleMessageContainsTlv(mle.LeaderData)
@@ -162,8 +160,7 @@ class Cert_5_1_01_RouterAttach(unittest.TestCase):
# 9 - Leader
msg = leader_messages.next_mle_message(
mle.CommandType.LINK_ACCEPT_AND_REQUEST
)
mle.CommandType.LINK_ACCEPT_AND_REQUEST)
msg.assertMleMessageContainsTlv(mle.SourceAddress)
msg.assertMleMessageContainsTlv(mle.LeaderData)
msg.assertMleMessageContainsTlv(mle.Response)
@@ -42,6 +42,7 @@ MTDS = [ED, SED]
class Cert_5_1_02_ChildAddressTimeout(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -40,6 +40,7 @@ ROUTER2 = 3
class Cert_5_1_03_RouterAddressReallocation(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -124,8 +125,7 @@ class Cert_5_1_03_RouterAddressReallocation(unittest.TestCase):
# Leader or Router1 can be parent of Router2
if leader_messages.contains_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
):
mle.CommandType.CHILD_ID_RESPONSE):
leader_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
msg = router2_messages.next_coap_message("0.02")
@@ -134,11 +134,8 @@ class Cert_5_1_03_RouterAddressReallocation(unittest.TestCase):
msg = leader_messages.next_coap_message("2.04")
elif router1_messages.contains_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
):
router1_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE):
router1_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
msg = router2_messages.next_coap_message("0.02")
msg.assertCoapMessageRequestUriPath("/a/as")
@@ -154,8 +151,7 @@ class Cert_5_1_03_RouterAddressReallocation(unittest.TestCase):
# Router1 make two attempts to reconnect to its current Partition.
for _ in range(4):
msg = router1_messages.next_mle_message(
mle.CommandType.PARENT_REQUEST
)
mle.CommandType.PARENT_REQUEST)
msg.assertSentWithHopLimit(255)
msg.assertSentToDestinationAddress("ff02::2")
msg.assertMleMessageContainsTlv(mle.Mode)
@@ -182,8 +178,7 @@ class Cert_5_1_03_RouterAddressReallocation(unittest.TestCase):
# 7 - Router1
msg = router1_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST
)
mle.CommandType.CHILD_ID_REQUEST)
msg.assertSentToNode(self.nodes[ROUTER2])
msg.assertMleMessageContainsTlv(mle.Response)
msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
@@ -40,6 +40,7 @@ ROUTER2 = 3
class Cert_5_1_04_RouterAddressReallocation(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -123,8 +124,7 @@ class Cert_5_1_04_RouterAddressReallocation(unittest.TestCase):
# Leader or Router1 can be parent of Router2
if leader_messages.contains_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
):
mle.CommandType.CHILD_ID_RESPONSE):
leader_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
msg = router2_messages.next_coap_message("0.02")
@@ -133,11 +133,8 @@ class Cert_5_1_04_RouterAddressReallocation(unittest.TestCase):
msg = leader_messages.next_coap_message("2.04")
elif router1_messages.contains_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
):
router1_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE):
router1_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
msg = router2_messages.next_coap_message("0.02")
msg.assertCoapMessageRequestUriPath("/a/as")
@@ -153,8 +150,7 @@ class Cert_5_1_04_RouterAddressReallocation(unittest.TestCase):
# Router1 make two attempts to reconnect to its current Partition.
for _ in range(4):
msg = router1_messages.next_mle_message(
mle.CommandType.PARENT_REQUEST
)
mle.CommandType.PARENT_REQUEST)
msg.assertSentWithHopLimit(255)
msg.assertSentToDestinationAddress("ff02::2")
msg.assertMleMessageContainsTlv(mle.Mode)
@@ -184,9 +180,7 @@ class Cert_5_1_04_RouterAddressReallocation(unittest.TestCase):
router2_messages.next_mle_message(mle.CommandType.CHILD_ID_REQUEST)
# 9 - Router1
msg = router1_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router1_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER2])
msg.assertMleMessageContainsTlv(mle.SourceAddress)
msg.assertMleMessageContainsTlv(mle.LeaderData)
@@ -199,8 +193,7 @@ class Cert_5_1_04_RouterAddressReallocation(unittest.TestCase):
msg.assertMleMessageContainsTlv(mle.Version)
msg = router1_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER2])
msg.assertMleMessageContainsTlv(mle.SourceAddress)
msg.assertMleMessageContainsTlv(mle.LeaderData)
@@ -39,6 +39,7 @@ ROUTER1 = 2
class Cert_5_1_05_RouterAddressTimeout(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -40,6 +40,7 @@ ROUTER1 = 2
class Cert_5_1_06_RemoveRouterId(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -105,8 +106,7 @@ class Cert_5_1_06_RemoveRouterId(unittest.TestCase):
command.check_parent_request(msg, is_first_request=True)
msg = router1_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST, sent_to_node=self.nodes[LEADER]
)
mle.CommandType.CHILD_ID_REQUEST, sent_to_node=self.nodes[LEADER])
command.check_child_id_request(
msg,
tlv_request=CheckType.CONTAIN,
@@ -38,6 +38,7 @@ SED1 = 7
class Cert_5_1_07_MaxChildCount(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ ROUTER4 = 5
class Cert_5_1_08_RouterAttachConnectivity(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -135,20 +136,15 @@ class Cert_5_1_08_RouterAttachConnectivity(unittest.TestCase):
self.assertEqual(0, scan_mask_tlv.end_device)
# 3 - Router2, Router3
msg = router2_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router2_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER4])
msg = router3_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router3_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER4])
# 4 - Router4
msg = router4_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST
)
mle.CommandType.CHILD_ID_REQUEST)
msg.assertSentToNode(self.nodes[ROUTER3])
msg.assertMleMessageContainsTlv(mle.Response)
msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
@@ -41,6 +41,7 @@ ROUTER2 = 5
class Cert_5_1_09_REEDAttachConnectivity(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -161,8 +162,7 @@ class Cert_5_1_09_REEDAttachConnectivity(unittest.TestCase):
# 6 - Router2
msg = router2_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST
)
mle.CommandType.CHILD_ID_REQUEST)
msg.assertSentToNode(self.nodes[REED1])
msg.assertMleMessageContainsTlv(mle.Response)
msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
@@ -40,6 +40,7 @@ ROUTER3 = 4
class Cert_5_1_10_RouterAttachLinkQuality(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -63,9 +64,8 @@ class Cert_5_1_10_RouterAttachLinkQuality(unittest.TestCase):
self.nodes[ROUTER2].set_panid(0xface)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER2].add_whitelist(
self.nodes[ROUTER3].get_addr64(), rssi=-85
)
self.nodes[ROUTER2].add_whitelist(self.nodes[ROUTER3].get_addr64(),
rssi=-85)
self.nodes[ROUTER2].enable_whitelist()
self.nodes[ROUTER2].set_router_selection_jitter(1)
@@ -142,20 +142,15 @@ class Cert_5_1_10_RouterAttachLinkQuality(unittest.TestCase):
msg.assertMleMessageContainsTlv(mle.Version)
# 4 - Router1, Router2
msg = router1_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router1_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER3])
msg = router2_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router2_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER3])
# 5 - Router3
msg = router3_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST
)
mle.CommandType.CHILD_ID_REQUEST)
msg.assertSentToNode(self.nodes[ROUTER1])
msg.assertMleMessageContainsTlv(mle.Response)
msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
@@ -40,6 +40,7 @@ ROUTER1 = 4
class Cert_5_1_11_REEDAttachLinkQuality(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -63,9 +64,8 @@ class Cert_5_1_11_REEDAttachLinkQuality(unittest.TestCase):
self.nodes[ROUTER2].set_panid(0xface)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER2].add_whitelist(
self.nodes[ROUTER1].get_addr64(), rssi=-85
)
self.nodes[ROUTER2].add_whitelist(self.nodes[ROUTER1].get_addr64(),
rssi=-85)
self.nodes[ROUTER2].enable_whitelist()
self.nodes[ROUTER2].set_router_selection_jitter(1)
@@ -141,9 +141,7 @@ class Cert_5_1_11_REEDAttachLinkQuality(unittest.TestCase):
self.assertEqual(0, scan_mask_tlv.end_device)
# 4 - Router2
msg = router2_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router2_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ROUTER1])
# 5 - Router1
@@ -161,8 +159,7 @@ class Cert_5_1_11_REEDAttachLinkQuality(unittest.TestCase):
# 6 - Router1
msg = router1_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST
)
mle.CommandType.CHILD_ID_REQUEST)
msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
msg.assertMleMessageContainsTlv(mle.Mode)
msg.assertMleMessageContainsTlv(mle.Response)
@@ -39,6 +39,7 @@ ROUTER2 = 3
class Cert_5_1_12_NewRouterSync(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -70,21 +71,17 @@ class Cert_5_1_12_NewRouterSync(unittest.TestCase):
n.destroy()
self.simulator.stop()
def verify_step_4(
self, router1_messages, router2_messages, req_receiver, accept_receiver
):
def verify_step_4(self, router1_messages, router2_messages, req_receiver,
accept_receiver):
if router2_messages.contains_mle_message(
mle.CommandType.LINK_REQUEST
) and (
router1_messages.contains_mle_message(mle.CommandType.LINK_ACCEPT)
or router1_messages.contains_mle_message(
mle.CommandType.LINK_ACCEPT_AND_REQUEST
)
):
mle.CommandType.LINK_REQUEST) and (
router1_messages.contains_mle_message(
mle.CommandType.LINK_ACCEPT) or
router1_messages.contains_mle_message(
mle.CommandType.LINK_ACCEPT_AND_REQUEST)):
msg = router2_messages.next_mle_message(
mle.CommandType.LINK_REQUEST
)
mle.CommandType.LINK_REQUEST)
msg.assertSentToNode(self.nodes[req_receiver])
msg.assertMleMessageContainsTlv(mle.SourceAddress)
@@ -154,13 +151,10 @@ class Cert_5_1_12_NewRouterSync(unittest.TestCase):
# 4 - Router1, Router2
self.assertTrue(
self.verify_step_4(
router1_messages, router2_messages, ROUTER1, ROUTER2
)
or self.verify_step_4(
router2_messages, router1_messages, ROUTER2, ROUTER1
)
)
self.verify_step_4(router1_messages, router2_messages, ROUTER1,
ROUTER2) or
self.verify_step_4(router2_messages, router1_messages, ROUTER2,
ROUTER1))
if __name__ == '__main__':
@@ -38,6 +38,7 @@ ROUTER = 2
class Cert_5_1_13_RouterReset(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -34,7 +34,6 @@ import mle
import config
import command
LEADER = 1
DUT_ROUTER1 = 2
REED1 = 3
@@ -42,6 +41,7 @@ MED1 = 4
class Cert_5_2_01_REEDAttach(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -102,16 +102,13 @@ class Cert_5_2_01_REEDAttach(unittest.TestCase):
# 3 DUT_ROUTER1: Verify MLE Parent Response
router1_messages = self.simulator.get_messages_sent_by(DUT_ROUTER1)
msg = router1_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router1_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[REED1])
command.check_parent_response(msg)
# 4 DUT_ROUTER1: Verify MLE Child ID Response
msg = router1_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE)
msg.assertSentToNode(self.nodes[REED1])
command.check_child_id_response(msg)
@@ -126,8 +123,7 @@ class Cert_5_2_01_REEDAttach(unittest.TestCase):
reed1_messages = self.simulator.get_messages_sent_by(REED1)
msg = reed1_messages.next_coap_message('0.02')
reed1_ipv6_address = (
msg.ipv6_packet.ipv6_header.source_address.compressed
)
msg.ipv6_packet.ipv6_header.source_address.compressed)
msg.assertSentToNode(self.nodes[DUT_ROUTER1])
msg.assertCoapMessageRequestUriPath('/a/as')
@@ -41,14 +41,13 @@ ROUTER_32 = 33
class Cert_5_2_3_LeaderReject2Hops(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
self.nodes = {}
self.nodes[DUT_LEADER] = node.Node(
DUT_LEADER, simulator=self.simulator
)
self.nodes[DUT_LEADER] = node.Node(DUT_LEADER, simulator=self.simulator)
self.nodes[DUT_LEADER].set_panid(0xface)
self.nodes[DUT_LEADER].set_mode('rsdn')
self.nodes[DUT_LEADER].enable_whitelist()
@@ -126,9 +125,8 @@ class Cert_5_2_3_LeaderReject2Hops(unittest.TestCase):
msg.assertCoapMessageContainsTlv(network_layer.Status)
status_tlv = msg.get_coap_message_tlv(network_layer.Status)
self.assertEqual(
network_layer.StatusValues.NO_ADDRESS_AVAILABLE, status_tlv.status
)
self.assertEqual(network_layer.StatusValues.NO_ADDRESS_AVAILABLE,
status_tlv.status)
if __name__ == '__main__':
@@ -46,6 +46,7 @@ ROUTER_SELECTION_JITTER = 1
class Cert_5_2_4_REEDUpgrade(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -115,9 +116,8 @@ class Cert_5_2_4_REEDUpgrade(unittest.TestCase):
msg.assertMleMessageDoesNotContainTlv(mle.Route64)
# 4 Wait for DUT_REED to send the second packet.
self.simulator.go(
REED_ADVERTISEMENT_INTERVAL + REED_ADVERTISEMENT_MAX_JITTER
)
self.simulator.go(REED_ADVERTISEMENT_INTERVAL +
REED_ADVERTISEMENT_MAX_JITTER)
# 5 DUT_REED: Verify the second MLE Advertisement.
reed_messages = self.simulator.get_messages_sent_by(DUT_REED)
@@ -166,10 +166,8 @@ class Cert_5_2_4_REEDUpgrade(unittest.TestCase):
# Leader.
mleid = None
for addr in self.nodes[LEADER].get_addrs():
if (
addr.find(MESH_LOCAL_PREFIX) != -1
and addr.find(ROUTING_LACATOR) == -1
):
if (addr.find(MESH_LOCAL_PREFIX) != -1 and
addr.find(ROUTING_LACATOR) == -1):
mleid = addr
break
@@ -42,6 +42,7 @@ ROUTER_SELECTION_JITTER = 1
class Cert_5_2_5_AddressQuery(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -126,9 +127,7 @@ class Cert_5_2_5_AddressQuery(unittest.TestCase):
# 6. Verify DUT_REED would send Address Notification when ping to its
# ML-EID.
mleid = self.nodes[DUT_REED].get_ip6_address(
config.ADDRESS_TYPE.ML_EID
)
mleid = self.nodes[DUT_REED].get_ip6_address(config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[ED1].ping(mleid))
# Wait for sniffer collecting packets
@@ -136,17 +135,15 @@ class Cert_5_2_5_AddressQuery(unittest.TestCase):
reed_messages = self.simulator.get_messages_sent_by(DUT_REED)
msg = reed_messages.next_coap_message('0.02', '/a/an')
command.check_address_notification(
msg, self.nodes[DUT_REED], self.nodes[LEADER]
)
command.check_address_notification(msg, self.nodes[DUT_REED],
self.nodes[LEADER])
# 7 & 8. Verify DUT_REED would send Address Notification when ping to
# its 2001::EID and 2002::EID.
flag2001 = 0
flag2002 = 0
for global_address in self.nodes[DUT_REED].get_ip6_address(
config.ADDRESS_TYPE.GLOBAL
):
config.ADDRESS_TYPE.GLOBAL):
if global_address[0:4] == '2001':
flag2001 += 1
elif global_address[0:4] == '2002':
@@ -160,9 +157,8 @@ class Cert_5_2_5_AddressQuery(unittest.TestCase):
reed_messages = self.simulator.get_messages_sent_by(DUT_REED)
msg = reed_messages.next_coap_message('0.02', '/a/an')
command.check_address_notification(
msg, self.nodes[DUT_REED], self.nodes[LEADER]
)
command.check_address_notification(msg, self.nodes[DUT_REED],
self.nodes[LEADER])
assert flag2001 == 1, "Error: Expecting address 2001::EID not appear."
assert flag2002 == 1, "Error: Expecting address 2002::EID not appear."
@@ -41,6 +41,7 @@ ROUTER24 = 24
class Cert_5_2_06_RouterDowngrade(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -99,8 +100,7 @@ class Cert_5_2_06_RouterDowngrade(unittest.TestCase):
# 4 & 5
router1_rloc = self.nodes[DUT_ROUTER1].get_ip6_address(
config.ADDRESS_TYPE.RLOC
)
config.ADDRESS_TYPE.RLOC)
self.assertTrue(self.nodes[LEADER].ping(router1_rloc))
@@ -43,6 +43,7 @@ MLE_MIN_LINKS = 3
class Cert_5_2_7_REEDSynchronization(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -74,9 +75,8 @@ class Cert_5_2_7_REEDSynchronization(unittest.TestCase):
# 2. DUT_REED: Attach to network. Verify it didn't send an Address Solicit Request.
# Avoid DUT_REED attach to DUT_ROUTER1.
self.nodes[DUT_REED].add_whitelist(
self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_1']
)
self.nodes[DUT_REED].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64(),
config.RSSI['LINK_QULITY_1'])
self.nodes[DUT_REED].start()
self.simulator.go(config.MAX_ADVERTISEMENT_INTERVAL)
@@ -85,9 +85,8 @@ class Cert_5_2_7_REEDSynchronization(unittest.TestCase):
# The DUT_REED must not send a coap message here.
reed_messages = self.simulator.get_messages_sent_by(DUT_REED)
msg = reed_messages.does_not_contain_coap_message()
assert (
msg is True
), "Error: The DUT_REED sent an Address Solicit Request"
assert (msg is
True), "Error: The DUT_REED sent an Address Solicit Request"
# 3. DUT_REED: Verify sent a Link Request to at least 3 neighboring
# Routers.
@@ -104,29 +103,24 @@ class Cert_5_2_7_REEDSynchronization(unittest.TestCase):
link_accept_count = 0
destination_link_local = self.nodes[DUT_REED].get_ip6_address(
config.ADDRESS_TYPE.LINK_LOCAL
)
config.ADDRESS_TYPE.LINK_LOCAL)
for i in range(1, DUT_REED):
dut_messages = self.simulator.get_messages_sent_by(i)
while True:
msg = dut_messages.next_mle_message(
mle.CommandType.LINK_ACCEPT, False
)
msg = dut_messages.next_mle_message(mle.CommandType.LINK_ACCEPT,
False)
if msg is None:
break
if (
ipv6.ip_address(destination_link_local)
== msg.ipv6_packet.ipv6_header.destination_address
):
if (ipv6.ip_address(destination_link_local) ==
msg.ipv6_packet.ipv6_header.destination_address):
command.check_link_accept(msg, self.nodes[DUT_REED])
link_accept_count += 1
break
assert (
link_accept_count >= MLE_MIN_LINKS
) is True, "Error: too few Link Accept sent to DUT_REED"
assert (link_accept_count >= MLE_MIN_LINKS
) is True, "Error: too few Link Accept sent to DUT_REED"
if __name__ == '__main__':
@@ -37,6 +37,7 @@ DUT_ROUTER1 = 2
class Cert_5_3_1_LinkLocal(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -69,8 +70,7 @@ class Cert_5_3_1_LinkLocal(unittest.TestCase):
# 2 & 3
link_local = self.nodes[DUT_ROUTER1].get_ip6_address(
config.ADDRESS_TYPE.LINK_LOCAL
)
config.ADDRESS_TYPE.LINK_LOCAL)
self.assertTrue(self.nodes[LEADER].ping(link_local, size=256))
self.assertTrue(self.nodes[LEADER].ping(link_local))
@@ -83,11 +83,8 @@ class Cert_5_3_1_LinkLocal(unittest.TestCase):
self.assertTrue(self.nodes[LEADER].ping('ff02::2'))
# 8
self.assertTrue(
self.nodes[LEADER].ping(
config.LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS
)
)
self.assertTrue(self.nodes[LEADER].ping(
config.LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS))
if __name__ == '__main__':
@@ -39,6 +39,7 @@ SED1 = 4
class Cert_5_3_2_RealmLocal(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -97,15 +98,14 @@ class Cert_5_3_2_RealmLocal(unittest.TestCase):
# 2 & 3
mleid = self.nodes[DUT_ROUTER2].get_ip6_address(
config.ADDRESS_TYPE.ML_EID
)
config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[LEADER].ping(mleid, size=256))
self.assertTrue(self.nodes[LEADER].ping(mleid))
# 4 & 5
self.assertTrue(
self.nodes[LEADER].ping('ff03::1', num_responses=2, size=256)
)
self.assertTrue(self.nodes[LEADER].ping('ff03::1',
num_responses=2,
size=256))
sed_messages = self.simulator.get_messages_sent_by(SED1)
self.assertFalse(sed_messages.contains_icmp_message())
@@ -114,9 +114,9 @@ class Cert_5_3_2_RealmLocal(unittest.TestCase):
self.assertFalse(sed_messages.contains_icmp_message())
# 6 & 7
self.assertTrue(
self.nodes[LEADER].ping('ff03::2', num_responses=2, size=256)
)
self.assertTrue(self.nodes[LEADER].ping('ff03::2',
num_responses=2,
size=256))
sed_messages = self.simulator.get_messages_sent_by(SED1)
self.assertFalse(sed_messages.contains_icmp_message())
@@ -125,23 +125,19 @@ class Cert_5_3_2_RealmLocal(unittest.TestCase):
self.assertFalse(sed_messages.contains_icmp_message())
# 8
self.assertTrue(
self.nodes[LEADER].ping(
config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
num_responses=3,
size=256,
)
)
self.assertTrue(self.nodes[LEADER].ping(
config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
num_responses=3,
size=256,
))
self.simulator.go(2)
sed_messages = self.simulator.get_messages_sent_by(SED1)
self.assertTrue(sed_messages.contains_icmp_message())
self.assertTrue(
self.nodes[LEADER].ping(
config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
num_responses=3,
)
)
self.assertTrue(self.nodes[LEADER].ping(
config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
num_responses=3,
))
self.simulator.go(2)
sed_messages = self.simulator.get_messages_sent_by(SED1)
self.assertTrue(sed_messages.contains_icmp_message())
@@ -42,6 +42,7 @@ MED1_TIMEOUT = 3
class Cert_5_3_3_AddressQuery(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -112,8 +113,7 @@ class Cert_5_3_3_AddressQuery(unittest.TestCase):
dut_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
router3_mleid = self.nodes[ROUTER3].get_ip6_address(
config.ADDRESS_TYPE.ML_EID
)
config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[MED1].ping(router3_mleid))
# Verify DUT_ROUTER2 sent an Address Query Request to the Realm local
@@ -136,16 +136,14 @@ class Cert_5_3_3_AddressQuery(unittest.TestCase):
dut_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
med1_mleid = self.nodes[MED1].get_ip6_address(
config.ADDRESS_TYPE.ML_EID
)
config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[ROUTER1].ping(med1_mleid))
# Verify DUT_ROUTER2 responded with an Address Notification.
dut_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
msg = dut_messages.next_coap_message('0.02', '/a/an')
command.check_address_notification(
msg, self.nodes[DUT_ROUTER2], self.nodes[ROUTER1]
)
command.check_address_notification(msg, self.nodes[DUT_ROUTER2],
self.nodes[ROUTER1])
# 4
# Wait the finish of address resolution traffic triggerred by previous
@@ -45,6 +45,7 @@ MTDS = [SED1, ED1, ED2, ED3, ED4]
class Cert_5_3_4_AddressMapCache(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -112,8 +113,7 @@ class Cert_5_3_4_AddressMapCache(unittest.TestCase):
# 2
for ED in [ED1, ED2, ED3, ED4]:
ed_mleid = self.nodes[ED].get_ip6_address(
config.ADDRESS_TYPE.ML_EID
)
config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[SED1].ping(ed_mleid))
self.simulator.go(5)
@@ -134,17 +134,15 @@ class Cert_5_3_4_AddressMapCache(unittest.TestCase):
for ED in [ED1, ED2, ED3, ED4]:
ed_mleid = self.nodes[ED].get_ip6_address(
config.ADDRESS_TYPE.ML_EID
)
config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[SED1].ping(ed_mleid))
self.simulator.go(5)
# Verify DUT_ROUTER1 didn't generate an Address Query Request.
dut_messages = self.simulator.get_messages_sent_by(DUT_ROUTER1)
msg = dut_messages.next_coap_message('0.02', '/a/aq', False)
assert (
msg is None
), "Error: The DUT sent an unexpected Address Query Request"
assert (msg is None
), "Error: The DUT sent an unexpected Address Query Request"
if __name__ == '__main__':
@@ -40,6 +40,7 @@ ROUTER3 = 4
class Cert_5_3_5_RoutingLinkQuality(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -95,8 +96,7 @@ class Cert_5_3_5_RoutingLinkQuality(unittest.TestCase):
# 2 & 3
leader_rloc = self.nodes[LEADER].get_ip6_address(
config.ADDRESS_TYPE.RLOC
)
config.ADDRESS_TYPE.RLOC)
# Verify the ICMPv6 Echo Request took the least cost path.
self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
@@ -104,12 +104,10 @@ class Cert_5_3_5_RoutingLinkQuality(unittest.TestCase):
command.check_icmp_path(self.simulator, path, self.nodes)
# 4 & 5
self.nodes[LEADER].add_whitelist(
self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_1']
)
self.nodes[DUT_ROUTER1].add_whitelist(
self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_1']
)
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64(),
config.RSSI['LINK_QULITY_1'])
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(),
config.RSSI['LINK_QULITY_1'])
self.simulator.go(3 * config.MAX_ADVERTISEMENT_INTERVAL)
# Verify the ICMPv6 Echo Request took the longer path because it cost
@@ -119,12 +117,10 @@ class Cert_5_3_5_RoutingLinkQuality(unittest.TestCase):
command.check_icmp_path(self.simulator, path, self.nodes)
# 6 & 7
self.nodes[LEADER].add_whitelist(
self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_2']
)
self.nodes[DUT_ROUTER1].add_whitelist(
self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_2']
)
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64(),
config.RSSI['LINK_QULITY_2'])
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(),
config.RSSI['LINK_QULITY_2'])
self.simulator.go(3 * config.MAX_ADVERTISEMENT_INTERVAL)
# Verify the direct neighbor would be prioritized when there are two
@@ -134,18 +130,15 @@ class Cert_5_3_5_RoutingLinkQuality(unittest.TestCase):
command.check_icmp_path(self.simulator, path, self.nodes)
# 8 & 9
self.nodes[LEADER].add_whitelist(
self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_0']
)
self.nodes[DUT_ROUTER1].add_whitelist(
self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_0']
)
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64(),
config.RSSI['LINK_QULITY_0'])
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(),
config.RSSI['LINK_QULITY_0'])
self.simulator.go(3 * config.MAX_ADVERTISEMENT_INTERVAL)
# Verify the ICMPv6 Echo Request took the longer path.
leader_rloc = self.nodes[LEADER].get_ip6_address(
config.ADDRESS_TYPE.RLOC
)
config.ADDRESS_TYPE.RLOC)
self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
path = [ROUTER3, DUT_ROUTER1, ROUTER2, LEADER]
command.check_icmp_path(self.simulator, path, self.nodes)
@@ -40,6 +40,7 @@ ROUTER2 = 3
class Cert_5_3_6_RouterIdMask(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -111,8 +112,7 @@ class Cert_5_3_6_RouterIdMask(unittest.TestCase):
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = leader_messages.last_mle_message(
mle.CommandType.ADVERTISEMENT, False
)
mle.CommandType.ADVERTISEMENT, False)
if msg is None:
continue
@@ -123,9 +123,8 @@ class Cert_5_3_6_RouterIdMask(unittest.TestCase):
break
self.assertTrue(routing_cost == 0)
self.simulator.go(
config.INFINITE_COST_TIMEOUT + config.MAX_ADVERTISEMENT_INTERVAL
)
self.simulator.go(config.INFINITE_COST_TIMEOUT +
config.MAX_ADVERTISEMENT_INTERVAL)
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = leader_messages.last_mle_message(mle.CommandType.ADVERTISEMENT)
self.assertFalse(command.check_id_set(msg, router2_id))
@@ -150,16 +149,14 @@ class Cert_5_3_6_RouterIdMask(unittest.TestCase):
router1_id = self.nodes[ROUTER1].get_router_id()
router2_id = self.nodes[ROUTER2].get_router_id()
self.simulator.go(
config.MAX_NEIGHBOR_AGE + config.MAX_ADVERTISEMENT_INTERVAL
)
self.simulator.go(config.MAX_NEIGHBOR_AGE +
config.MAX_ADVERTISEMENT_INTERVAL)
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = leader_messages.last_mle_message(mle.CommandType.ADVERTISEMENT)
self.assertEqual(command.get_routing_cost(msg, router1_id), 0)
self.simulator.go(
config.INFINITE_COST_TIMEOUT + config.MAX_ADVERTISEMENT_INTERVAL
)
self.simulator.go(config.INFINITE_COST_TIMEOUT +
config.MAX_ADVERTISEMENT_INTERVAL)
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = leader_messages.last_mle_message(mle.CommandType.ADVERTISEMENT)
self.assertFalse(command.check_id_set(msg, router1_id))
@@ -38,6 +38,7 @@ ROUTER2 = 3
class Cert_5_3_6_RouterIdMask(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -45,6 +45,7 @@ MTDS = [MED1, SED1, MED3]
class Cert_5_3_7_DuplicateAddress(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -139,9 +140,8 @@ class Cert_5_3_7_DuplicateAddress(unittest.TestCase):
# address.
dut_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = dut_messages.next_coap_message('0.02', '/a/aq')
command.check_address_query(
msg, self.nodes[DUT_LEADER], config.REALM_LOCAL_ALL_ROUTERS_ADDRESS
)
command.check_address_query(msg, self.nodes[DUT_LEADER],
config.REALM_LOCAL_ALL_ROUTERS_ADDRESS)
# 5 & 6
# Verify DUT_LEADER sent an Address Error Notification to the Realm
@@ -150,8 +150,7 @@ class Cert_5_3_7_DuplicateAddress(unittest.TestCase):
dut_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = dut_messages.next_coap_message('0.02', '/a/ae')
command.check_address_error_notification(
msg, self.nodes[DUT_LEADER], config.REALM_LOCAL_ALL_ROUTERS_ADDRESS
)
msg, self.nodes[DUT_LEADER], config.REALM_LOCAL_ALL_ROUTERS_ADDRESS)
if __name__ == '__main__':
@@ -44,6 +44,7 @@ MTDS = [MED1, MED2]
class Cert_5_3_8_ChildAddressSet(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -109,8 +110,7 @@ class Cert_5_3_8_ChildAddressSet(unittest.TestCase):
# 4 MED1: MED1 send an ICMPv6 Echo Request to the MED2 ML-EID
med2_ml_eid = self.nodes[MED2].get_ip6_address(
config.ADDRESS_TYPE.ML_EID
)
config.ADDRESS_TYPE.ML_EID)
self.assertTrue(med2_ml_eid is not None)
self.assertTrue(self.nodes[MED1].ping(med2_ml_eid))
@@ -127,9 +127,8 @@ class Cert_5_3_8_ChildAddressSet(unittest.TestCase):
# Verify MED2 sent an ICMPv6 Echo Reply
med2_messages = self.simulator.get_messages_sent_by(MED2)
msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
assert (
msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
assert (msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
# 5 MED1: MED1 send an ICMPv6 Echo Request to the MED2 2001::GUA
addr = self.nodes[MED2].get_addr("2001::/64")
@@ -149,9 +148,8 @@ class Cert_5_3_8_ChildAddressSet(unittest.TestCase):
# Verify MED2 sent an ICMPv6 Echo Reply
med2_messages = self.simulator.get_messages_sent_by(MED2)
msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
assert (
msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
assert (msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
# 6 MED1: MED1 send an ICMPv6 Echo Request to the MED2 2002::GUA
addr = self.nodes[MED2].get_addr("2002::/64")
@@ -171,9 +169,8 @@ class Cert_5_3_8_ChildAddressSet(unittest.TestCase):
# Verify MED2 sent an ICMPv6 Echo Reply
med2_messages = self.simulator.get_messages_sent_by(MED2)
msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
assert (
msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
assert (msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
# 7 MED1: MED1 send an ICMPv6 Echo Request to the MED2 2003::GUA
addr = self.nodes[MED2].get_addr("2003::/64")
@@ -193,9 +190,8 @@ class Cert_5_3_8_ChildAddressSet(unittest.TestCase):
# Verify MED2 sent an ICMPv6 Echo Reply
med2_messages = self.simulator.get_messages_sent_by(MED2)
msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
assert (
msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
assert (msg is not None
), "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
if __name__ == '__main__':
@@ -43,6 +43,7 @@ SED1 = 5
class Cert_5_3_09_AddressQuery(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -130,8 +131,7 @@ class Cert_5_3_09_AddressQuery(unittest.TestCase):
dut_router2_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
msg = dut_router2_messages.next_coap_message('0.02', '/a/aq')
msg.assertSentToDestinationAddress(
config.REALM_LOCAL_ALL_ROUTERS_ADDRESS
)
config.REALM_LOCAL_ALL_ROUTERS_ADDRESS)
command.check_address_query(
msg,
self.nodes[DUT_ROUTER2],
@@ -157,9 +157,8 @@ class Cert_5_3_09_AddressQuery(unittest.TestCase):
# Verify DUT_ROUTER2 sent an Address Notification message
dut_router2_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
msg = dut_router2_messages.next_coap_message('0.02', '/a/an')
command.check_address_notification(
msg, self.nodes[DUT_ROUTER2], self.nodes[ROUTER1]
)
command.check_address_notification(msg, self.nodes[DUT_ROUTER2],
self.nodes[ROUTER1])
# 5 SED1: SED1 sends an ICMPv6 Echo Request to the ROUTER3 using GUA
# 2001:: address
@@ -178,8 +177,7 @@ class Cert_5_3_09_AddressQuery(unittest.TestCase):
# Verify DUT_ROUTER2 forwarded the ICMPv6 Echo Reply to SED1
msg = dut_router2_messages_temp.get_icmp_message(
ipv6.ICMP_ECHO_RESPONSE
)
ipv6.ICMP_ECHO_RESPONSE)
assert (
msg is not None
), "Error: The DUT_ROUTER2 didn't forward ICMPv6 Echo Reply to SED1"
@@ -197,8 +195,7 @@ class Cert_5_3_09_AddressQuery(unittest.TestCase):
dut_router2_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
msg = dut_router2_messages.next_coap_message('0.02', '/a/aq')
msg.assertSentToDestinationAddress(
config.REALM_LOCAL_ALL_ROUTERS_ADDRESS
)
config.REALM_LOCAL_ALL_ROUTERS_ADDRESS)
# 7 SED1: Power off SED1 and wait to allow DUT_ROUTER2 to timeout the
# child
@@ -43,6 +43,7 @@ MED1 = 5
class Cert_5_3_10_AddressQuery(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -158,9 +159,8 @@ class Cert_5_3_10_AddressQuery(unittest.TestCase):
# Verify DUT_ROUTER2 sent an Address Notification message
dut_router2_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
msg = dut_router2_messages.next_coap_message('0.02', '/a/an')
command.check_address_notification(
msg, self.nodes[DUT_ROUTER2], self.nodes[BR]
)
command.check_address_notification(msg, self.nodes[DUT_ROUTER2],
self.nodes[BR])
# 5 MED1: MED1 sends an ICMPv6 Echo Request to ROUTER1 using GUA 2003::
# address
@@ -181,8 +181,7 @@ class Cert_5_3_10_AddressQuery(unittest.TestCase):
# Verify DUT_ROUTER2 forwarded ICMPv6 Echo Reply to MED1
msg = dut_router2_messages_temp.get_icmp_message(
ipv6.ICMP_ECHO_RESPONSE
)
ipv6.ICMP_ECHO_RESPONSE)
assert (
msg is not None
), "Error: The DUT_ROUTER2 didn't forward ICMPv6 Echo Reply to MED1"
@@ -199,16 +198,14 @@ class Cert_5_3_10_AddressQuery(unittest.TestCase):
# Verify the DUT_ROUTER2 has removed all entries based on ROUTER1's
# Router ID
command.check_router_id_cached(
self.nodes[DUT_ROUTER2], router1_id, False
)
command.check_router_id_cached(self.nodes[DUT_ROUTER2], router1_id,
False)
# Verify DUT_ROUTER2 sent an Address Query Request
dut_router2_messages = self.simulator.get_messages_sent_by(DUT_ROUTER2)
msg = dut_router2_messages.next_coap_message('0.02', '/a/aq')
msg.assertSentToDestinationAddress(
config.REALM_LOCAL_ALL_ROUTERS_ADDRESS
)
config.REALM_LOCAL_ALL_ROUTERS_ADDRESS)
# 7 MED1: Power off MED1 and wait to allow DUT_ROUTER2 to timeout the
# child
@@ -39,6 +39,7 @@ MED1 = 3
class Cert_5_3_11_AddressQueryTimeoutIntervals(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -40,6 +40,7 @@ DUT_ROUTER1 = 2
class Cert_5_5_1_LeaderReboot(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -54,15 +55,13 @@ class Cert_5_5_1_LeaderReboot(unittest.TestCase):
self.nodes[DUT_ROUTER1].set_panid(0xface)
self.nodes[DUT_ROUTER1].set_mode('rsdn')
self.nodes[DUT_ROUTER1].add_whitelist(
self.nodes[DUT_LEADER].get_addr64()
)
self.nodes[DUT_LEADER].get_addr64())
self.nodes[DUT_ROUTER1].enable_whitelist()
self.nodes[DUT_ROUTER1].set_router_selection_jitter(1)
def _setUpLeader(self):
self.nodes[DUT_LEADER].add_whitelist(
self.nodes[DUT_ROUTER1].get_addr64()
)
self.nodes[DUT_ROUTER1].get_addr64())
self.nodes[DUT_LEADER].enable_whitelist()
def tearDown(self):
@@ -93,13 +92,11 @@ class Cert_5_5_1_LeaderReboot(unittest.TestCase):
# Send a harness helper ping to the DUT
router1_rloc = self.nodes[DUT_ROUTER1].get_ip6_address(
config.ADDRESS_TYPE.RLOC
)
config.ADDRESS_TYPE.RLOC)
self.assertTrue(self.nodes[DUT_LEADER].ping(router1_rloc))
leader_rloc = self.nodes[DUT_LEADER].get_ip6_address(
config.ADDRESS_TYPE.RLOC
)
config.ADDRESS_TYPE.RLOC)
self.assertTrue(self.nodes[DUT_ROUTER1].ping(leader_rloc))
# 3 DUT_LEADER: Reset DUT_LEADER
@@ -116,9 +113,8 @@ class Cert_5_5_1_LeaderReboot(unittest.TestCase):
# Verify DUT_LEADER didn't send MLE Advertisement messages
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = leader_messages.next_mle_message(
mle.CommandType.ADVERTISEMENT, False
)
msg = leader_messages.next_mle_message(mle.CommandType.ADVERTISEMENT,
False)
self.assertTrue(msg is None)
self.nodes[DUT_LEADER].start()
@@ -153,8 +149,7 @@ class Cert_5_5_1_LeaderReboot(unittest.TestCase):
)
else:
msg = router1_messages_temp.next_mle_message(
mle.CommandType.LINK_ACCEPT_AND_REQUEST
)
mle.CommandType.LINK_ACCEPT_AND_REQUEST)
self.assertTrue(msg is not None)
command.check_link_accept(
msg,
@@ -167,18 +162,14 @@ class Cert_5_5_1_LeaderReboot(unittest.TestCase):
# 6 DUT_LEADER: Verify DUT_LEADER didn't send a Parent Request message
msg = leader_messages_temp.next_mle_message(
mle.CommandType.PARENT_REQUEST, False
)
mle.CommandType.PARENT_REQUEST, False)
self.assertTrue(msg is None)
# 7 ALL: Verify connectivity by sending an ICMPv6 Echo Request from
# DUT_LEADER to DUT_ROUTER1 link local address
router1_link_local_address = self.nodes[DUT_ROUTER1].get_ip6_address(
config.ADDRESS_TYPE.LINK_LOCAL
)
self.assertTrue(
self.nodes[DUT_LEADER].ping(router1_link_local_address)
)
config.ADDRESS_TYPE.LINK_LOCAL)
self.assertTrue(self.nodes[DUT_LEADER].ping(router1_link_local_address))
if __name__ == '__main__':
@@ -38,6 +38,7 @@ ED = 3
class Cert_5_5_2_LeaderReboot(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -43,6 +43,7 @@ MTDS = [ED1, ED2, ED3]
class Cert_5_5_3_SplitMergeChildren(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -40,6 +40,7 @@ ROUTER4 = 5
class Cert_5_5_4_SplitMergeRouters(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ REED1 = 17
class Cert_5_5_5_SplitMergeREED(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -39,6 +39,7 @@ ROUTER3 = 4
class Cert_5_5_7_SplitMergeThreeWay(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -40,6 +40,7 @@ ED1 = 5
class Cert_5_5_8_SplitRoutersLostLeader(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ MTDS = [ED1, SED1]
class Cert_5_6_1_NetworkDataLeaderAsBr(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ MTDS = [ED1, SED1]
class Cert_5_6_2_NetworkDataRouterAsBr(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ MTDS = [ED1, SED1]
class Cert_5_6_3_NetworkDataRegisterAfterAttachLeader(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ MTDS = [ED1, SED1]
class Cert_5_6_4_NetworkDataRegisterAfterAttachRouter(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ MTDS = [ED1, SED1]
class Cert_5_6_5_NetworkDataRegisterAfterAttachRouter(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ MTDS = [ED1, SED1]
class Cert_5_6_6_NetworkDataExpiration(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -38,6 +38,7 @@ REED = 3
class Cert_5_6_7_NetworkDataRequestREED(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -38,6 +38,7 @@ ED = 3
class Cert_5_6_8_ContextManagement(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -42,6 +42,7 @@ MTDS = [ED, SED]
class Cert_5_6_9_NetworkDataForwarding(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ED = 2
class Cert_5_8_1_KeySynchronization(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ROUTER = 2
class Cert_5_8_2_KeyIncrement(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ROUTER = 2
class Cert_5_8_3_KeyIncrementRollOver(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -38,6 +38,7 @@ ED = 2
class Cert_6_1_1_RouterAttach(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -109,8 +110,7 @@ class Cert_6_1_1_RouterAttach(unittest.TestCase):
# 5 - leader
msg = leader_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE)
msg.assertSentToNode(self.nodes[ED])
# 6 - leader
@@ -43,6 +43,7 @@ MED = 3
class Cert_6_1_2_REEDAttach_MED(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -99,9 +100,8 @@ class Cert_6_1_2_REEDAttach_MED(unittest.TestCase):
check_parent_request(msg, is_first_request=False)
# Step 6 - DUT sends Child ID Request
msg = med_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST, sent_to_node=self.nodes[REED]
)
msg = med_messages.next_mle_message(mle.CommandType.CHILD_ID_REQUEST,
sent_to_node=self.nodes[REED])
check_child_id_request(
msg,
address_registration=CheckType.CONTAIN,
@@ -117,8 +117,7 @@ class Cert_6_1_2_REEDAttach_MED(unittest.TestCase):
# Step 8 - DUT sends Child Update messages
msg = med_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_REQUEST
)
mle.CommandType.CHILD_UPDATE_REQUEST)
check_child_update_request_from_child(
msg,
source_address=CheckType.CONTAIN,
@@ -44,6 +44,7 @@ SED = 3
class Cert_6_1_2_REEDAttach_SED(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -100,9 +101,8 @@ class Cert_6_1_2_REEDAttach_SED(unittest.TestCase):
check_parent_request(msg, is_first_request=False)
# Step 6 - DUT sends Child ID Request
msg = sed_messages.next_mle_message(
mle.CommandType.CHILD_ID_REQUEST, sent_to_node=self.nodes[REED]
)
msg = sed_messages.next_mle_message(mle.CommandType.CHILD_ID_REQUEST,
sent_to_node=self.nodes[REED])
check_child_id_request(
msg,
address_registration=CheckType.CONTAIN,
@@ -119,9 +119,8 @@ class Cert_6_1_2_REEDAttach_SED(unittest.TestCase):
# Step 11 - SED sends periodic 802.15.4 Data Request messages
msg = sed_messages.next_message()
self.assertEqual(
False, msg.isMacAddressTypeLong()
) # Extra check, keep-alive messages are of short types of mac address
self.assertEqual(False, msg.isMacAddressTypeLong(
)) # Extra check, keep-alive messages are of short types of mac address
self.assertEqual(msg.type, message.MessageType.COMMAND)
self.assertEqual(
msg.mac_header.command_type,
@@ -40,6 +40,7 @@ ED = 5
class Cert_6_1_3_RouterAttachConnectivity(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -40,6 +40,7 @@ ED = 5
class Cert_6_1_4_REEDAttachConnectivity(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -39,6 +39,7 @@ ED = 4
class Cert_6_1_5_RouterAttachLinkQuality(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -62,9 +63,7 @@ class Cert_6_1_5_RouterAttachLinkQuality(unittest.TestCase):
self.nodes[ROUTER2].set_panid(0xface)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER2].add_whitelist(
self.nodes[ED].get_addr64(), rssi=-85
)
self.nodes[ROUTER2].add_whitelist(self.nodes[ED].get_addr64(), rssi=-85)
self.nodes[ROUTER2].enable_whitelist()
self.nodes[ROUTER2].set_router_selection_jitter(1)
@@ -40,6 +40,7 @@ ED = 4
class Cert_6_1_6_REEDAttachLinkQuality_ED(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -63,9 +64,7 @@ class Cert_6_1_6_REEDAttachLinkQuality_ED(unittest.TestCase):
self.nodes[ROUTER2].set_panid(0xface)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER2].add_whitelist(
self.nodes[ED].get_addr64(), rssi=-85
)
self.nodes[ROUTER2].add_whitelist(self.nodes[ED].get_addr64(), rssi=-85)
self.nodes[ROUTER2].enable_whitelist()
self.nodes[ROUTER2].set_router_selection_jitter(1)
@@ -140,9 +139,7 @@ class Cert_6_1_6_REEDAttachLinkQuality_ED(unittest.TestCase):
self.assertEqual(0, scan_mask_tlv.end_device)
# 4 - Router2
msg = router2_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router2_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ED])
# 5 - ED
@@ -159,9 +156,7 @@ class Cert_6_1_6_REEDAttachLinkQuality_ED(unittest.TestCase):
self.assertEqual(1, scan_mask_tlv.end_device)
# 6 - REED
msg = router2_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router2_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[ED])
msg = reed_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
@@ -40,6 +40,7 @@ SED = 4
class Cert_6_1_6_REEDAttachLinkQuality_SED(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -63,9 +64,8 @@ class Cert_6_1_6_REEDAttachLinkQuality_SED(unittest.TestCase):
self.nodes[ROUTER2].set_panid(0xface)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER2].add_whitelist(
self.nodes[SED].get_addr64(), rssi=-85
)
self.nodes[ROUTER2].add_whitelist(self.nodes[SED].get_addr64(),
rssi=-85)
self.nodes[ROUTER2].enable_whitelist()
self.nodes[ROUTER2].set_router_selection_jitter(1)
@@ -141,9 +141,7 @@ class Cert_6_1_6_REEDAttachLinkQuality_SED(unittest.TestCase):
self.assertEqual(0, scan_mask_tlv.end_device)
# 4 - Router2
msg = router2_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router2_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[SED])
# 5 - SED
@@ -160,9 +158,7 @@ class Cert_6_1_6_REEDAttachLinkQuality_SED(unittest.TestCase):
self.assertEqual(1, scan_mask_tlv.end_device)
# 6 - REED
msg = router2_messages.next_mle_message(
mle.CommandType.PARENT_RESPONSE
)
msg = router2_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
msg.assertSentToNode(self.nodes[SED])
msg = reed_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
@@ -40,6 +40,7 @@ ROUTER3 = 5
class Cert_6_1_7_EDSynchronization(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -38,6 +38,7 @@ ED = 3
class Cert_6_2_1_NewPartition(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -39,6 +39,7 @@ ED = 4
class Cert_6_2_2_NewPartition(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -38,6 +38,7 @@ ED = 3
class Cert_6_3_1_OrphanReattach(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ED = 2
class Cert_6_3_2_NetworkDataUpdate(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ED = 2
class Cert_6_4_1_LinkLocal(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -38,6 +38,7 @@ ED = 3
class Cert_5_3_2_RealmLocal(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -87,21 +88,15 @@ class Cert_5_3_2_RealmLocal(unittest.TestCase):
self.assertTrue(self.nodes[LEADER].ping(addr, size=256))
self.assertTrue(self.nodes[LEADER].ping(addr))
self.assertTrue(
self.nodes[LEADER].ping('ff03::1', num_responses=2, size=256)
)
self.assertTrue(self.nodes[LEADER].ping('ff03::1',
num_responses=2,
size=256))
self.assertTrue(self.nodes[LEADER].ping('ff03::1', num_responses=2))
self.assertTrue(
self.nodes[LEADER].ping(
'ff33:0040:fdde:ad00:beef:0:0:1', num_responses=2, size=256
)
)
self.assertTrue(
self.nodes[LEADER].ping(
'ff33:0040:fdde:ad00:beef:0:0:1', num_responses=2
)
)
self.assertTrue(self.nodes[LEADER].ping(
'ff33:0040:fdde:ad00:beef:0:0:1', num_responses=2, size=256))
self.assertTrue(self.nodes[LEADER].ping(
'ff33:0040:fdde:ad00:beef:0:0:1', num_responses=2))
if __name__ == '__main__':
@@ -37,6 +37,7 @@ ED = 2
class Cert_6_5_1_ChildResetSynchronize(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ED = 2
class Cert_6_5_2_ChildResetReattach(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ED = 2
class Cert_6_6_1_KeyIncrement(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -37,6 +37,7 @@ ED = 2
class Cert_6_6_2_KeyIncrement1(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -51,6 +51,7 @@ MTDS = [SED1, MED1]
class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -138,68 +139,52 @@ class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
msg = leader_messages.next_mle_message(mle.CommandType.DATA_RESPONSE)
check_data_response(
msg,
network_data_check=NetworkDataCheck(
prefixes_check=PrefixesCheck(
prefix_check_list=[
SinglePrefixCheck(prefix=b'2001000200000001'),
SinglePrefixCheck(prefix=b'2001000200000002'),
]
)
),
network_data_check=NetworkDataCheck(prefixes_check=PrefixesCheck(
prefix_check_list=[
SinglePrefixCheck(prefix=b'2001000200000001'),
SinglePrefixCheck(prefix=b'2001000200000002'),
])),
)
# Step 4 - DUT sends a MLE Child ID Response to Router1
msg = leader_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE)
check_child_id_response(
msg,
network_data_check=NetworkDataCheck(
prefixes_check=PrefixesCheck(prefix_cnt=2)
),
network_data_check=NetworkDataCheck(prefixes_check=PrefixesCheck(
prefix_cnt=2)),
)
# Step 6 - DUT sends a MLE Child ID Response to SED1
msg = leader_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE)
check_child_id_response(
msg,
network_data_check=NetworkDataCheck(
prefixes_check=PrefixesCheck(
prefix_check_list=[
SinglePrefixCheck(border_router_16=0xfffe)
]
)
),
network_data_check=NetworkDataCheck(prefixes_check=PrefixesCheck(
prefix_check_list=[SinglePrefixCheck(
border_router_16=0xfffe)])),
)
# For Step 10
msg_chd_upd_res_to_sed = leader_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_RESPONSE
)
mle.CommandType.CHILD_UPDATE_RESPONSE)
# Step 8 - DUT sends a MLE Child ID Response to MED1
msg = leader_messages.next_mle_message(
mle.CommandType.CHILD_ID_RESPONSE
)
mle.CommandType.CHILD_ID_RESPONSE)
check_child_id_response(
msg,
network_data_check=NetworkDataCheck(
prefixes_check=PrefixesCheck(prefix_cnt=2)
),
network_data_check=NetworkDataCheck(prefixes_check=PrefixesCheck(
prefix_cnt=2)),
)
# Step 10 - DUT sends Child Update Response
msg_chd_upd_res_to_med = leader_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_RESPONSE
)
mle.CommandType.CHILD_UPDATE_RESPONSE)
msg = med1_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_REQUEST
)
mle.CommandType.CHILD_UPDATE_REQUEST)
check_child_update_request_from_child(
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1, 2]
)
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1, 2])
check_child_update_response(
msg_chd_upd_res_to_med,
@@ -208,11 +193,9 @@ class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
)
msg = sed1_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_REQUEST
)
mle.CommandType.CHILD_UPDATE_REQUEST)
check_child_update_request_from_child(
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1]
)
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1])
check_child_update_response(
msg_chd_upd_res_to_sed,
address_registration=CheckType.CONTAIN,
@@ -41,6 +41,7 @@ MTDS = [ED2, SED2]
class Cert_7_1_2_BorderRouterAsRouter(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -51,6 +51,7 @@ MTDS = [SED1, MED1]
class Cert_7_1_3_BorderRouterAsLeader(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -143,24 +144,19 @@ class Cert_7_1_3_BorderRouterAsLeader(unittest.TestCase):
msg = leader_messages.next_mle_message(mle.CommandType.DATA_RESPONSE)
check_data_response(
msg,
network_data_check=NetworkDataCheck(
prefixes_check=PrefixesCheck(
prefix_check_list=[
SinglePrefixCheck(b'2001000200000001'),
SinglePrefixCheck(b'2001000200000002'),
]
)
),
network_data_check=NetworkDataCheck(prefixes_check=PrefixesCheck(
prefix_check_list=[
SinglePrefixCheck(b'2001000200000001'),
SinglePrefixCheck(b'2001000200000002'),
])),
)
# 4 - N/A
# Get addresses registered by MED1
msg = med1_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_REQUEST
)
mle.CommandType.CHILD_UPDATE_REQUEST)
check_child_update_request_from_child(
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1, 2]
)
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1, 2])
# 5 - Leader
# Make a copy of leader's messages to ensure that we don't miss
@@ -170,9 +166,9 @@ class Cert_7_1_3_BorderRouterAsLeader(unittest.TestCase):
mle.CommandType.CHILD_UPDATE_RESPONSE,
sent_to_node=self.nodes[MED1],
)
check_child_update_response(
msg, address_registration=CheckType.CONTAIN, CIDs=[1, 2]
)
check_child_update_response(msg,
address_registration=CheckType.CONTAIN,
CIDs=[1, 2])
# 6A & 6B - Leader
if config.LEADER_NOTIFY_SED_BY_CHILD_UPDATE_REQUEST:
@@ -188,27 +184,24 @@ class Cert_7_1_3_BorderRouterAsLeader(unittest.TestCase):
)
else:
msg = leader_messages.next_mle_message(
mle.CommandType.DATA_RESPONSE, sent_to_node=self.nodes[SED1]
)
mle.CommandType.DATA_RESPONSE, sent_to_node=self.nodes[SED1])
check_data_response(msg, network_data_check=NetworkDataCheck())
# 7 - N/A
# Get addresses registered by SED1
msg = sed1_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_REQUEST
)
mle.CommandType.CHILD_UPDATE_REQUEST)
check_child_update_request_from_child(
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1]
)
msg, address_registration=CheckType.CONTAIN, CIDs=[0, 1])
# 8 - Leader
msg = leader_messages.next_mle_message(
mle.CommandType.CHILD_UPDATE_RESPONSE,
sent_to_node=self.nodes[SED1],
)
check_child_update_response(
msg, address_registration=CheckType.CONTAIN, CIDs=[1]
)
check_child_update_response(msg,
address_registration=CheckType.CONTAIN,
CIDs=[1])
if __name__ == '__main__':
@@ -41,6 +41,7 @@ MTDS = [SED2, ED2]
class Cert_7_1_4_BorderRouterAsRouter(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -41,6 +41,7 @@ MTDS = [ED2, SED2]
class Cert_7_1_5_BorderRouterAsRouter(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -127,11 +128,8 @@ class Cert_7_1_5_BorderRouterAsRouter(unittest.TestCase):
self.assertTrue(any('2001:2:0:2' in addr[0:10] for addr in addrs))
self.assertTrue(any('2001:2:0:3' in addr[0:10] for addr in addrs))
for addr in addrs:
if (
addr[0:10] == '2001:2:0:1'
or addr[0:10] == '2001:2:0:2'
or addr[0:10] == '2001:2:0:3'
):
if (addr[0:10] == '2001:2:0:1' or addr[0:10] == '2001:2:0:2' or
addr[0:10] == '2001:2:0:3'):
self.assertTrue(self.nodes[LEADER].ping(addr))
addrs = self.nodes[SED2].get_addrs()
@@ -139,11 +137,8 @@ class Cert_7_1_5_BorderRouterAsRouter(unittest.TestCase):
self.assertFalse(any('2001:2:0:2' in addr[0:10] for addr in addrs))
self.assertTrue(any('2001:2:0:3' in addr[0:10] for addr in addrs))
for addr in addrs:
if (
addr[0:10] == '2001:2:0:1'
or addr[0:10] == '2001:2:0:2'
or addr[0:10] == '2001:2:0:3'
):
if (addr[0:10] == '2001:2:0:1' or addr[0:10] == '2001:2:0:2' or
addr[0:10] == '2001:2:0:3'):
self.assertTrue(self.nodes[LEADER].ping(addr))
@@ -42,6 +42,7 @@ JOINER = 2
class Cert_8_1_01_Commissioning(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -52,8 +53,7 @@ class Cert_8_1_01_Commissioning(unittest.TestCase):
self.nodes[COMMISSIONER].set_panid(0xface)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].set_masterkey(
'00112233445566778899aabbccddeeff'
)
'00112233445566778899aabbccddeeff')
self.nodes[JOINER].set_mode('rsdn')
self.nodes[JOINER].set_masterkey('deadbeefdeadbeefdeadbeefdeadbeef')
@@ -73,15 +73,13 @@ class Cert_8_1_01_Commissioning(unittest.TestCase):
self.nodes[COMMISSIONER].commissioner_start()
self.simulator.go(3)
self.nodes[COMMISSIONER].commissioner_add_joiner(
self.nodes[JOINER].get_eui64(), 'OPENTHREAD'
)
self.nodes[JOINER].get_eui64(), 'OPENTHREAD')
self.nodes[JOINER].interface_up()
self.nodes[JOINER].joiner_start('OPENTHREAD')
self.simulator.go(10)
self.simulator.read_cert_messages_in_commissioning_log(
[COMMISSIONER, JOINER]
)
[COMMISSIONER, JOINER])
self.assertEqual(
self.nodes[JOINER].get_masterkey(),
self.nodes[COMMISSIONER].get_masterkey(),
@@ -89,66 +87,55 @@ class Cert_8_1_01_Commissioning(unittest.TestCase):
joiner_messages = self.simulator.get_messages_sent_by(JOINER)
commissioner_messages = self.simulator.get_messages_sent_by(
COMMISSIONER
)
COMMISSIONER)
# 2 - N/A
# 3 - Joiner_1
msg = joiner_messages.next_mle_message(
mle.CommandType.DISCOVERY_REQUEST
)
mle.CommandType.DISCOVERY_REQUEST)
command.check_discovery_request(msg)
request_src_addr = msg.mac_header.src_address
# 4 - Commissioner
msg = commissioner_messages.next_mle_message(
mle.CommandType.DISCOVERY_RESPONSE
)
command.check_discovery_response(
msg, request_src_addr, steering_data=CheckType.CONTAIN
)
mle.CommandType.DISCOVERY_RESPONSE)
command.check_discovery_response(msg,
request_src_addr,
steering_data=CheckType.CONTAIN)
udp_port_set_by_commissioner = command.get_joiner_udp_port_in_discovery_response(
msg)
# 5.2 - Joiner_1
msg = joiner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.CLIENT_HELLO
)
msg = joiner_messages.next_dtls_message(dtls.ContentType.HANDSHAKE,
dtls.HandshakeType.CLIENT_HELLO)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
# 5.3 - Commissioner
msg = commissioner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.HELLO_VERIFY_REQUEST
)
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.HELLO_VERIFY_REQUEST)
commissioner_cookie = msg.dtls.body.cookie
# 5.4 - Joiner_1
msg = joiner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.CLIENT_HELLO
)
msg = joiner_messages.next_dtls_message(dtls.ContentType.HANDSHAKE,
dtls.HandshakeType.CLIENT_HELLO)
self.assertEqual(commissioner_cookie, msg.dtls.body.cookie)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
# 5.5 - Commissioner
commissioner_messages.next_dtls_message(dtls.ContentType.HANDSHAKE,
dtls.HandshakeType.SERVER_HELLO)
commissioner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.SERVER_HELLO
)
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.SERVER_KEY_EXCHANGE)
commissioner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.SERVER_KEY_EXCHANGE
)
commissioner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.SERVER_HELLO_DONE
)
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.SERVER_HELLO_DONE)
# 5.6 - Joiner_1
msg = joiner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.CLIENT_KEY_EXCHANGE
)
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.CLIENT_KEY_EXCHANGE)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
msg = joiner_messages.next_dtls_message(
dtls.ContentType.CHANGE_CIPHER_SPEC
)
dtls.ContentType.CHANGE_CIPHER_SPEC)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
# TODO(wgtdkp): It's required to verify DTLS FINISHED message here.
@@ -156,8 +143,7 @@ class Cert_8_1_01_Commissioning(unittest.TestCase):
# 5.7 - Commissioner
commissioner_messages.next_dtls_message(
dtls.ContentType.CHANGE_CIPHER_SPEC
)
dtls.ContentType.CHANGE_CIPHER_SPEC)
# TODO(wgtdkp): It's required to verify DTLS FINISHED message here.
# Currently not handled as it is encrypted.
@@ -165,16 +151,13 @@ class Cert_8_1_01_Commissioning(unittest.TestCase):
# 5.8,9,10,11
# - Joiner_1
command.check_joiner_commissioning_messages(
joiner_messages.commissioning_messages
)
joiner_messages.commissioning_messages)
# - Commissioner
command.check_commissioner_commissioning_messages(
commissioner_messages.commissioning_messages
)
commissioner_messages.commissioning_messages)
# As commissioner is also joiner router
command.check_joiner_router_commissioning_messages(
commissioner_messages.commissioning_messages
)
commissioner_messages.commissioning_messages)
self.nodes[JOINER].thread_start()
self.simulator.go(5)
@@ -37,6 +37,7 @@ JOINER = 2
class Cert_8_1_02_Commissioning(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -47,8 +48,7 @@ class Cert_8_1_02_Commissioning(unittest.TestCase):
self.nodes[COMMISSIONER].set_panid(0xface)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].set_masterkey(
'deadbeefdeadbeefdeadbeefdeadbeef'
)
'deadbeefdeadbeefdeadbeefdeadbeef')
self.nodes[JOINER].set_mode('rsdn')
self.nodes[JOINER].set_masterkey('00112233445566778899aabbccddeeff')
@@ -68,8 +68,7 @@ class Cert_8_1_02_Commissioning(unittest.TestCase):
self.nodes[COMMISSIONER].commissioner_start()
self.simulator.go(3)
self.nodes[COMMISSIONER].commissioner_add_joiner(
self.nodes[JOINER].get_eui64(), 'OPENTHREAD'
)
self.nodes[JOINER].get_eui64(), 'OPENTHREAD')
self.nodes[JOINER].interface_up()
self.nodes[JOINER].joiner_start('DAERHTNEPO')
@@ -38,6 +38,7 @@ JOINER = 3
class Cert_8_2_01_JoinerRouter(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -48,15 +49,13 @@ class Cert_8_2_01_JoinerRouter(unittest.TestCase):
self.nodes[COMMISSIONER].set_panid(0xface)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].set_masterkey(
'deadbeefdeadbeefdeadbeefdeadbeef'
)
'deadbeefdeadbeefdeadbeefdeadbeef')
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[JOINER_ROUTER].set_mode('rsdn')
self.nodes[JOINER_ROUTER].set_masterkey(
'00112233445566778899aabbccddeeff'
)
'00112233445566778899aabbccddeeff')
self.nodes[JOINER_ROUTER].enable_whitelist()
self.nodes[JOINER_ROUTER].set_router_selection_jitter(1)
@@ -80,19 +79,15 @@ class Cert_8_2_01_JoinerRouter(unittest.TestCase):
self.nodes[COMMISSIONER].commissioner_start()
self.simulator.go(5)
self.nodes[COMMISSIONER].commissioner_add_joiner(
self.nodes[JOINER_ROUTER].get_eui64(), 'OPENTHREAD'
)
self.nodes[JOINER_ROUTER].get_eui64(), 'OPENTHREAD')
self.nodes[COMMISSIONER].commissioner_add_joiner(
self.nodes[JOINER].get_eui64(), 'OPENTHREAD2'
)
self.nodes[JOINER].get_eui64(), 'OPENTHREAD2')
self.simulator.go(5)
self.nodes[COMMISSIONER].add_whitelist(
self.nodes[JOINER_ROUTER].get_joiner_id()
)
self.nodes[JOINER_ROUTER].get_joiner_id())
self.nodes[JOINER_ROUTER].add_whitelist(
self.nodes[COMMISSIONER].get_addr64()
)
self.nodes[COMMISSIONER].get_addr64())
self.nodes[JOINER_ROUTER].interface_up()
self.nodes[JOINER_ROUTER].joiner_start('OPENTHREAD')
@@ -103,19 +98,15 @@ class Cert_8_2_01_JoinerRouter(unittest.TestCase):
)
self.nodes[COMMISSIONER].add_whitelist(
self.nodes[JOINER_ROUTER].get_addr64()
)
self.nodes[JOINER_ROUTER].get_addr64())
self.nodes[JOINER_ROUTER].thread_start()
self.simulator.go(5)
self.assertEqual(self.nodes[JOINER_ROUTER].get_state(), 'router')
self.nodes[JOINER_ROUTER].add_whitelist(
self.nodes[JOINER].get_joiner_id()
)
self.nodes[JOINER].add_whitelist(
self.nodes[JOINER_ROUTER].get_addr64()
)
self.nodes[JOINER].get_joiner_id())
self.nodes[JOINER].add_whitelist(self.nodes[JOINER_ROUTER].get_addr64())
self.nodes[JOINER].interface_up()
self.nodes[JOINER].joiner_start('OPENTHREAD2')
@@ -125,9 +116,7 @@ class Cert_8_2_01_JoinerRouter(unittest.TestCase):
self.nodes[COMMISSIONER].get_masterkey(),
)
self.nodes[JOINER_ROUTER].add_whitelist(
self.nodes[JOINER].get_addr64()
)
self.nodes[JOINER_ROUTER].add_whitelist(self.nodes[JOINER].get_addr64())
self.nodes[JOINER].thread_start()
self.simulator.go(5)
@@ -38,6 +38,7 @@ JOINER = 3
class Cert_8_2_02_JoinerRouter(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -48,15 +49,13 @@ class Cert_8_2_02_JoinerRouter(unittest.TestCase):
self.nodes[COMMISSIONER].set_panid(0xface)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].set_masterkey(
'deadbeefdeadbeefdeadbeefdeadbeef'
)
'deadbeefdeadbeefdeadbeefdeadbeef')
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[JOINER_ROUTER].set_mode('rsdn')
self.nodes[JOINER_ROUTER].set_masterkey(
'00112233445566778899aabbccddeeff'
)
'00112233445566778899aabbccddeeff')
self.nodes[JOINER_ROUTER].enable_whitelist()
self.nodes[JOINER_ROUTER].set_router_selection_jitter(1)
@@ -80,19 +79,15 @@ class Cert_8_2_02_JoinerRouter(unittest.TestCase):
self.nodes[COMMISSIONER].commissioner_start()
self.simulator.go(5)
self.nodes[COMMISSIONER].commissioner_add_joiner(
self.nodes[JOINER_ROUTER].get_eui64(), 'OPENTHREAD'
)
self.nodes[JOINER_ROUTER].get_eui64(), 'OPENTHREAD')
self.nodes[COMMISSIONER].commissioner_add_joiner(
self.nodes[JOINER].get_eui64(), 'OPENTHREAD2'
)
self.nodes[JOINER].get_eui64(), 'OPENTHREAD2')
self.simulator.go(5)
self.nodes[COMMISSIONER].add_whitelist(
self.nodes[JOINER_ROUTER].get_joiner_id()
)
self.nodes[JOINER_ROUTER].get_joiner_id())
self.nodes[JOINER_ROUTER].add_whitelist(
self.nodes[COMMISSIONER].get_addr64()
)
self.nodes[COMMISSIONER].get_addr64())
self.nodes[JOINER_ROUTER].interface_up()
self.nodes[JOINER_ROUTER].joiner_start('OPENTHREAD')
@@ -103,19 +98,15 @@ class Cert_8_2_02_JoinerRouter(unittest.TestCase):
)
self.nodes[COMMISSIONER].add_whitelist(
self.nodes[JOINER_ROUTER].get_addr64()
)
self.nodes[JOINER_ROUTER].get_addr64())
self.nodes[JOINER_ROUTER].thread_start()
self.simulator.go(5)
self.assertEqual(self.nodes[JOINER_ROUTER].get_state(), 'router')
self.nodes[JOINER_ROUTER].add_whitelist(
self.nodes[JOINER].get_joiner_id()
)
self.nodes[JOINER].add_whitelist(
self.nodes[JOINER_ROUTER].get_addr64()
)
self.nodes[JOINER].get_joiner_id())
self.nodes[JOINER].add_whitelist(self.nodes[JOINER_ROUTER].get_addr64())
self.nodes[JOINER].interface_up()
self.nodes[JOINER].joiner_start('2DAERHTNEPO')
@@ -41,6 +41,7 @@ LEADER = 2
class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -88,15 +89,13 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
leader_messages = self.simulator.get_messages_sent_by(LEADER)
msg = leader_messages.next_coap_message('2.04', assert_enabled=True)
commissioner_session_id_tlv = command.get_sub_tlv(
msg.coap.payload, mesh_cop.CommissionerSessionId
)
msg.coap.payload, mesh_cop.CommissionerSessionId)
# Step 2 - Harness instructs commissioner to send
# MGMT_COMMISSIONER_SET.req to Leader
steering_data_tlv = mesh_cop.SteeringData(bytes([0xff]))
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
[steering_data_tlv]
)
[steering_data_tlv])
self.simulator.go(5)
# Step 3 - Leader responds to MGMT_COMMISSIONER_SET.req with
@@ -105,19 +104,16 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
msg = leader_messages.next_coap_message('2.04')
# (mesh_cop.State(mesh_cop.MeshCopState.REJECT),) <- this a tuple, don't delete the comma
command.check_coap_message(
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)]
)
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)])
self.simulator.get_messages_sent_by(COMMISSIONER) # Skip LEAD_PET.req
# Step 4 - Harness instructs commissioner to send
# MGMT_COMMISSIONER_SET.req to Leader
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
[steering_data_tlv, commissioner_session_id_tlv]
)
[steering_data_tlv, commissioner_session_id_tlv])
self.simulator.go(5)
commissioner_messages = self.simulator.get_messages_sent_by(
COMMISSIONER
)
COMMISSIONER)
msg = commissioner_messages.next_coap_message('0.02', uri_path='/c/cs')
rloc = ip_address(self.nodes[LEADER].get_rloc())
leader_aloc = ip_address(self.nodes[LEADER].get_addr_leader_aloc())
@@ -131,8 +127,7 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
leader_messages = self.simulator.get_messages_sent_by(LEADER)
msg = leader_messages.next_coap_message('2.04')
command.check_coap_message(
msg, [mesh_cop.State(mesh_cop.MeshCopState.ACCEPT)]
)
msg, [mesh_cop.State(mesh_cop.MeshCopState.ACCEPT)])
# Step 6 - Leader sends a multicast MLE Data Response
msg = leader_messages.next_mle_message(mle.CommandType.DATA_RESPONSE)
@@ -146,16 +141,14 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
mesh_cop.SteeringData,
mesh_cop.BorderAgentLocator,
],
)
),
)),
)
# Step 7 - Harness instructs commissioner to send
# MGMT_COMMISSIONER_SET.req to Leader
border_agent_locator_tlv = mesh_cop.BorderAgentLocator(0x0400)
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
[commissioner_session_id_tlv, border_agent_locator_tlv]
)
[commissioner_session_id_tlv, border_agent_locator_tlv])
self.simulator.go(5)
# Step 8 - Leader responds to MGMT_COMMISSIONER_SET.req with
@@ -163,18 +156,15 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
leader_messages = self.simulator.get_messages_sent_by(LEADER)
msg = leader_messages.next_coap_message('2.04')
command.check_coap_message(
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)]
)
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)])
# Step 9 - Harness instructs commissioner to send
# MGMT_COMMISSIONER_SET.req to Leader
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
[
steering_data_tlv,
commissioner_session_id_tlv,
border_agent_locator_tlv,
]
)
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([
steering_data_tlv,
commissioner_session_id_tlv,
border_agent_locator_tlv,
])
self.simulator.go(5)
# Step 10 - Leader responds to MGMT_COMMISSIONER_SET.req with
@@ -182,14 +172,12 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
leader_messages = self.simulator.get_messages_sent_by(LEADER)
msg = leader_messages.next_coap_message('2.04')
command.check_coap_message(
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)]
)
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)])
# Step 11 - Harness instructs commissioner to send
# MGMT_COMMISSIONER_SET.req to Leader
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
[mesh_cop.CommissionerSessionId(0xffff), steering_data_tlv]
)
[mesh_cop.CommissionerSessionId(0xffff), steering_data_tlv])
self.simulator.go(5)
# Step 12 - Leader responds to MGMT_COMMISSIONER_SET.req with
@@ -197,18 +185,15 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
leader_messages = self.simulator.get_messages_sent_by(LEADER)
msg = leader_messages.next_coap_message('2.04')
command.check_coap_message(
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)]
)
msg, [mesh_cop.State(mesh_cop.MeshCopState.REJECT)])
# Step 13 - Harness instructs commissioner to send
# MGMT_COMMISSIONER_SET.req to Leader
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
[
commissioner_session_id_tlv,
steering_data_tlv,
mesh_cop.Channel(0x0, 0x0),
]
)
self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([
commissioner_session_id_tlv,
steering_data_tlv,
mesh_cop.Channel(0x0, 0x0),
])
self.simulator.go(5)
# Step 14 - Leader responds to MGMT_COMMISSIONER_SET.req with
@@ -216,8 +201,7 @@ class Cert_9_2_02_MGMTCommissionerSet(unittest.TestCase):
leader_messages = self.simulator.get_messages_sent_by(LEADER)
msg = leader_messages.next_coap_message('2.04')
command.check_coap_message(
msg, [mesh_cop.State(mesh_cop.MeshCopState.ACCEPT)]
)
msg, [mesh_cop.State(mesh_cop.MeshCopState.ACCEPT)])
# Step 15 - Send ICMPv6 Echo Request to Leader
leader_rloc = self.nodes[LEADER].get_rloc()
@@ -37,6 +37,7 @@ LEADER = 2
class Cert_9_2_04_ActiveDataset(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -45,14 +46,12 @@ class Cert_9_2_04_ActiveDataset(unittest.TestCase):
self.nodes[i] = node.Node(i, simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
10, panid=0xface, master_key='000102030405060708090a0b0c0d0e0f'
)
10, panid=0xface, master_key='000102030405060708090a0b0c0d0e0f')
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
10, panid=0xface, master_key='000102030405060708090a0b0c0d0e0f'
)
10, panid=0xface, master_key='000102030405060708090a0b0c0d0e0f')
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].set_router_selection_jitter(1)
@@ -48,6 +48,7 @@ COMMISSIONER_PENDING_PANID = 0xafce
class Cert_9_2_7_DelayTimer(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -70,9 +71,8 @@ class Cert_9_2_7_DelayTimer(unittest.TestCase):
self.nodes[LEADER].set_router_selection_jitter(1)
self.nodes[ROUTER].set_active_dataset(ROUTER_ACTIVE_TIMESTAMP)
self.nodes[ROUTER].set_pending_dataset(
ROUTER_PENDING_TIMESTAMP, ROUTER_PENDING_ACTIVE_TIMESTAMP
)
self.nodes[ROUTER].set_pending_dataset(ROUTER_PENDING_TIMESTAMP,
ROUTER_PENDING_ACTIVE_TIMESTAMP)
self.nodes[ROUTER].set_mode('rsdn')
self.nodes[ROUTER].set_panid(PANID_INIT)
self.nodes[ROUTER].set_partition_id(0x1)
@@ -122,26 +122,21 @@ class Cert_9_2_7_DelayTimer(unittest.TestCase):
panid=COMMISSIONER_PENDING_PANID,
)
self.simulator.go(40)
self.assertEqual(
self.nodes[LEADER].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(
self.nodes[COMMISSIONER].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(
self.nodes[ROUTER].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(self.nodes[LEADER].get_panid(),
COMMISSIONER_PENDING_PANID)
self.assertEqual(self.nodes[COMMISSIONER].get_panid(),
COMMISSIONER_PENDING_PANID)
self.assertEqual(self.nodes[ROUTER].get_panid(),
COMMISSIONER_PENDING_PANID)
self.assertEqual(
self.nodes[LEADER].get_channel(), COMMISSIONER_PENDING_CHANNEL
)
self.assertEqual(self.nodes[LEADER].get_channel(),
COMMISSIONER_PENDING_CHANNEL)
self.assertEqual(
self.nodes[COMMISSIONER].get_channel(),
COMMISSIONER_PENDING_CHANNEL,
)
self.assertEqual(
self.nodes[ROUTER].get_channel(), COMMISSIONER_PENDING_CHANNEL
)
self.assertEqual(self.nodes[ROUTER].get_channel(),
COMMISSIONER_PENDING_CHANNEL)
ipaddrs = self.nodes[ROUTER].get_addrs()
for ipaddr in ipaddrs:
@@ -49,6 +49,7 @@ MTDS = [ED, SED]
class Cert_9_2_8_PersistentDatasets(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -56,17 +57,17 @@ class Cert_9_2_8_PersistentDatasets(unittest.TestCase):
for i in range(1, 6):
self.nodes[i] = node.Node(i, (i in MTDS), simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
LEADER_ACTIVE_TIMESTAMP, panid=PANID_INIT, channel=CHANNEL_INIT
)
self.nodes[COMMISSIONER].set_active_dataset(LEADER_ACTIVE_TIMESTAMP,
panid=PANID_INIT,
channel=CHANNEL_INIT)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
LEADER_ACTIVE_TIMESTAMP, panid=PANID_INIT, channel=CHANNEL_INIT
)
self.nodes[LEADER].set_active_dataset(LEADER_ACTIVE_TIMESTAMP,
panid=PANID_INIT,
channel=CHANNEL_INIT)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER].get_addr64())
@@ -74,9 +75,9 @@ class Cert_9_2_8_PersistentDatasets(unittest.TestCase):
self.nodes[LEADER].add_whitelist(self.nodes[SED].get_addr64())
self.nodes[LEADER].enable_whitelist()
self.nodes[ROUTER].set_active_dataset(
LEADER_ACTIVE_TIMESTAMP, panid=PANID_INIT, channel=CHANNEL_INIT
)
self.nodes[ROUTER].set_active_dataset(LEADER_ACTIVE_TIMESTAMP,
panid=PANID_INIT,
channel=CHANNEL_INIT)
self.nodes[ROUTER].set_mode('rsdn')
self._setUpRouter()
@@ -149,16 +150,13 @@ class Cert_9_2_8_PersistentDatasets(unittest.TestCase):
self.simulator.go(60)
self.assertEqual(
self.nodes[LEADER].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(
self.nodes[COMMISSIONER].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(self.nodes[LEADER].get_panid(),
COMMISSIONER_PENDING_PANID)
self.assertEqual(self.nodes[COMMISSIONER].get_panid(),
COMMISSIONER_PENDING_PANID)
self.assertEqual(
self.nodes[LEADER].get_channel(), COMMISSIONER_PENDING_CHANNEL
)
self.assertEqual(self.nodes[LEADER].get_channel(),
COMMISSIONER_PENDING_CHANNEL)
self.assertEqual(
self.nodes[COMMISSIONER].get_channel(),
COMMISSIONER_PENDING_CHANNEL,
@@ -186,25 +184,18 @@ class Cert_9_2_8_PersistentDatasets(unittest.TestCase):
self.simulator.go(10)
self.assertEqual(
self.nodes[ROUTER].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(
self.nodes[ED].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(
self.nodes[SED].get_panid(), COMMISSIONER_PENDING_PANID
)
self.assertEqual(self.nodes[ROUTER].get_panid(),
COMMISSIONER_PENDING_PANID)
self.assertEqual(self.nodes[ED].get_panid(), COMMISSIONER_PENDING_PANID)
self.assertEqual(self.nodes[SED].get_panid(),
COMMISSIONER_PENDING_PANID)
self.assertEqual(
self.nodes[ROUTER].get_channel(), COMMISSIONER_PENDING_CHANNEL
)
self.assertEqual(
self.nodes[ED].get_channel(), COMMISSIONER_PENDING_CHANNEL
)
self.assertEqual(
self.nodes[SED].get_channel(), COMMISSIONER_PENDING_CHANNEL
)
self.assertEqual(self.nodes[ROUTER].get_channel(),
COMMISSIONER_PENDING_CHANNEL)
self.assertEqual(self.nodes[ED].get_channel(),
COMMISSIONER_PENDING_CHANNEL)
self.assertEqual(self.nodes[SED].get_channel(),
COMMISSIONER_PENDING_CHANNEL)
self.simulator.go(5)
@@ -45,6 +45,7 @@ ROUTER2 = 4
class Cert_9_2_09_PendingPartition(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -52,17 +53,17 @@ class Cert_9_2_09_PendingPartition(unittest.TestCase):
for i in range(1, 5):
self.nodes[i] = node.Node(i, simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
10, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[COMMISSIONER].set_active_dataset(10,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
10, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[LEADER].set_active_dataset(10,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].set_partition_id(0xffffffff)
self.nodes[LEADER].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
@@ -70,18 +71,18 @@ class Cert_9_2_09_PendingPartition(unittest.TestCase):
self.nodes[LEADER].enable_whitelist()
self.nodes[LEADER].set_router_selection_jitter(1)
self.nodes[ROUTER1].set_active_dataset(
10, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[ROUTER1].set_active_dataset(10,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ROUTER2].get_addr64())
self.nodes[ROUTER1].enable_whitelist()
self.nodes[ROUTER1].set_router_selection_jitter(1)
self.nodes[ROUTER2].set_active_dataset(
10, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[ROUTER2].set_active_dataset(10,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[ROUTER2].enable_whitelist()
@@ -48,6 +48,7 @@ MTDS = [ED1, SED1]
class Cert_9_2_10_PendingPartition(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -55,17 +56,17 @@ class Cert_9_2_10_PendingPartition(unittest.TestCase):
for i in range(1, 6):
self.nodes[i] = node.Node(i, (i in MTDS), simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
15, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[COMMISSIONER].set_active_dataset(15,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
15, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[LEADER].set_active_dataset(15,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].set_partition_id(0xffffffff)
self.nodes[LEADER].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
@@ -73,9 +74,9 @@ class Cert_9_2_10_PendingPartition(unittest.TestCase):
self.nodes[LEADER].enable_whitelist()
self.nodes[LEADER].set_router_selection_jitter(1)
self.nodes[ROUTER1].set_active_dataset(
15, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[ROUTER1].set_active_dataset(15,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ED1].get_addr64())
@@ -48,6 +48,7 @@ MTDS = [ED1, SED1]
class Cert_9_2_11_MasterKey(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -55,26 +56,29 @@ class Cert_9_2_11_MasterKey(unittest.TestCase):
for i in range(1, 6):
self.nodes[i] = node.Node(i, (i in MTDS), simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
10, channel=CHANNEL_INIT, panid=PANID_INIT, master_key=KEY1
)
self.nodes[COMMISSIONER].set_active_dataset(10,
channel=CHANNEL_INIT,
panid=PANID_INIT,
master_key=KEY1)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
10, channel=CHANNEL_INIT, panid=PANID_INIT, master_key=KEY1
)
self.nodes[LEADER].set_active_dataset(10,
channel=CHANNEL_INIT,
panid=PANID_INIT,
master_key=KEY1)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[LEADER].enable_whitelist()
self.nodes[LEADER].set_router_selection_jitter(1)
self.nodes[ROUTER1].set_active_dataset(
10, channel=CHANNEL_INIT, panid=PANID_INIT, master_key=KEY1
)
self.nodes[ROUTER1].set_active_dataset(10,
channel=CHANNEL_INIT,
panid=PANID_INIT,
master_key=KEY1)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ED1].get_addr64())
@@ -48,6 +48,7 @@ DATASET2_PANID = 0xafce
class Cert_9_2_12_Announce(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -55,34 +56,34 @@ class Cert_9_2_12_Announce(unittest.TestCase):
for i in range(1, 6):
self.nodes[i] = node.Node(i, (i == MED), simulator=self.simulator)
self.nodes[LEADER1].set_active_dataset(
DATASET1_TIMESTAMP, channel=DATASET1_CHANNEL, panid=DATASET1_PANID
)
self.nodes[LEADER1].set_active_dataset(DATASET1_TIMESTAMP,
channel=DATASET1_CHANNEL,
panid=DATASET1_PANID)
self.nodes[LEADER1].set_mode('rsdn')
self.nodes[LEADER1].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[LEADER1].enable_whitelist()
self.nodes[ROUTER1].set_active_dataset(
DATASET1_TIMESTAMP, channel=DATASET1_CHANNEL, panid=DATASET1_PANID
)
self.nodes[ROUTER1].set_active_dataset(DATASET1_TIMESTAMP,
channel=DATASET1_CHANNEL,
panid=DATASET1_PANID)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER1].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER2].get_addr64())
self.nodes[ROUTER1].enable_whitelist()
self.nodes[ROUTER1].set_router_selection_jitter(1)
self.nodes[LEADER2].set_active_dataset(
DATASET2_TIMESTAMP, channel=DATASET2_CHANNEL, panid=DATASET2_PANID
)
self.nodes[LEADER2].set_active_dataset(DATASET2_TIMESTAMP,
channel=DATASET2_CHANNEL,
panid=DATASET2_PANID)
self.nodes[LEADER2].set_mode('rsdn')
self.nodes[LEADER2].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[LEADER2].add_whitelist(self.nodes[ROUTER2].get_addr64())
self.nodes[LEADER2].enable_whitelist()
self.nodes[LEADER2].set_router_selection_jitter(1)
self.nodes[ROUTER2].set_active_dataset(
DATASET2_TIMESTAMP, channel=DATASET2_CHANNEL, panid=DATASET2_PANID
)
self.nodes[ROUTER2].set_active_dataset(DATASET2_TIMESTAMP,
channel=DATASET2_CHANNEL,
panid=DATASET2_PANID)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[LEADER2].get_addr64())
self.nodes[ROUTER2].add_whitelist(self.nodes[MED].get_addr64())
@@ -39,6 +39,7 @@ ED1 = 4
class Cert_9_2_13_EnergyScan(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -101,9 +102,7 @@ class Cert_9_2_13_EnergyScan(unittest.TestCase):
break
self.assertTrue(self.nodes[COMMISSIONER].ping(ipaddr))
self.nodes[COMMISSIONER].energy_scan(
0x50000, 0x02, 0x20, 0x3E8, ipaddr
)
self.nodes[COMMISSIONER].energy_scan(0x50000, 0x02, 0x20, 0x3E8, ipaddr)
ipaddrs = self.nodes[ED1].get_addrs()
for ipaddr in ipaddrs:
@@ -111,13 +110,10 @@ class Cert_9_2_13_EnergyScan(unittest.TestCase):
break
self.assertTrue(self.nodes[COMMISSIONER].ping(ipaddr))
self.nodes[COMMISSIONER].energy_scan(
0x50000, 0x02, 0x20, 0x3E8, ipaddr
)
self.nodes[COMMISSIONER].energy_scan(0x50000, 0x02, 0x20, 0x3E8, ipaddr)
self.nodes[COMMISSIONER].energy_scan(
0x50000, 0x02, 0x20, 0x3E8, 'ff33:0040:fdde:ad00:beef:0:0:1'
)
self.nodes[COMMISSIONER].energy_scan(0x50000, 0x02, 0x20, 0x3E8,
'ff33:0040:fdde:ad00:beef:0:0:1')
self.assertTrue(self.nodes[COMMISSIONER].ping(ipaddr))
@@ -39,6 +39,7 @@ LEADER2 = 4
class Cert_9_2_14_PanIdQuery(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -48,17 +49,13 @@ class Cert_9_2_14_PanIdQuery(unittest.TestCase):
self.nodes[COMMISSIONER].set_panid(0xface)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(
self.nodes[LEADER1].get_addr64()
)
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER1].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER1].set_panid(0xface)
self.nodes[LEADER1].set_mode('rsdn')
self.nodes[LEADER1].add_whitelist(
self.nodes[COMMISSIONER].get_addr64()
)
self.nodes[LEADER1].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
self.nodes[LEADER1].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[LEADER1].enable_whitelist()
@@ -106,9 +103,8 @@ class Cert_9_2_14_PanIdQuery(unittest.TestCase):
self.nodes[COMMISSIONER].panid_query(0xdead, 0xffffffff, ipaddr)
self.nodes[COMMISSIONER].panid_query(
0xdead, 0xffffffff, 'ff33:0040:fdde:ad00:beef:0:0:1'
)
self.nodes[COMMISSIONER].panid_query(0xdead, 0xffffffff,
'ff33:0040:fdde:ad00:beef:0:0:1')
self.assertTrue(self.nodes[COMMISSIONER].ping(ipaddr))
@@ -44,6 +44,7 @@ ROUTER2 = 4
class Cert_9_2_15_PendingPartition(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -51,17 +52,17 @@ class Cert_9_2_15_PendingPartition(unittest.TestCase):
for i in range(1, 5):
self.nodes[i] = node.Node(i, simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
15, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[COMMISSIONER].set_active_dataset(15,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
15, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[LEADER].set_active_dataset(15,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].set_partition_id(0xffffffff)
self.nodes[LEADER].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
@@ -69,18 +70,18 @@ class Cert_9_2_15_PendingPartition(unittest.TestCase):
self.nodes[LEADER].enable_whitelist()
self.nodes[LEADER].set_router_selection_jitter(1)
self.nodes[ROUTER1].set_active_dataset(
15, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[ROUTER1].set_active_dataset(15,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ROUTER2].get_addr64())
self.nodes[ROUTER1].enable_whitelist()
self.nodes[ROUTER1].set_router_selection_jitter(1)
self.nodes[ROUTER2].set_active_dataset(
15, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[ROUTER2].set_active_dataset(15,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[ROUTER2].set_mode('rsdn')
self._setUpRouter2()
@@ -45,6 +45,7 @@ ROUTER2 = 4
class Cert_9_2_16_ActivePendingPartition(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -52,17 +53,17 @@ class Cert_9_2_16_ActivePendingPartition(unittest.TestCase):
for i in range(1, 5):
self.nodes[i] = node.Node(i, simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[COMMISSIONER].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[LEADER].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].set_partition_id(0xffffffff)
self.nodes[LEADER].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
@@ -70,18 +71,18 @@ class Cert_9_2_16_ActivePendingPartition(unittest.TestCase):
self.nodes[LEADER].enable_whitelist()
self.nodes[LEADER].set_router_selection_jitter(1)
self.nodes[ROUTER1].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[ROUTER1].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ROUTER2].get_addr64())
self.nodes[ROUTER1].enable_whitelist()
self.nodes[ROUTER1].set_router_selection_jitter(1)
self.nodes[ROUTER2].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT
)
self.nodes[ROUTER2].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT)
self.nodes[ROUTER2].set_mode('rsdn')
self._setUpRouter2()
@@ -135,27 +136,22 @@ class Cert_9_2_16_ActivePendingPartition(unittest.TestCase):
)
self.simulator.go(5)
self.nodes[COMMISSIONER].send_mgmt_active_set(
active_timestamp=15, network_name='threadCert'
)
self.nodes[COMMISSIONER].send_mgmt_active_set(active_timestamp=15,
network_name='threadCert')
self.simulator.go(100)
self.nodes[ROUTER2].start()
self.simulator.go(5)
self.assertEqual(self.nodes[ROUTER2].get_state(), 'router')
self.assertEqual(
self.nodes[COMMISSIONER].get_network_name(), NETWORK_NAME_FINAL
)
self.assertEqual(
self.nodes[LEADER].get_network_name(), NETWORK_NAME_FINAL
)
self.assertEqual(
self.nodes[ROUTER1].get_network_name(), NETWORK_NAME_FINAL
)
self.assertEqual(
self.nodes[ROUTER2].get_network_name(), NETWORK_NAME_FINAL
)
self.assertEqual(self.nodes[COMMISSIONER].get_network_name(),
NETWORK_NAME_FINAL)
self.assertEqual(self.nodes[LEADER].get_network_name(),
NETWORK_NAME_FINAL)
self.assertEqual(self.nodes[ROUTER1].get_network_name(),
NETWORK_NAME_FINAL)
self.assertEqual(self.nodes[ROUTER2].get_network_name(),
NETWORK_NAME_FINAL)
self.simulator.go(100)
@@ -43,6 +43,7 @@ ED1 = 3
class Cert_9_2_17_Orphan(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -50,17 +51,19 @@ class Cert_9_2_17_Orphan(unittest.TestCase):
for i in range(1, 4):
self.nodes[i] = node.Node(i, (i == ED1), simulator=self.simulator)
self.nodes[LEADER1].set_active_dataset(
10, channel=CHANNEL1, panid=PANID_INIT, channel_mask=CHANNEL_MASK
)
self.nodes[LEADER1].set_active_dataset(10,
channel=CHANNEL1,
panid=PANID_INIT,
channel_mask=CHANNEL_MASK)
self.nodes[LEADER1].set_mode('rsdn')
self.nodes[LEADER1].add_whitelist(self.nodes[ED1].get_addr64())
self.nodes[LEADER1].enable_whitelist()
self.nodes[LEADER1].set_router_selection_jitter(1)
self.nodes[LEADER2].set_active_dataset(
20, channel=CHANNEL2, panid=PANID_INIT, channel_mask=CHANNEL_MASK
)
self.nodes[LEADER2].set_active_dataset(20,
channel=CHANNEL2,
panid=PANID_INIT,
channel_mask=CHANNEL_MASK)
self.nodes[LEADER2].set_mode('rsdn')
self.nodes[LEADER2].enable_whitelist()
self.nodes[LEADER2].set_router_selection_jitter(1)
@@ -49,6 +49,7 @@ MTDS = [ED1, SED1]
class Cert_9_2_18_RollBackActiveTimestamp(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
@@ -56,17 +57,19 @@ class Cert_9_2_18_RollBackActiveTimestamp(unittest.TestCase):
for i in range(1, 7):
self.nodes[i] = node.Node(i, (i in MTDS), simulator=self.simulator)
self.nodes[COMMISSIONER].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT, master_key=KEY1
)
self.nodes[COMMISSIONER].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT,
master_key=KEY1)
self.nodes[COMMISSIONER].set_mode('rsdn')
self.nodes[COMMISSIONER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[COMMISSIONER].enable_whitelist()
self.nodes[COMMISSIONER].set_router_selection_jitter(1)
self.nodes[LEADER].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT, master_key=KEY1
)
self.nodes[LEADER].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT,
master_key=KEY1)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].set_partition_id(0xffffffff)
self.nodes[LEADER].add_whitelist(self.nodes[COMMISSIONER].get_addr64())
@@ -74,9 +77,10 @@ class Cert_9_2_18_RollBackActiveTimestamp(unittest.TestCase):
self.nodes[LEADER].enable_whitelist()
self.nodes[LEADER].set_router_selection_jitter(1)
self.nodes[ROUTER1].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT, master_key=KEY1
)
self.nodes[ROUTER1].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT,
master_key=KEY1)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ROUTER2].get_addr64())
@@ -85,9 +89,10 @@ class Cert_9_2_18_RollBackActiveTimestamp(unittest.TestCase):
self.nodes[ROUTER1].enable_whitelist()
self.nodes[ROUTER1].set_router_selection_jitter(1)
self.nodes[ROUTER2].set_active_dataset(
1, channel=CHANNEL_INIT, panid=PANID_INIT, master_key=KEY1
)
self.nodes[ROUTER2].set_active_dataset(1,
channel=CHANNEL_INIT,
panid=PANID_INIT,
master_key=KEY1)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[ROUTER2].enable_whitelist()
@@ -136,9 +141,8 @@ class Cert_9_2_18_RollBackActiveTimestamp(unittest.TestCase):
self.simulator.go(5)
self.assertEqual(self.nodes[SED1].get_state(), 'child')
self.nodes[COMMISSIONER].send_mgmt_active_set(
active_timestamp=20000, network_name='GRL'
)
self.nodes[COMMISSIONER].send_mgmt_active_set(active_timestamp=20000,
network_name='GRL')
self.simulator.go(5)
self.nodes[COMMISSIONER].send_mgmt_pending_set(
+1
View File
@@ -35,6 +35,7 @@ LEADER = 1
class Cert_Cli(unittest.TestCase):
def setUp(self):
self.nodes = {}
self.nodes[LEADER] = node.Node(LEADER)
@@ -37,6 +37,7 @@ ROUTER = 2
class Test_MacScan(unittest.TestCase):
def setUp(self):
self.nodes = {}
for i in range(1, 3):
+38 -46
View File
@@ -61,7 +61,6 @@ class CoapOptionsTypes(IntEnum):
class CoapOptionHeader(object):
""" Class representing CoAP optiona header. """
def __init__(self, delta, length):
@@ -104,7 +103,6 @@ class CoapOptionHeader(object):
class CoapOption(object):
""" Class representing CoAP option. """
def __init__(self, _type, value):
@@ -120,13 +118,11 @@ class CoapOption(object):
return self._value
def __repr__(self):
return "CoapOption(type={}, value={})".format(
self.type, hexlify(self.value)
)
return "CoapOption(type={}, value={})".format(self.type,
hexlify(self.value))
class CoapOptionsFactory(object):
""" Factory that produces CoAP options. """
def parse(self, data, message_info):
@@ -148,7 +144,6 @@ class CoapOptionsFactory(object):
class CoapCode(object):
""" Class representing CoAP code. """
def __init__(self, code):
@@ -182,8 +177,7 @@ class CoapCode(object):
@property
def dotted(self):
return ".".join(
["{:01d}".format(self._class), "{:02d}".format(self.detail)]
)
["{:01d}".format(self._class), "{:02d}".format(self.detail)])
def __eq__(self, other):
if isinstance(other, int):
@@ -196,16 +190,14 @@ class CoapCode(object):
return self.code == other.code
else:
raise TypeError(
"Could not compare {} and {}".format(type(self), type(other))
)
raise TypeError("Could not compare {} and {}".format(
type(self), type(other)))
def __repr__(self):
return self.dotted
class CoapMessage(object):
""" Class representing CoAP message. """
def __init__(
@@ -266,21 +258,21 @@ class CoapMessage(object):
def __repr__(self):
options_str = ", ".join([repr(opt) for opt in self.options])
return ("CoapMessage(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},",
"uri-path='{}')").format(
self.version,
CoapMessageType.name[self.type],
self.code,
self.message_id,
hexlify(self.token),
options_str,
self.payload,
self.uri_path,
)
return (
"CoapMessage(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},",
"uri-path='{}')").format(
self.version,
CoapMessageType.name[self.type],
self.code,
self.message_id,
hexlify(self.token),
options_str,
self.payload,
self.uri_path,
)
class CoapMessageProxy(object):
""" Proxy class of CoAP message.
The main idea behind this class is to delay parsing payload. Due to architecture of the existing solution
@@ -299,8 +291,7 @@ class CoapMessageProxy(object):
self._message_info = message_info
self._mid_to_uri_path_binder = mid_to_uri_path_binder
self._uri_path_based_payload_factories = (
uri_path_based_payload_factories
)
uri_path_based_payload_factories)
@property
def version(self):
@@ -334,14 +325,12 @@ class CoapMessageProxy(object):
def payload(self):
try:
binded_uri_path = self._mid_to_uri_path_binder.get_uri_path_for(
self.message_id, self.token
)
self.message_id, self.token)
factory = self._uri_path_based_payload_factories[binded_uri_path]
return factory.parse(
io.BytesIO(self._coap_message.payload), self._message_info
)
return factory.parse(io.BytesIO(self._coap_message.payload),
self._message_info)
except RuntimeError:
return self._coap_message.payload
@@ -352,14 +341,21 @@ class CoapMessageProxy(object):
def __repr__(self):
options_str = ", ".join([repr(opt) for opt in self.options])
return ("CoapMessageProxy(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},",
"uri-path='{}')").format(
self.version, self.type, self.code, self.message_id, hexlify(self.token), options_str, self.payload,
self.uri_path, )
return (
"CoapMessageProxy(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},",
"uri-path='{}')").format(
self.version,
self.type,
self.code,
self.message_id,
hexlify(self.token),
options_str,
self.payload,
self.uri_path,
)
class CoapMessageIdToUriPathBinder:
""" Class binds message id and token with URI path. """
def __init__(self):
@@ -373,12 +369,11 @@ class CoapMessageIdToUriPathBinder:
return self._uri_path_binds[message_id][hexlify(token)]
except KeyError:
raise RuntimeError(
"Could not find URI PATH for message_id: {} and token: {}".format(
message_id, hexlify(token)))
"Could not find URI PATH for message_id: {} and token: {}".
format(message_id, hexlify(token)))
class CoapMessageFactory(object):
""" Factory that produces CoAP messages. """
def __init__(
@@ -389,8 +384,7 @@ class CoapMessageFactory(object):
):
self._options_factory = options_factory
self._uri_path_based_payload_factories = (
uri_path_based_payload_factories
)
uri_path_based_payload_factories)
self._mid_to_uri_path_binder = message_id_to_uri_path_binder
def _uri_path_from(self, options):
@@ -416,8 +410,7 @@ class CoapMessageFactory(object):
def parse(self, data, message_info):
version, _type, token_length = self._parse_initial_byte(
data, message_info
)
data, message_info)
code = CoapCode(ord(data.read(1)))
message_id = struct.unpack(">H", data.read(2))[0]
@@ -428,8 +421,7 @@ class CoapMessageFactory(object):
uri_path = self._uri_path_from(options)
if uri_path is not None:
self._mid_to_uri_path_binder.add_uri_path_for(
message_id, token, uri_path
)
message_id, token, uri_path)
coap_message = CoapMessage(
version,

Some files were not shown because too many files have changed in this diff Show More