[tests] remove v1.2 backbone and multicast scripts migrated to nexus (#12778)

This commit removes three Python-based thread-cert scripts for v1.2
backbone router and multicast registration testing:

- v1_2_test_backbone_router_service.py
- v1_2_test_multicast_listener_registration.py
- v1_2_test_multicast_registration.py

These tests have been fully migrated to the Nexus simulation
framework, providing equivalent verification in a more robust and
scalable environment.

The equivalent Nexus test cases are already part of the repository:
- 1_2_BBR_TC_1, 1_2_BBR_TC_2, 1_2_BBR_TC_3
- 1_2_MATN_TC_1, 1_2_MATN_TC_2, ..., 1_2_MATN_TC_26
This commit is contained in:
Jonathan Hui
2026-03-26 16:53:31 -05:00
committed by GitHub
parent cc31b64cc1
commit f5b89b7384
3 changed files with 0 additions and 1492 deletions
@@ -1,255 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import unittest
import thread_cert
import config
LEADER_1_1 = 1
BBR_1 = 2
BBR_2 = 3
WAIT_ATTACH = 5
WAIT_REDUNDANCE = 3
ROUTER_SELECTION_JITTER = 1
BBR_REGISTRATION_JITTER = 5
"""
Topology
LEADER_1_1 --- BBR_1
\ |
\ |
\ |
BBR_2
1) Bring up Leader_1_1 and then BBR_1, BBR_1 becomes Primary Backbone Router.
2) Reset BBR_1, if bring back soon, it could restore the Backbone Router Service
from the network, after increasing sequence number, it will reregister its
Backbone Router Service to the Leader and become Primary.
3) Reset BBR_1, if bring back after it is released in the network, BBR_1 will
choose a random sequence number, register its Backbone Router Service to
Leader and become Primary.
4) Configure BBR_2 with highest sequence number and explicitly trigger SRV_DATA.ntf.
BBR_2 would become Primary and BBR_1 would change to Secondary with sequence
number increased by 1.
a) Check communication via DUA.
5) Stop BBR_2, BBR_1 would become Primary after detecting there is no available
Backbone Router Service in Thread Network.
6) Bring back BBR_2, and it would become Secondary.
a) Check the uniqueness of DUA by comparing the one in above 4a).
b) Check communication via DUA.
"""
class TestBackboneRouterService(thread_cert.TestCase):
TOPOLOGY = {
LEADER_1_1: {
'version': '1.1',
'allowlist': [BBR_1, BBR_2],
},
BBR_1: {
'version': '1.2',
'allowlist': [LEADER_1_1, BBR_2],
'is_bbr': True
},
BBR_2: {
'version': '1.2',
'allowlist': [LEADER_1_1, BBR_1],
'is_bbr': True
},
}
"""All nodes are created with default configurations"""
def test(self):
self.nodes[LEADER_1_1].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME * 2)
self.assertEqual(self.nodes[LEADER_1_1].get_state(), 'leader')
self.simulator.set_lowpan_context(1, config.DOMAIN_PREFIX)
# 1) First Backbone Router would become the Primary.
self.nodes[BBR_1].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[BBR_1].set_bbr_registration_jitter(BBR_REGISTRATION_JITTER)
self.nodes[BBR_1].set_backbone_router(seqno=1)
self.nodes[BBR_1].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_state(), 'router')
self.nodes[BBR_1].enable_backbone_router()
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Primary')
self.nodes[BBR_1].set_domain_prefix(config.DOMAIN_PREFIX)
WAIT_TIME = WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
# 2) Reset BBR_1 and bring it back soon.
# Verify that it restores Primary State with sequence number
# increased by 1.
self.nodes[BBR_1].reset()
self.nodes[BBR_1].set_bbr_registration_jitter(BBR_REGISTRATION_JITTER)
self.nodes[BBR_1].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[BBR_1].set_domain_prefix(config.DOMAIN_PREFIX)
self.nodes[BBR_1].enable_backbone_router()
self.nodes[BBR_1].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_state(), 'router')
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Primary')
self.assertEqual(self.nodes[BBR_1].get_backbone_router()['seqno'], 2)
# 3) Reset BBR_1 and bring it back after its original router id is released
# 200s (100s MaxNeighborAge + 90s InfiniteCost + 10s redundance)
# Verify it becomes Primary again.
# Note: To ensure test in next step, here Step 3) will repeat until
# the random sequence number is not the highest value 255.
while True:
self.nodes[BBR_1].reset()
WAIT_TIME = 200
self.simulator.go(WAIT_TIME)
self.nodes[BBR_1].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[BBR_1].set_bbr_registration_jitter(BBR_REGISTRATION_JITTER)
self.nodes[BBR_1].set_domain_prefix(config.DOMAIN_PREFIX)
self.nodes[BBR_1].enable_backbone_router()
self.nodes[BBR_1].start()
WAIT_TIME = config.ROUTER_RESET_DELAY
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_state(), 'router')
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Primary')
BBR_1_SEQNO = self.nodes[BBR_1].get_backbone_router()['seqno']
if (BBR_1_SEQNO != 255):
break
#4) Configure BBR_2 with highest sequence number (255) and
# explicitly trigger SRV_DATA.ntf.
# Verify BBR_2 would become Primary and BBR_1 would change to
# Secondary with sequence number increased by 1.
# Bring up BBR_2, it becomes Router with backbone function disabled
# by default.
self.nodes[BBR_2].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[BBR_2].set_bbr_registration_jitter(BBR_REGISTRATION_JITTER)
self.nodes[BBR_2].set_domain_prefix(config.DOMAIN_PREFIX)
self.nodes[BBR_2].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_state(), 'router')
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_backbone_router_state(), 'Disabled')
# Enable Backbone function, it will stay at Secondary state as
# there is Primary Backbone Router already.
# Here removes the Domain Prefix before enabling backbone function
# intentionally to avoid SRV_DATA.ntf due to prefix inconsistency.
self.nodes[BBR_2].remove_domain_prefix(config.DOMAIN_PREFIX)
self.nodes[BBR_2].enable_backbone_router()
self.nodes[BBR_2].set_backbone_router(seqno=255)
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_backbone_router_state(), 'Secondary')
# Check no SRV_DATA.ntf.
messages = self.simulator.get_messages_sent_by(BBR_2)
msg = messages.next_coap_message('0.02', '/a/sd', False)
self.assertIsNone(msg)
# Flush relative message queue.
self.flush_nodes([BBR_1])
# BBR_2 registers SRV_DATA.ntf explicitly.
self.nodes[BBR_2].register_backbone_router()
WAIT_TIME = WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_backbone_router_state(), 'Primary')
# Verify BBR_1 becomes Secondary and sends SRV_DATA.ntf to deregister
# its service.
messages = self.simulator.get_messages_sent_by(BBR_1)
messages.next_coap_message('0.02', '/a/sd', True)
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Secondary')
# Verify Sequence number increases when become Secondary from Primary.
if (BBR_1_SEQNO == 126) or (BBR_1_SEQNO == 127):
next_seqno = 0
elif (BBR_1_SEQNO == 254) or (BBR_1_SEQNO == 255):
next_seqno = 128
else:
next_seqno = BBR_1_SEQNO + 1
self.assertEqual(self.nodes[BBR_1].get_backbone_router()['seqno'], next_seqno)
# 4a) Check communication via DUA.
bbr2_dua = self.nodes[BBR_2].get_addr(config.DOMAIN_PREFIX)
self.assertTrue(self.nodes[BBR_1].ping(bbr2_dua))
# 5) Stop BBR_2, BBR_1 becomes Primary after detecting there is no
# available Backbone Router Service.
self.nodes[BBR_2].reset()
self.nodes[LEADER_1_1].release_router_id(self.nodes[BBR_2].get_router_id())
# Wait for the dissemination of Network Data without Backbone Router service
self.simulator.go(10)
# BBR_1 becomes Primary.
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Primary')
messages = self.simulator.get_messages_sent_by(BBR_1)
messages.next_coap_message('0.02', '/a/sd', True)
# 6) Bring back BBR_2.
# Verify that BBR_2 stays at Secondary.
self.nodes[BBR_2].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[BBR_2].set_bbr_registration_jitter(BBR_REGISTRATION_JITTER)
self.nodes[BBR_1].set_domain_prefix(config.DOMAIN_PREFIX)
self.nodes[BBR_2].enable_backbone_router()
self.nodes[BBR_2].interface_up()
self.nodes[BBR_2].thread_start()
WAIT_TIME = config.ROUTER_RESET_DELAY
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_state(), 'router')
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_backbone_router_state(), 'Secondary')
# 6a) Check the uniqueness of DUA by comparing the one in above 4a).
bbr2_dua2 = self.nodes[BBR_2].get_addr(config.DOMAIN_PREFIX)
self.assertEqual(bbr2_dua, bbr2_dua2)
# 6b) Check communication via DUA
self.assertTrue(self.nodes[BBR_1].ping(bbr2_dua))
if __name__ == '__main__':
unittest.main()
@@ -1,937 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import ipaddress
import logging
import time
import unittest
from typing import Union, List
import config
import network_layer
import thread_cert
logging.basicConfig(level=logging.DEBUG)
_, BBR_1, BBR_2, ROUTER_1_2, ROUTER_1_1, SED_1, MED_1, MED_2, FED_1 = range(9)
WAIT_ATTACH = 5
WAIT_REDUNDANCE = 3
ROUTER_SELECTION_JITTER = 1
BBR_REGISTRATION_JITTER = 5
SED_POLL_PERIOD = 1000 # 1s
REREG_DELAY = 10
MLR_TIMEOUT = 300
PARENT_AGGREGATE_DELAY = 5
MA1 = 'ff04::1234:777a:1'
MA1g = 'ff0e::1234:777a:1'
MA2 = 'ff05::1234:777a:2'
MA3 = 'ff0e::1234:777a:3'
MA4 = 'ff05::1234:777a:4'
MA5 = 'ff03::1234:777a:5'
MA6 = 'ff02::10'
"""
Initial topology:
BBR_1---BBR_2
| / \ |
| / \ |
ROUTER_1_2 ROUTER_1_1
| | \____ |
| | \ |
SED_1 MED_1/2 FED_1
"""
class TestMulticastListenerRegistration(thread_cert.TestCase):
TOPOLOGY = {
BBR_1: {
'version': '1.2',
'allowlist': [BBR_2, ROUTER_1_2],
'is_bbr': True,
},
BBR_2: {
'version': '1.2',
'allowlist': [BBR_1, ROUTER_1_2, ROUTER_1_1],
'is_bbr': True,
},
ROUTER_1_2: {
'version': '1.2',
'allowlist': [BBR_1, BBR_2, SED_1, MED_1, MED_2, FED_1],
},
ROUTER_1_1: {
'version': '1.1',
'allowlist': [BBR_2],
},
MED_1: {
'mode': 'rn',
'version': '1.2',
'allowlist': [ROUTER_1_2],
'timeout': config.DEFAULT_CHILD_TIMEOUT,
},
MED_2: {
'mode': 'rn',
'version': '1.2',
'allowlist': [ROUTER_1_2],
'timeout': config.DEFAULT_CHILD_TIMEOUT,
},
SED_1: {
'mode': 'n',
'version': '1.2',
'allowlist': [ROUTER_1_2],
'timeout': config.DEFAULT_CHILD_TIMEOUT,
},
FED_1: {
'mode': 'rdn',
'version': '1.2',
'allowlist': [ROUTER_1_2],
'router_eligible': False,
'timeout': config.DEFAULT_CHILD_TIMEOUT,
},
}
"""All nodes are created with default configurations"""
def _bootstrap(self, bbr_1_enable_backbone_router=True, turn_on_bbr_2=True, turn_on_router_1_1=True):
assert (turn_on_bbr_2 or not turn_on_router_1_1) # ROUTER_1_1 needs BBR_2
# starting context id
t0 = time.time()
context_id = 1
# Bring up BBR_1, BBR_1 becomes Leader and Primary Backbone Router
self.nodes[BBR_1].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[BBR_1].set_bbr_registration_jitter(BBR_REGISTRATION_JITTER)
self.nodes[BBR_1].set_backbone_router(seqno=1, reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT)
self.nodes[BBR_1].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME * 2)
self.assertEqual(self.nodes[BBR_1].get_state(), 'leader')
if bbr_1_enable_backbone_router:
self.nodes[BBR_1].enable_backbone_router()
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Primary')
self.pbbr_seq = 1
self.pbbr_id = BBR_1
if turn_on_bbr_2:
# Bring up BBR_2, BBR_2 becomes Router and Secondary Backbone Router
self.nodes[BBR_2].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[BBR_2].set_bbr_registration_jitter(BBR_REGISTRATION_JITTER)
self.nodes[BBR_2].set_backbone_router(seqno=2, reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT)
self.nodes[BBR_2].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_state(), 'router')
self.nodes[BBR_2].enable_backbone_router()
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_2].get_backbone_router_state(), 'Secondary')
self.simulator.set_lowpan_context(context_id, config.DOMAIN_PREFIX)
domain_prefix_cid = context_id
# Bring up ROUTER_1_2
self.nodes[ROUTER_1_2].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[ROUTER_1_2].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[ROUTER_1_2].get_state(), 'router')
if turn_on_router_1_1:
# Bring up ROUTER_1_1
self.nodes[ROUTER_1_1].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[ROUTER_1_1].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[ROUTER_1_1].get_state(), 'router')
# Bring up FED_1
self.nodes[FED_1].start()
self.simulator.go(WAIT_ATTACH)
self.assertEqual(self.nodes[FED_1].get_state(), 'child')
# Bring up MED_1
self.nodes[MED_1].start()
self.simulator.go(WAIT_ATTACH)
self.assertEqual(self.nodes[MED_1].get_state(), 'child')
# Bring up MED_2
self.nodes[MED_2].start()
self.simulator.go(WAIT_ATTACH)
self.assertEqual(self.nodes[MED_2].get_state(), 'child')
# Bring up SED_1
self.nodes[SED_1].set_pollperiod(SED_POLL_PERIOD)
self.nodes[SED_1].start()
self.simulator.go(WAIT_ATTACH)
self.assertEqual(self.nodes[SED_1].get_state(), 'child')
logging.info("bootstrap takes %f seconds", time.time() - t0)
def test(self):
self._bootstrap()
# Verify MLR.req for each device when parent is 1.2
self.__check_mlr_ok(ROUTER_1_2, is_ftd=True)
self.__check_mlr_ok(FED_1, is_ftd=True)
self.__check_mlr_ok(MED_1, is_ftd=False)
self.__check_mlr_ok(SED_1, is_ftd=False)
# Switch to parent 1.1
self.__switch_to_1_1_parent()
# Verify MLR.req for each device when parent is 1.1
self.__check_mlr_ok(FED_1, is_ftd=True, is_parent_1p1=True)
self.__check_mlr_ok(MED_1, is_ftd=False, is_parent_1p1=True)
self.__check_mlr_ok(SED_1, is_ftd=False, is_parent_1p1=True)
# Switch to parent 1.2
self.__switch_to_1_2_parent()
def testParentMergeMedMlrReq(self):
self._bootstrap()
# Make sure Parent registers multiple MAs of MED Children in one MLR.req
self.__check_parent_merge_med_mlr_req([MED_1, MED_2], ROUTER_1_2)
def testNotSendMlrReqIfSubscribed(self):
self._bootstrap()
# Make sure Parent does not send MLR.req of Child if it's already subscribed by Netif or other Children
self.__check_not_send_mlr_req_if_subscribed([MED_1, MED_2], ROUTER_1_2)
def testIpmaddrAddBeforeBBREnable(self):
self._bootstrap(bbr_1_enable_backbone_router=False, turn_on_bbr_2=False, turn_on_router_1_1=False)
self.flush_all()
# Subscribing to MAs when there is no PBBR should not send MLR.req
self.nodes[ROUTER_1_2].add_ipmaddr("ff04::1")
self.nodes[FED_1].add_ipmaddr("ff04::2")
self.nodes[MED_1].add_ipmaddr("ff04::3")
self.nodes[SED_1].add_ipmaddr("ff04::4")
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
router_reg = ["ff04::1", "ff04::2", "ff04::3", "ff04::4"]
self.__check_send_mlr_req(ROUTER_1_2, router_reg, should_send=False)
self.flush_all()
# Turn on PBBR
self.nodes[BBR_1].enable_backbone_router()
WAIT_TIME = BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Primary')
self.simulator.go(REREG_DELAY)
# Expect MLR.req sent by ROUTER_1_2 and FED_1
self.__check_send_mlr_req(ROUTER_1_2, router_reg, should_send=True, expect_mlr_rsp=True)
def testMulticastListenersTableAdd(self):
self._bootstrap()
self.__test_multicast_listeners_table_add()
def testMulticastListenersTableExpire(self):
self._bootstrap()
self.__test_multicast_listeners_table_expire()
def testMulticastListenersTableFull(self):
self._bootstrap()
self.__test_multicast_listeners_table_full()
def testMulticastListenersTableTwoFreeSlot(self):
self._bootstrap()
self.__test_multicast_listeners_table_two_free_slots()
def testMulticastListenerTableAPI(self):
self._bootstrap()
self.__test_multicast_listeners_table_api()
def testMlrConfigResponse(self):
self._bootstrap()
self.__test_mlr_config_response()
def __test_mlr_config_response(self):
bbr = self.nodes[BBR_1]
router = self.nodes[ROUTER_1_2]
self.flush_all()
# Configure next response to 0
bbr.set_next_mlr_response(0)
router.add_ipmaddr("ff04::1")
self.simulator.go(WAIT_REDUNDANCE)
self.__check_send_mlr_req(ROUTER_1_2, ["ff04::1"],
should_send=True,
expect_mlr_rsp=True,
expect_mlr_rsp_status=0)
self.assertNotIn(ipaddress.IPv6Address("ff04::1"), bbr.multicast_listener_list())
router.del_ipmaddr("ff04::1")
self.simulator.go(WAIT_REDUNDANCE)
self.flush_all()
# Configure next response to 2
bbr.set_next_mlr_response(2)
router.add_ipmaddr("ff04::2")
self.simulator.go(WAIT_REDUNDANCE)
self.__check_send_mlr_req(ROUTER_1_2, ["ff04::2"],
should_send=True,
expect_mlr_rsp=True,
expect_mlr_rsp_status=2)
router.del_ipmaddr("ff04::2")
self.simulator.go(WAIT_REDUNDANCE)
self.flush_all()
# Configure next response to 4
bbr.set_next_mlr_response(4)
router.add_ipmaddr("ff04::4")
self.simulator.go(WAIT_REDUNDANCE)
self.__check_send_mlr_req(ROUTER_1_2, ["ff04::4"],
should_send=True,
expect_mlr_rsp=True,
expect_mlr_rsp_status=4)
# The MA should be eventually registered after reregistration
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
self.assertIn(ipaddress.IPv6Address("ff04::4"), bbr.multicast_listener_list())
router.del_ipmaddr("ff04::4")
self.simulator.go(WAIT_REDUNDANCE)
self.flush_all()
def testCommissionerRegisterMulticastListeners(self):
self._bootstrap()
# Use ROUTER_1_2 as the Commissioner
commissioiner = self.nodes[ROUTER_1_2]
self.assertEqual((0, []), commissioiner.register_multicast_listener("ff04::1"))
commissioiner.commissioner_start()
self.simulator.go(10)
# Now the Commissioner should be able to register MAs
for ip in ["ff04::1", "ff04::2"]:
status, failed_ips = commissioiner.register_multicast_listener(ip)
self.assertTrue(status == 0 and not failed_ips)
self.__check_multicast_listener(ip, expect_mlr_timeout_range=[290, 300])
# Register existing MA with a new timeout should be able to update the timeout
for ip in ["ff04::1", "ff04::2"]:
status, failed_ips = commissioiner.register_multicast_listener(ip, timeout=1000)
self.assertTrue(status == 0 and not failed_ips)
self.__check_multicast_listener(ip, expect_mlr_timeout_range=[990, 1000])
# Register MAs with given timeouts
for ip, timeout in [("ff05::1", 400), ("ff05::2", 500), ("ff05::3", 600)]:
status, failed_ips = commissioiner.register_multicast_listener(ip, timeout=timeout)
self.assertTrue(status == 0 and not failed_ips)
self.__check_multicast_listener(ip, expect_mlr_timeout_range=[timeout - 10, timeout])
# Register multiple MAs with one call
ips = ["ff05::4", "ff05::5", "ff05::6"]
status, failed_ips = commissioiner.register_multicast_listener(*ips, timeout=700)
self.assertTrue(status == 0 and not failed_ips)
for ip in ips:
self.__check_multicast_listener(ip, expect_mlr_timeout_range=[690, 700])
# Register multiple MAs with one call (without timeout)
ips = ["ff05::7", "ff05::8", "ff05::9"]
status, failed_ips = commissioiner.register_multicast_listener(*ips)
self.assertTrue(status == 0 and not failed_ips)
for ip in ips:
self.__check_multicast_listener(ip, expect_mlr_timeout_range=[290, 300])
# Unregister MAs using timeout=0
for ip in ["ff05::1", "ff05::2", "ff05::3"]:
status, failed_ips = commissioiner.register_multicast_listener(ip, timeout=0)
self.assertTrue(status == 0 and not failed_ips)
self.__check_multicast_listener(ip, expect_not_present=True)
# Unregister multiple MAs
ips = ["ff05::4", "ff05::5", "ff05::6"]
status, failed_ips = commissioiner.register_multicast_listener(*ips, timeout=0)
self.assertTrue(status == 0 and not failed_ips)
self.__check_multicast_listener(ip, expect_not_present=True)
# Remove MAs that are not subscribed should not fail
ips = ["ff06::1", "ff02::1", "2001::1"]
status, failed_ips = commissioiner.register_multicast_listener(*ips, timeout=0)
self.assertTrue(status == 0 and not failed_ips)
self.__check_multicast_listener(ip, expect_not_present=True)
# Register invalid MAs should fail
for ip in ["ff02::1", "ff03::1", "2001::1"]:
status, failed_ips = commissioiner.register_multicast_listener(ip)
self.assertEqual(status, 2)
self.assertEqual(set(failed_ips), {ipaddress.IPv6Address(ip)})
self.__check_multicast_listener(ip, expect_not_present=True)
# Register valid MAs with invalid MAs should succeed partially
ips = ["ff05::1", "ff05::2", "ff02::1", "2001::1"]
status, failed_ips = commissioiner.register_multicast_listener(*ips)
self.assertEqual(status, 2)
self.assertEqual(set(failed_ips), {ipaddress.IPv6Address("ff02::1"), ipaddress.IPv6Address("2001::1")})
self.__check_multicast_listener("ff05::1")
self.__check_multicast_listener("ff05::2")
self.__check_multicast_listener("ff02::1", expect_not_present=True)
self.__check_multicast_listener("2001::1", expect_not_present=True)
# Registering persistent MAs should fail for now
status, failed_ips = commissioiner.register_multicast_listener("ff06::1", timeout=0xffffffff)
self.assertEqual(status, 3)
# "ff06::1" should not be included in failed IPs because all IPs failed
self.assertTrue(not failed_ips == 0)
def __check_multicast_listener(self, *addrs, expect_mlr_timeout_range=None, expect_not_present=False):
addrs = map(ipaddress.IPv6Address, addrs)
listeners = self.nodes[BBR_1].multicast_listener_list()
logging.info("__check_multicast_listener get listeners: %s", listeners)
for addr in addrs:
if not expect_not_present:
self.assertIn(addr, listeners)
if expect_mlr_timeout_range is not None:
self.assertGreaterEqual(listeners[addr], expect_mlr_timeout_range[0])
self.assertLessEqual(listeners[addr], expect_mlr_timeout_range[1])
else:
self.assertNotIn(addr, listeners)
def __test_multicast_listeners_table_api(self):
self.assertTrue(self.nodes[BBR_1].multicast_listener_list() == {})
self.nodes[BBR_1].multicast_listener_add("ff04::1")
self.assertEqual(1, len(self.nodes[BBR_1].multicast_listener_list()))
self.nodes[BBR_1].multicast_listener_add("ff04::2", 300)
self.assertEqual(2, len(self.nodes[BBR_1].multicast_listener_list()))
self.nodes[BBR_1].multicast_listener_clear()
self.assertTrue(self.nodes[BBR_1].multicast_listener_list() == {})
def __test_multicast_listeners_table_add(self):
self.assertTrue(self.nodes[BBR_1].multicast_listener_list() == {})
all_mas = set()
CHECK_LIST = [(ROUTER_1_2, "ff04::1"), (FED_1, "ff04::2"), (MED_1, "ff04::3"), (SED_1, "ff04::4")]
for id, ip in CHECK_LIST:
self.nodes[id].add_ipmaddr(ip)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
all_mas.add(ipaddress.IPv6Address(ip))
self.assertEqual(all_mas, set(self.nodes[BBR_1].multicast_listener_list().keys()))
# restore
for id, ip in CHECK_LIST:
self.nodes[id].del_ipmaddr(ip)
self.simulator.go(WAIT_REDUNDANCE)
def __test_multicast_listeners_table_expire(self):
self.assertEqual({}, self.nodes[BBR_1].multicast_listener_list())
all_mas = set()
CHECK_LIST = [(ROUTER_1_2, "ff04::1"), (FED_1, "ff04::2"), (MED_1, "ff04::3"), (SED_1, "ff04::4")]
for id, ip in CHECK_LIST:
self.nodes[id].add_ipmaddr(ip)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
all_mas.add(ipaddress.IPv6Address(ip))
self.assertEqual(all_mas, set(self.nodes[BBR_1].multicast_listener_list().keys()))
# remove MAs from nodes, and wait for Multicast Listeners to expire on BBR_1
for id, ip in CHECK_LIST:
self.nodes[id].del_ipmaddr(ip)
# Wait for MLR_TIMEOUT/3, and expect Multicast Listeners not to expire.
self.simulator.go(MLR_TIMEOUT / 3)
self.assertEqual(all_mas, set(self.nodes[BBR_1].multicast_listener_list().keys()))
# Wait for MLR_TIMEOUT*2/3, and expect all Multicast Listeners to expire.
self.simulator.go(MLR_TIMEOUT * 2 / 3 + WAIT_REDUNDANCE)
self.assertEqual({}, self.nodes[BBR_1].multicast_listener_list())
def __test_multicast_listeners_table_full(self):
self.assertTrue(self.nodes[BBR_1].multicast_listener_list() == {})
table = set()
# Add to Multicast Listeners Table until it's full
for i in range(1, 76):
self.nodes[BBR_1].multicast_listener_add(f"ff04::{i}")
table.add(ipaddress.IPv6Address(f"ff04::{i}"))
self.assertEqual(table, set(self.nodes[BBR_1].multicast_listener_list().keys()))
# Add when Multicast Listeners Table is full should not succeed
self.nodes[BBR_1].multicast_listener_add(f"ff05::1")
self.assertEqual(table, set(self.nodes[BBR_1].multicast_listener_list().keys()))
self.flush_all()
# Expect PBBR to respond with MLR_NO_RESOURCES Multicast Listeners Table when it's full
self.nodes[ROUTER_1_2].add_ipmaddr("ff06::1")
self.simulator.go(WAIT_REDUNDANCE)
self.__check_send_mlr_req(ROUTER_1_2,
"ff06::1",
should_send=True,
expect_mlr_rsp=True,
expect_mlr_rsp_status=4)
self.assertEqual(table, set(self.nodes[BBR_1].multicast_listener_list().keys()))
self.nodes[MED_1].add_ipmaddr("ff06::2")
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(ROUTER_1_2,
"ff06::2",
should_send=True,
expect_mlr_rsp=True,
expect_mlr_rsp_status=4)
self.assertEqual(table, set(self.nodes[BBR_1].multicast_listener_list().keys()))
# the ROUTER_1_2 should be resending both ff06::1 and ff06::2
for i in range(3):
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(ROUTER_1_2, ['ff06::1', 'ff06::2'],
should_send=True,
expect_mlr_rsp=True,
expect_mlr_rsp_status=4)
# Restore
self.nodes[ROUTER_1_2].del_ipmaddr("ff06::1")
self.nodes[MED_1].del_ipmaddr("ff06::2")
self.simulator.go(WAIT_REDUNDANCE)
def __test_multicast_listeners_table_two_free_slots(self):
# Add to Multicast Listeners Table until there is only two free slots
for i in range(1, 74):
self.nodes[BBR_1].multicast_listener_add(f"ff04::{i}")
self.assertEqual(73, len(self.nodes[BBR_1].multicast_listener_list()))
self.nodes[MED_1].add_ipmaddr("ff05::1")
self.nodes[MED_1].add_ipmaddr("ff05::2")
self.nodes[MED_2].add_ipmaddr("ff05::3")
self.nodes[MED_2].add_ipmaddr("ff05::4")
self.nodes[SED_1].add_ipmaddr("ff05::5")
self.nodes[SED_1].add_ipmaddr("ff05::6")
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(ROUTER_1_2, ["ff05::1", "ff05::2", "ff05::3", "ff05::4", "ff05::5", "ff05::6"])
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.flush_all()
# two addresses should be registered, others can not
for i in range(3):
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.assertEqual(4, len(set(self.__get_registered_MAs(ROUTER_1_2))))
# Restore
self.nodes[MED_1].del_ipmaddr("ff05::1")
self.nodes[MED_1].del_ipmaddr("ff05::2")
self.nodes[MED_2].del_ipmaddr("ff05::3")
self.nodes[MED_2].del_ipmaddr("ff05::4")
self.nodes[SED_1].del_ipmaddr("ff05::5")
self.nodes[SED_1].del_ipmaddr("ff05::6")
def __check_mlr_ok(self, id, is_ftd, is_parent_1p1=False):
"""Check if MLR works for the node"""
# Add MA1 and send MLR.req
logging.info("======== checking MLR: Node%d (%s), Parent=%s ========" %
(id, 'FTD' if is_ftd else 'MTD', '1.1' if is_parent_1p1 else '1.2'))
expect_mlr_req = is_ftd or is_parent_1p1
if id == ROUTER_1_2:
parent_id = None
else:
parent_id = ROUTER_1_1 if is_parent_1p1 else ROUTER_1_2
for addr in [MA1, MA1g, MA2, MA3, MA4]:
self.__check_ipmaddr_add(id,
parent_id,
addr,
expect_mlr_req=expect_mlr_req,
expect_mlr_req_proxied=(not expect_mlr_req))
for addr in [MA5, MA6]:
self.__check_ipmaddr_add(id, parent_id, addr, expect_mlr_req=False, expect_mlr_req_proxied=False)
logging.info('=' * 120)
def __check_ipmaddr_add(self, id, parent_id, addr, expect_mlr_req=True, expect_mlr_req_proxied=False):
"""Check MLR works for the added multicast address"""
logging.info("Node %d: ipmaddr %s" % (id, addr))
self.flush_all()
self.nodes[id].add_ipmaddr(addr)
self.assertTrue(self.nodes[id].has_ipmaddr(addr))
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(id, addr, should_send=expect_mlr_req, expect_mlr_rsp=expect_mlr_req)
# Parent should either forward or proxy the MLR.req
if parent_id:
self.__check_send_mlr_req(parent_id,
addr,
should_send=expect_mlr_req or expect_mlr_req_proxied,
expect_mlr_rsp=expect_mlr_req_proxied)
self.__check_rereg(id,
parent_id,
addr,
expect_mlr_req=expect_mlr_req,
expect_mlr_req_proxied=expect_mlr_req_proxied)
self.__check_renewing(id,
parent_id,
addr,
expect_mlr_req=expect_mlr_req,
expect_mlr_req_proxied=expect_mlr_req_proxied)
self.nodes[id].del_ipmaddr(addr)
self.simulator.go(1)
def __check_send_mlr_req(self,
id,
addrs: Union[List[str], str],
should_send=True,
expect_mlr_rsp=False,
expect_mlr_rsp_status=0,
expect_mlr_req_num=None,
expect_unique_reg=False):
if isinstance(addrs, str):
addrs = [addrs]
message_ids = []
reg_mas = self.__get_registered_MAs(id, expect_mlr_req_num=expect_mlr_req_num, message_ids=message_ids)
if should_send:
for addr in addrs:
self.assertIn(ipaddress.IPv6Address(addr), reg_mas)
if expect_unique_reg:
self.assertEqual(1, reg_mas.count(ipaddress.IPv6Address(addr)))
# BBR should send MLR.rsp ACK
if expect_mlr_rsp:
message_id = message_ids[0]
rsp = self.__expect_MLR_rsp(message_id)
logging.info('MLR.rsp %s uri_path=%s, payload=%s', rsp, rsp.coap.uri_path, rsp.coap.payload)
statusTlv = None
for tlv in rsp.coap.payload:
if isinstance(tlv, network_layer.Status):
statusTlv = tlv
break
self.assertIsNotNone(statusTlv)
self.assertEqual(expect_mlr_rsp_status, statusTlv.status)
else:
for addr in addrs:
self.assertNotIn(ipaddress.IPv6Address(addr), reg_mas)
def __expect_MLR_rsp(self, message_id):
logging.info("Expecting MLR.rsp with message ID = %s", message_id)
messages = self.simulator.get_messages_sent_by(self.pbbr_id)
logging.info("PBBR %d messages: %s", self.pbbr_id, messages)
while True:
msg = messages.next_coap_message('2.04')
logging.info('Check ACK for %s: %s, %s, %s, %s', message_id, msg, msg.coap.message_id, msg.coap.uri_path,
msg.coap.payload)
if msg.coap.message_id == message_id:
return msg
def __get_registered_MAs(self, id, expect_mlr_req_num=None, message_ids=None):
"""Get MAs registered via MLR.req by the node"""
messages = self.simulator.get_messages_sent_by(id)
reg_mas = []
while True:
msg = messages.next_coap_message('0.02', '/n/mr', assert_enabled=False)
if not msg:
break
logging.info("MLR.req: %s %s" % (msg.coap.message_id, msg.coap.payload))
addrs = msg.get_coap_message_tlv(network_layer.IPv6Addresses)
reg_mas.append(addrs)
if message_ids is not None:
message_ids.append(msg.coap.message_id)
logging.info('Node %d registered MAs: %s' % (id, reg_mas))
if expect_mlr_req_num is not None:
self.assertEqual(len(reg_mas), expect_mlr_req_num)
# expand from [[...], [...], ...] to [...]
reg_mas = [ma for mas in reg_mas for ma in mas]
return reg_mas
def __check_renewing(self, id, parent_id, addr, expect_mlr_req=True, expect_mlr_req_proxied=False):
"""Check if MLR works that a node can renew it's registered MAs"""
self.assertEqual(self.pbbr_id, BBR_1)
self.flush_all()
self.simulator.go(MLR_TIMEOUT + WAIT_REDUNDANCE)
self.__check_send_mlr_req(id, addr, should_send=expect_mlr_req, expect_mlr_rsp=expect_mlr_req)
# Parent should either forward or proxy the MLR.req
if parent_id:
self.__check_send_mlr_req(parent_id,
addr,
should_send=expect_mlr_req or expect_mlr_req_proxied,
expect_mlr_rsp=expect_mlr_req_proxied)
def __check_rereg(self, id, parent_id, addr, expect_mlr_req=True, expect_mlr_req_proxied=False):
"""Check if MLR works that a node can do MLR reregistration when necessary"""
self.__check_rereg_seqno(id,
parent_id,
addr,
expect_mlr_req=expect_mlr_req,
expect_mlr_req_proxied=expect_mlr_req_proxied)
self.__check_rereg_pbbr_change(id,
parent_id,
addr,
expect_mlr_req=expect_mlr_req,
expect_mlr_req_proxied=expect_mlr_req_proxied)
def __check_rereg_seqno(self, id, parent_id, addr, expect_mlr_req=True, expect_mlr_req_proxied=False):
"""Check if MLR works that a node can do MLR reregistration when PBBR seqno changes"""
# Change seq on PBBR and expect MLR.req within REREG_DELAY
self.flush_all()
self.pbbr_seq = (self.pbbr_seq + 10) % 256
self.nodes[BBR_1].set_backbone_router(seqno=self.pbbr_seq)
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(id, addr, should_send=expect_mlr_req, expect_mlr_rsp=expect_mlr_req)
# Parent should either forward or proxy the MLR.req
if parent_id:
self.__check_send_mlr_req(parent_id,
addr,
should_send=expect_mlr_req or expect_mlr_req_proxied,
expect_mlr_rsp=expect_mlr_req_proxied)
def __check_rereg_pbbr_change(self, id, parent_id, addr, expect_mlr_req=True, expect_mlr_req_proxied=False):
"""Check if MLR works that a node can do MLR reregistration when PBBR changes"""
# Make BBR_2 to be Primary and expect MLR.req within REREG_DELAY
self.assertEqual(self.pbbr_id, BBR_1)
self.flush_all()
self.nodes[BBR_1].disable_backbone_router()
self.simulator.go(BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE)
self.assertEqual(self.nodes[BBR_2].get_backbone_router_state(), 'Primary')
self.pbbr_id = BBR_2
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(id, addr, should_send=expect_mlr_req, expect_mlr_rsp=expect_mlr_req)
# Parent should either forward or proxy the MLR.req
if parent_id:
self.__check_send_mlr_req(parent_id,
addr,
should_send=expect_mlr_req or expect_mlr_req_proxied,
expect_mlr_rsp=expect_mlr_req_proxied)
# Restore BBR_1 to be Primary and BBR_2 to be Secondary
self.nodes[BBR_2].disable_backbone_router()
self.nodes[BBR_1].enable_backbone_router()
self.simulator.go(BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE)
self.assertEqual(self.nodes[BBR_1].get_backbone_router_state(), 'Primary')
self.nodes[BBR_2].enable_backbone_router()
self.simulator.go(BBR_REGISTRATION_JITTER + WAIT_REDUNDANCE)
self.assertEqual(self.nodes[BBR_2].get_backbone_router_state(), 'Secondary')
self.pbbr_id = BBR_1
def __switch_to_1_1_parent(self):
"""Check if MLR works when nodes are switching to a 1.1 parent"""
# Add MA1 to EDs to prepare for parent switching
logging.info("=" * 10 + " switching to 1.1 parent " + '=' * 10)
self.flush_all()
self.nodes[FED_1].add_ipmaddr(MA1)
self.nodes[MED_1].add_ipmaddr(MA1)
self.nodes[SED_1].add_ipmaddr(MA1)
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(FED_1))
self.assertNotIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(MED_1))
self.assertNotIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(SED_1))
self.flush_all()
# Turn off Router 1.2 and turn on Router 1.1
self.nodes[ROUTER_1_2].stop()
for id in [FED_1, MED_1, SED_1]:
self.nodes[ROUTER_1_1].add_allowlist(self.nodes[id].get_addr64())
self.nodes[id].add_allowlist(self.nodes[ROUTER_1_1].get_addr64())
self.simulator.go(config.DEFAULT_CHILD_TIMEOUT + WAIT_REDUNDANCE)
self.assertEqual(self.nodes[id].get_state(), 'child')
self.assertEqual(self.nodes[id].get_router_id(), self.nodes[ROUTER_1_1].get_router_id())
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
# Verify all FED send MLR.req within REREG_DELAY when parent is 1.1
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(FED_1))
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(MED_1))
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(SED_1))
self.nodes[FED_1].del_ipmaddr(MA1)
self.nodes[MED_1].del_ipmaddr(MA1)
self.nodes[SED_1].del_ipmaddr(MA1)
self.simulator.go(WAIT_REDUNDANCE)
def __switch_to_1_2_parent(self):
"""Check if MLR works when nodes are switching to a 1.2 parent"""
# Add MA1 to EDs to prepare for parent switching
logging.info("=" * 10, "switching to 1.2 parent", '=' * 10)
self.flush_all()
self.nodes[FED_1].add_ipmaddr(MA1)
self.nodes[MED_1].add_ipmaddr(MA1)
self.nodes[SED_1].add_ipmaddr(MA1)
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(FED_1))
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(MED_1))
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(SED_1))
self.flush_all()
# Turn off Router 1.1 and turn on Router 1.2
self.nodes[ROUTER_1_1].stop()
self.nodes[ROUTER_1_2].start()
self.simulator.go(config.ROUTER_RESET_DELAY)
for id in [FED_1, MED_1, SED_1]:
self.simulator.go(config.DEFAULT_CHILD_TIMEOUT + WAIT_REDUNDANCE)
self.assertEqual(self.nodes[id].get_state(), 'child')
self.assertEqual(self.nodes[id].get_router_id(), self.nodes[ROUTER_1_2].get_router_id())
self.simulator.go(REREG_DELAY + WAIT_REDUNDANCE)
# Verify only FTD sends MLR.req within REREG_DELAY when parent is 1.2
self.assertIn(ipaddress.IPv6Address(MA1), self.__get_registered_MAs(FED_1))
# MED and SED might still send MLR.req during this period because it could be sending to it's 1.2 parent.
self.nodes[FED_1].del_ipmaddr(MA1)
self.nodes[MED_1].del_ipmaddr(MA1)
self.nodes[SED_1].del_ipmaddr(MA1)
self.simulator.go(WAIT_REDUNDANCE)
def __check_parent_merge_med_mlr_req(self, meds, parent_id):
"""Check that the 1.2 parent merge multiple multicast addresses for MED children."""
self.flush_all()
for med in meds:
self.nodes[med].add_ipmaddr(MA1)
self.nodes[meds[0]].add_ipmaddr(MA2)
self.nodes[meds[1]].add_ipmaddr(MA3)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(parent_id, [MA1, MA2, MA3],
should_send=True,
expect_mlr_rsp=True,
expect_mlr_req_num=1,
expect_unique_reg=True)
# restore
self.nodes[meds[0]].del_ipmaddr(MA2)
self.nodes[meds[1]].del_ipmaddr(MA3)
for med in meds:
self.nodes[med].del_ipmaddr(MA1)
self.simulator.go(WAIT_REDUNDANCE)
def __check_not_send_mlr_req_if_subscribed(self, meds, parent_id):
"""Check that the 1.2 parent does not send MLR.req if the MA is already subscribed."""
# Parent should register MA1 on Netif
self.flush_all()
self.nodes[parent_id].add_ipmaddr(MA1)
self.simulator.go(WAIT_REDUNDANCE)
self.__check_send_mlr_req(parent_id, MA1, should_send=True, expect_mlr_rsp=True)
# Parent should not register MA1 of Child 1 because it's already registered
self.flush_all()
self.nodes[meds[0]].add_ipmaddr(MA1)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(parent_id, MA1, should_send=False)
# Parent should register MA2 of Child 1 because it's new
self.flush_all()
self.nodes[meds[0]].add_ipmaddr(MA2)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(parent_id, MA2, should_send=True)
# Parent should not register MA2 of Child 2 because it's already registered for Child 1
self.flush_all()
self.nodes[meds[1]].add_ipmaddr(MA2)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(parent_id, MA2, should_send=False)
# Parent should register MA3 of Child 2 because it's new
self.flush_all()
self.nodes[meds[1]].add_ipmaddr(MA3)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(parent_id, MA3, should_send=True)
# Parent should not register MA2 and MA3 because they are already registered for Child2 itself
self.flush_all()
self.nodes[meds[1]].del_ipmaddr(MA2)
self.nodes[meds[1]].del_ipmaddr(MA3)
self.nodes[meds[1]].add_ipmaddr(MA2)
self.nodes[meds[1]].add_ipmaddr(MA3)
self.simulator.go(PARENT_AGGREGATE_DELAY + WAIT_REDUNDANCE)
self.__check_send_mlr_req(parent_id, [MA2, MA3], should_send=False)
# Restore
self.nodes[parent_id].del_ipmaddr(MA1)
self.nodes[meds[0]].del_ipmaddr(MA1)
self.nodes[meds[0]].del_ipmaddr(MA2)
self.nodes[meds[1]].del_ipmaddr(MA2)
self.nodes[meds[1]].del_ipmaddr(MA3)
self.simulator.go(WAIT_REDUNDANCE)
if __name__ == '__main__':
unittest.main()
@@ -1,300 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import unittest
import command
import config
import mle
import thread_cert
LEADER_1_2 = 1
MED_1_2 = 2
SED_1_2 = 3
MED_1_1 = 4
SED_1_1 = 5
ROUTER_1_1 = 6
MED_1_2_2 = 7
SED_1_2_2 = 8
WAIT_ATTACH = 5
WAIT_REDUNDANCE = 3
ROUTER_SELECTION_JITTER = 1
MA1_LINKLOCAL = 'ff02::1:2:3:4'
MA2_ADMINSCOPE = 'ff04::1:2:3:4'
"""
Topology
SED_1_2
|
|
MED_1_2 --- LEADER_1_2 --- MED_1_1
| \
| \
SED_1_1 ROUTER_1_1 --- MED_1_2_2
|
|
SED_1_2_2
1) Bring up Leader_1_2.
2) Bring up MED_1_2, which attaches to Thread 1.2 parent, only register MA with scope larger than realm local.
a) add MA1_LINKLOCAL which would NOT be registered in AddressRegistrationTLV of Child Update Request.
b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
3) Bring up SED_1_2, which attaches to Thread 1.2 parent, register any external MA for indirect transmission.
a) add MA1_LINKLOCAL which would be registered in AddressRegistrationTLV of Child Update Request
b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
4) Bring up MED_1_1, which attaches to Thread 1.2 parent, not register any external MA.
a) add MA1_LINKLOCAL which would NOT be registered in AddressRegistrationTLV of Child Update Request.
b) add MA2_ADMINSCOPE which would NOT be registered in AddressRegistrationTLV of Child Update Request.
5) Bring up SED_1_1, which attaches to Thread 1.2 parent, register any external MA for indirect transmission.
a) add MA1_LINKLOCAL which would be registered in AddressRegistrationTLV of Child Update Request
b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
6) Bring up ROUTER_1_1.
7) Bring up MED_1_2_2 which attaches to Thread 1.1 parent, not register any external MA.
a) add MA1_LINKLOCAL which would NOT be registered in AddressRegistrationTLV of Child Update Request
b) add MA2_ADMINSCOPE which would NOT be registered in AddressRegistrationTLV of Child Update Request.
8) Bring up SED_1_2_2 which attaches to Thread 1.1 parent, register any external MA for indirect transmission.
a) add MA1_LINKLOCAL which would be registered in AddressRegistrationTLV of Child Update Request
b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
"""
class TestMulticastRegistration(thread_cert.TestCase):
TOPOLOGY = {
LEADER_1_2: {
'version': '1.2',
'allowlist': [MED_1_2, SED_1_2, MED_1_1, SED_1_1, ROUTER_1_1],
},
MED_1_2: {
'mode': 'rn',
'version': '1.2',
'allowlist': [LEADER_1_2],
},
SED_1_2: {
'mode': 'n',
'version': '1.2',
'allowlist': [LEADER_1_2],
},
MED_1_1: {
'mode': 'rn',
'version': '1.1',
'allowlist': [LEADER_1_2],
},
SED_1_1: {
'mode': 'n',
'version': '1.1',
'allowlist': [LEADER_1_2],
},
ROUTER_1_1: {
'version': '1.1',
'allowlist': [LEADER_1_2, MED_1_2_2, SED_1_2_2],
},
MED_1_2_2: {
'mode': 'rn',
'version': '1.2',
'allowlist': [ROUTER_1_1],
},
SED_1_2_2: {
'mode': 'n',
'version': '1.2',
'allowlist': [ROUTER_1_1],
},
}
"""All nodes are created with default configurations"""
def __check_multicast_registration(self,
node,
multicast_address,
child_update_request_assert=True,
in_address_registration=True):
''' Check whether or not the addition of the multicast address on the specific node
would trigger Child Update Request for multicast address registration via Address
Registration TLV.
Args:
node (int) : The device id
multicast_address (string): The multicast address
child_update_request_assert (bool): whether or not the addition should trigger Child Update Request
in_address_registration (bool): Whether or not the multicast_address should be registered
'''
# Flush relative message queues.
self.flush_nodes([node])
self.nodes[node].add_ipmaddr(multicast_address)
WAIT_TIME = WAIT_REDUNDANCE
self.simulator.go(WAIT_TIME)
messages = self.simulator.get_messages_sent_by(node)
msg = messages.next_mle_message(mle.CommandType.CHILD_UPDATE_REQUEST,
assert_enabled=child_update_request_assert)
if msg:
is_in = command.check_address_registration_tlv(msg, multicast_address)
if in_address_registration:
assert is_in, 'Error: Expected {} in AddressRegistrationTLV not found'.format(multicast_address)
else:
assert not is_in, 'Error: Unexpected {} in AddressRegistrationTLV'.format(multicast_address)
def test(self):
# 1) Bring up Leader_1_2.
self.nodes[LEADER_1_2].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME * 2)
self.assertEqual(self.nodes[LEADER_1_2].get_state(), 'leader')
# 2) Bring up MED_1_2, which attaches to Thread 1.2 parent, only register MA with scope larger than realm local.
self.nodes[MED_1_2].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[MED_1_2].get_state(), 'child')
# 2a) add MA1_LINKLOCAL which would NOT be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(MED_1_2,
MA1_LINKLOCAL,
child_update_request_assert=False,
in_address_registration=False)
# 2b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(MED_1_2,
MA2_ADMINSCOPE,
child_update_request_assert=True,
in_address_registration=True)
# 3) Bring up SED_1_2, which attaches to Thread 1.2 parent, register any external MA for indirect transmission.
self.nodes[SED_1_2].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[SED_1_2].get_state(), 'child')
# 3a) add MA1_LINKLOCAL which would be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(SED_1_2,
MA1_LINKLOCAL,
child_update_request_assert=True,
in_address_registration=True)
# 3b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(SED_1_2,
MA2_ADMINSCOPE,
child_update_request_assert=True,
in_address_registration=True)
# 4) Bring up MED_1_1, which attaches to Thread 1.2 parent, not register any external MA.
self.nodes[MED_1_1].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[MED_1_1].get_state(), 'child')
# 4a) add MA1_LINKLOCAL which would NOT be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(MED_1_1,
MA1_LINKLOCAL,
child_update_request_assert=False,
in_address_registration=False)
# 4b) add MA2_ADMINSCOPE which would NOT be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(MED_1_1,
MA2_ADMINSCOPE,
child_update_request_assert=False,
in_address_registration=False)
# 5) Bring up SED_1_1, which attaches to Thread 1.2 parent, register any external MA for indirect transmission.
self.nodes[SED_1_1].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[SED_1_1].get_state(), 'child')
# 5a) add MA1_LINKLOCAL which would be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(SED_1_1,
MA1_LINKLOCAL,
child_update_request_assert=True,
in_address_registration=True)
# 5b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(SED_1_1,
MA2_ADMINSCOPE,
child_update_request_assert=True,
in_address_registration=True)
#6) Bring up ROUTER_1_1.
self.nodes[ROUTER_1_1].set_router_selection_jitter(ROUTER_SELECTION_JITTER)
self.nodes[ROUTER_1_1].start()
WAIT_TIME = WAIT_ATTACH + ROUTER_SELECTION_JITTER
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[ROUTER_1_1].get_state(), 'router')
# 7) Bring up MED_1_2_2 which attaches to Thread 1.1 parent, not register any external MA.
self.nodes[MED_1_2_2].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[MED_1_2_2].get_state(), 'child')
# 7a) add MA1_LINKLOCAL which would NOT be registered in AddressRegistrationTLV of Child Update Request
self.__check_multicast_registration(MED_1_2_2,
MA1_LINKLOCAL,
child_update_request_assert=False,
in_address_registration=False)
# 7b) add MA2_ADMINSCOPE which would NOT be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(MED_1_2_2,
MA2_ADMINSCOPE,
child_update_request_assert=False,
in_address_registration=False)
# 8) Bring up SED_1_2_2 which attaches to Thread 1.1 parent, register any external MA for indirect transmission.
self.nodes[SED_1_2_2].start()
WAIT_TIME = WAIT_ATTACH
self.simulator.go(WAIT_TIME)
self.assertEqual(self.nodes[SED_1_2_2].get_state(), 'child')
# 8a) add MA1_LINKLOCAL which would be registered in AddressRegistrationTLV of Child Update Request
self.__check_multicast_registration(SED_1_2_2,
MA1_LINKLOCAL,
child_update_request_assert=True,
in_address_registration=True)
# 8b) add MA2_ADMINSCOPE which would be registered in AddressRegistrationTLV of Child Update Request.
self.__check_multicast_registration(SED_1_2_2,
MA2_ADMINSCOPE,
child_update_request_assert=True,
in_address_registration=True)
if __name__ == '__main__':
unittest.main()