[tests] remove thread-cert backbone tests (#13161)

This commit removes the thread-cert/backbone tests and cleans
up all related configurations and references.

Specifically, the following changes are made:
- Deleted tests in tests/scripts/thread-cert/backbone/
- Removed the backbone-router job from .github/workflows/otbr.yml
- Removed backbone-router dependency from upload-coverage job
- Removed setup, cleanup, and checks for backbone tests in
  tests/scripts/thread-cert/run_cert_suite.py
This commit is contained in:
Jonathan Hui
2026-05-27 10:16:44 -07:00
committed by GitHub
parent 289abbd87b
commit 9431d3a77e
7 changed files with 4 additions and 947 deletions
-69
View File
@@ -45,74 +45,6 @@ permissions:
jobs:
backbone-router:
runs-on: ubuntu-22.04
env:
REFERENCE_DEVICE: 1
VIRTUAL_TIME: 0
TEST_TIMEOUT: 1800
PACKET_VERIFICATION: 1
THREAD_VERSION: 1.4
INTER_OP: 1
COVERAGE: 1
MULTIPLY: 1
PYTHONUNBUFFERED: 1
VERBOSE: 1
# The Border Routing and DUA feature can coexist, but current wireshark
# packet verification can't handle it because of the order of context ID
# of OMR prefix and Domain prefix is not deterministic.
BORDER_ROUTING: 0
DISCOVERY_PROXY: 0
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
submodules: recursive
- name: Build OTBR Docker
env:
PR_BODY: "${{ github.event.pull_request.body }}"
run: |
./script/test build_otbr_docker
- name: Bootstrap
run: |
sudo rm /etc/apt/sources.list.d/* && sudo apt-get update
sudo apt-get --no-install-recommends install -y python3-setuptools python3-wheel ninja-build lcov
sudo bash script/install_socat
python3 -m pip install -r tests/scripts/thread-cert/requirements.txt
- name: Build
run: |
./script/test build
- name: Get Thread-Wireshark
run: |
./script/test get_thread_wireshark
- name: Run
run: |
export CI_ENV="$(bash <(curl -s https://codecov.io/env)) -e GITHUB_ACTIONS -e COVERAGE"
echo "CI_ENV=${CI_ENV}"
sudo -E ./script/test cert_suite ./tests/scripts/thread-cert/backbone/*.py || (sudo chmod a+r ot_testing/* && false)
- uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: cov-thread-1-3-backbone-docker
path: /tmp/coverage/
retention-days: 1
- uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
if: ${{ failure() }}
with:
name: thread-1-3-backbone-results
path: |
ot_testing/*.pcap
ot_testing/*.json
ot_testing/*.log
ot_testing/coredump_*
ot_testing/otbr-agent_*
- name: Generate Coverage
run: |
./script/test generate_coverage gcc
- uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: cov-thread-1-3-backbone
path: tmp/coverage.info
retention-days: 1
thread-border-router:
runs-on: ubuntu-22.04
strategy:
@@ -216,7 +148,6 @@ jobs:
upload-coverage:
needs:
- backbone-router
- thread-border-router
runs-on: ubuntu-22.04
steps:
@@ -1,202 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# This test verifies that PBBR sends BMLR.ntf correctly when multicast addresses are registered.
#
# Topology:
# ---- -(eth)-------
# | |
# PBBR----SBBR
# \ /
# Router1---Commissioner
#
import unittest
import thread_cert
import config
from pktverify.packet_verifier import PacketVerifier
PBBR = 1
SBBR = 2
ROUTER1 = 3
COMMISSIONER = 4
REREG_DELAY = 4 # Seconds
MLR_TIMEOUT = 300 # Seconds
CUSTOM_MLR_TIMEOUT = 1000 # Seconds
MA1 = 'ff04::1'
MA2 = 'ff04::2'
MA3 = 'ff04::3'
MA4 = 'ff04::4'
MA5 = 'ff04::5'
class BBR_5_11_01(thread_cert.TestCase):
USE_MESSAGE_FACTORY = False
TOPOLOGY = {
PBBR: {
'name': 'PBBR',
'allowlist': [SBBR, ROUTER1],
'is_otbr': True,
'version': '1.2',
},
SBBR: {
'name': 'SBBR',
'allowlist': [PBBR, ROUTER1],
'is_otbr': True,
'version': '1.2',
},
ROUTER1: {
'name': 'ROUTER1',
'allowlist': [PBBR, SBBR, COMMISSIONER],
'version': '1.2',
},
COMMISSIONER: {
'name': 'COMMISSIONER',
'allowlist': [ROUTER1],
'version': '1.2',
}
}
def test(self):
self.nodes[PBBR].start()
self.wait_node_state(PBBR, 'leader', 10)
self.nodes[PBBR].set_backbone_router(reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT)
self.nodes[PBBR].enable_backbone_router()
self.wait_until(lambda: self.nodes[PBBR].is_primary_backbone_router, 5)
self.nodes[SBBR].start()
self.wait_node_state(SBBR, 'router', 5)
self.nodes[SBBR].set_backbone_router(reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT)
self.nodes[SBBR].enable_backbone_router()
self.simulator.go(5)
self.assertFalse(self.nodes[SBBR].is_primary_backbone_router)
self.nodes[ROUTER1].start()
self.wait_node_state(ROUTER1, 'router', 5)
self.nodes[COMMISSIONER].start()
self.wait_node_state(COMMISSIONER, 'router', 5)
self.wait_route_established(COMMISSIONER, PBBR)
self.nodes[COMMISSIONER].commissioner_start()
self.simulator.go(10)
self.assertEqual('active', self.nodes[COMMISSIONER].commissioner_state())
self.nodes[PBBR].add_ipmaddr(MA1)
self.simulator.go(REREG_DELAY)
self.nodes[ROUTER1].add_ipmaddr(MA2)
self.simulator.go(REREG_DELAY)
# Commissioner registers MA3 with default timeout
self.assertEqual((0, []), self.nodes[COMMISSIONER].register_multicast_listener(MA3, timeout=None))
# Commissioner registers MA4 with a custom timeout
self.assertEqual((0, []), self.nodes[COMMISSIONER].register_multicast_listener(MA4,
timeout=CUSTOM_MLR_TIMEOUT))
# Commissioner unregisters MA5
self.assertEqual((0, []), self.nodes[COMMISSIONER].register_multicast_listener(MA5, timeout=0))
self.collect_ipaddrs()
self.collect_rloc16s()
def verify(self, pv: PacketVerifier):
pkts = pv.pkts
pv.add_common_vars()
pv.summary.show()
pv.verify_attached('ROUTER1')
ROUTER1 = pv.vars['ROUTER1']
COMMISSIONER = pv.vars['COMMISSIONER']
PBBR_ETH = pv.vars['PBBR_ETH']
SBBR_ETH = pv.vars['SBBR_ETH']
# Verify SBBR must not send `/b/bmr` during the test.
pkts.filter_eth_src(SBBR_ETH).filter_coap_request('/b/bmr').must_not_next()
# Verify PBBR sends `/b/bmr` on the Backbone link for MA1 with default timeout.
pkts.filter_eth_src(PBBR_ETH).filter_coap_request('/b/bmr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA1}']
and thread_bl.tlv.timeout == {MLR_TIMEOUT}
and ipv6.src.is_link_local
""")
# Router registers MA2 with default timeout
pkts.filter_wpan_src64(ROUTER1).filter_coap_request('/n/mr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA2}']
and thread_bl.tlv.timeout is null
""")
# Verify PBBR sends `/b/bmr` on the Backbone link for MA2 with default timeout.
pkts.filter_eth_src(PBBR_ETH).filter_coap_request('/b/bmr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA2}']
and thread_bl.tlv.timeout == {MLR_TIMEOUT}
and ipv6.src.is_link_local
""")
# Commissioner registers MA3 with default timeout
pkts.filter_wpan_src64(COMMISSIONER).filter_coap_request('/n/mr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA3}']
and thread_bl.tlv.timeout is null
""")
# Verify PBBR sends `/b/bmr` on the Backbone link for MA3 with default timeout.
pkts.filter_eth_src(PBBR_ETH).filter_coap_request('/b/bmr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA3}']
and thread_bl.tlv.timeout == {MLR_TIMEOUT}
and ipv6.src.is_link_local
""")
# Commissioner registers MA4 with custom timeout
pkts.filter_wpan_src64(COMMISSIONER).filter_coap_request('/n/mr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA4}']
and thread_nm.tlv.timeout == {CUSTOM_MLR_TIMEOUT}
""")
# Verify PBBR sends `/b/bmr` on the Backbone link for MA4 with custom timeout.
pkts.filter_eth_src(PBBR_ETH).filter_coap_request('/b/bmr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA4}']
and thread_bl.tlv.timeout == {CUSTOM_MLR_TIMEOUT}
and ipv6.src.is_link_local
""")
# Commissioner unregisters MA5
pkts.filter_wpan_src64(COMMISSIONER).filter_coap_request('/n/mr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA5}']
and thread_nm.tlv.timeout == 0
""")
# Verify PBBR sends `/b/bmr` on the Backbone link for MA5 with timeout equal to zero.
pkts.filter_eth_src(PBBR_ETH).filter_coap_request('/b/bmr').must_next().must_verify(f"""
thread_meshcop.tlv.ipv6_addr == ['{MA5}']
and thread_bl.tlv.timeout == 0
and ipv6.src.is_link_local
""")
if __name__ == '__main__':
unittest.main()
@@ -1,98 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# This test verifies that no ICMPv6 message is sent for MLE.
#
import unittest
import thread_cert
import config
from pktverify.consts import ICMPV6_TYPE_DESTINATION_UNREACHABLE
from pktverify.packet_verifier import PacketVerifier
PBBR = 1
ROUTER = 2
class TestMleMustNotSendIcmpv6DestinationUnreachable(thread_cert.TestCase):
USE_MESSAGE_FACTORY = False
# Topology:
#
# ------(eth)----------
# |
# PBBR---ROUTER
#
TOPOLOGY = {
PBBR: {
'name': 'PBBR',
'allowlist': [ROUTER],
'is_otbr': True,
'version': '1.2',
},
ROUTER: {
'name': 'ROUTER',
'allowlist': [PBBR],
'version': '1.2',
},
}
def test(self):
self.nodes[PBBR].start()
self.simulator.go(config.LEADER_STARTUP_DELAY)
self.assertEqual('leader', self.nodes[PBBR].get_state())
self.nodes[PBBR].enable_backbone_router()
self.simulator.go(3)
self.assertTrue(self.nodes[PBBR].is_primary_backbone_router)
self.nodes[ROUTER].start()
self.simulator.go(5)
self.assertEqual('router', self.nodes[ROUTER].get_state())
self.simulator.go(5)
self.collect_ipaddrs()
def verify(self, pv: PacketVerifier):
pkts = pv.pkts
pv.add_common_vars()
pv.summary.show()
with pkts.save_index():
pv.verify_attached('ROUTER')
PBBR = pv.vars['PBBR']
ROUTER = pv.vars['ROUTER']
# PBBR MUST NOT send ICMPv6 Destination Unreachable
pkts.filter_wpan_src64(PBBR).filter_wpan_dst64(ROUTER).filter(
f'icmpv6.type == {ICMPV6_TYPE_DESTINATION_UNREACHABLE}').must_not_next()
if __name__ == '__main__':
unittest.main()
@@ -1,217 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# This test verifies that the basic MLR feature works.
#
import logging
import unittest
import config
import thread_cert
from pktverify.packet_verifier import PacketVerifier
PBBR = 1
SBBR = 2
ROUTER = 3
LEADER = 4
FED = 5
MED = 6
SED = 7
HOST = 8
SED_POLL_PERIOD = 1000 # Milliseconds
MA1 = 'ff04::1234:777a:1'
MA2 = 'ff05::1234:777a:2'
MA3 = 'ff05::1234:777a:3'
MA4 = 'ff05::1234:777a:4'
MA5 = 'ff05::1234:777a:5'
BBR_REGISTRATION_JITTER = 1
WAIT_REDUNDANCE = 3
class TestMlr(thread_cert.TestCase):
USE_MESSAGE_FACTORY = False
# Topology:
# --------(eth)---------
# | | |
# PBBR---SBBR HOST
# \ / \
# \ / \
# ROUTER--LEADER
# / | \
# / | \
# FED MED SED
#
# More links:
# PBBR---LEADER
#
TOPOLOGY = {
PBBR: {
'name': 'PBBR',
'allowlist': [SBBR, ROUTER, LEADER],
'is_otbr': True,
'version': '1.2',
'bbr_registration_jitter': BBR_REGISTRATION_JITTER,
},
SBBR: {
'name': 'SBBR',
'allowlist': [PBBR, ROUTER, LEADER],
'is_otbr': True,
'version': '1.2',
'bbr_registration_jitter': BBR_REGISTRATION_JITTER,
},
ROUTER: {
'name': 'ROUTER',
'allowlist': [PBBR, SBBR, LEADER, FED, MED, SED],
'version': '1.2',
},
LEADER: {
'name': 'LEADER',
'allowlist': [PBBR, SBBR, ROUTER],
'version': '1.2',
},
FED: {
'name': 'FED',
'allowlist': [ROUTER],
'version': '1.2',
'router_upgrade_threshold': 0,
},
MED: {
'name': 'MED',
'allowlist': [ROUTER],
'version': '1.2',
'mode': 'rn',
},
SED: {
'name': 'SED',
'allowlist': [ROUTER],
'version': '1.2',
'mode': 'n'
},
HOST: {
'name': 'Host',
'is_host': True
},
}
def test(self):
# Bring up Host
self.nodes[HOST].start()
# Bring up Leader
self.nodes[LEADER].start()
self.simulator.go(config.LEADER_STARTUP_DELAY)
self.assertEqual('leader', self.nodes[LEADER].get_state())
# Bring up Router
self.nodes[ROUTER].start()
self.simulator.go(5)
self.assertEqual('router', self.nodes[ROUTER].get_state())
# Bring up PBBR
self.nodes[PBBR].start()
self.simulator.go(5)
self.assertEqual('router', self.nodes[PBBR].get_state())
self.nodes[PBBR].enable_backbone_router()
self.simulator.go(10)
self.assertTrue(self.nodes[PBBR].is_primary_backbone_router)
self.nodes[PBBR].add_prefix(config.DOMAIN_PREFIX, "parosD")
self.nodes[PBBR].register_netdata()
# Bring up SBBR
self.nodes[SBBR].start()
self.simulator.go(5)
self.assertEqual('router', self.nodes[SBBR].get_state())
self.nodes[SBBR].enable_backbone_router()
self.simulator.go(10)
self.assertFalse(self.nodes[SBBR].is_primary_backbone_router)
# Bring up FED, MED, SED
self.nodes[FED].start()
self.nodes[MED].start()
self.nodes[SED].set_pollperiod(SED_POLL_PERIOD)
self.nodes[SED].start()
self.simulator.go(5)
self.assertEqual('child', self.nodes[FED].get_state())
self.assertEqual('child', self.nodes[MED].get_state())
self.assertEqual('child', self.nodes[SED].get_state())
# Verify Multicast Routing works for all devices
self._verify_multicast_routing(ROUTER, MA1)
self._verify_multicast_routing(LEADER, MA2)
self._verify_multicast_routing(FED, MA3)
self._verify_multicast_routing(MED, MA4, is_med=True)
self._verify_multicast_routing(SED, MA5, is_med=True, is_sed=True)
# Verify MA_scope2 is not reachable from Host
MA_scope2 = 'ff02::10'
self.nodes[ROUTER].add_ipmaddr(MA_scope2)
self.simulator.go(3)
self.assertFalse(self.nodes[HOST].ping(MA_scope2, backbone=True, ttl=10))
self.nodes[ROUTER].del_ipmaddr(MA_scope2)
# Verify MA_scope3 is not reachable from Host
MA_scope3 = 'ff03::1234:777a:5'
self.nodes[ROUTER].add_ipmaddr(MA_scope3)
self.simulator.go(3)
self.assertFalse(self.nodes[HOST].ping(MA_scope3, backbone=True, ttl=10))
self.nodes[ROUTER].del_ipmaddr(MA_scope3)
# Router subscribes MA2 and MA3 at the same time and verify that they are both reachable
self.nodes[ROUTER].add_ipmaddr(MA2)
self.nodes[ROUTER].add_ipmaddr(MA3)
self.simulator.go(3)
self.assertTrue(self.nodes[HOST].ping(MA2, backbone=True, ttl=10))
self.assertTrue(self.nodes[HOST].ping(MA3, backbone=True, ttl=10))
self.nodes[ROUTER].del_ipmaddr(MA2)
self.nodes[ROUTER].del_ipmaddr(MA3)
self.simulator.go(1)
def _verify_multicast_routing(self, nodeid: int, ma: str, is_med=False, is_sed=False):
logging.info('_verify_multicast_routing: nodeid=%d, MA=%s', nodeid, ma)
# Verify MA is not reachable from Host initially
self.assertFalse(self.nodes[HOST].ping(ma, backbone=True, ttl=10))
# Device subscribes MA
self.nodes[nodeid].add_ipmaddr(ma)
self.simulator.go(3 + (SED_POLL_PERIOD * 2 / 1000) * is_sed + config.PARENT_AGGREGATIOIN_DELAY * is_med +
WAIT_REDUNDANCE)
# Verify MA is reachable from Host
self.assertTrue(self.nodes[HOST].ping(ma, backbone=True, ttl=10))
# Device unsubscribes MA
self.nodes[nodeid].del_ipmaddr(ma)
self.simulator.go(1)
if __name__ == '__main__':
unittest.main()
@@ -1,138 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# This test verifies that the MLR timeout configured by Commissioner works for multicast routing.
#
import unittest
import config
from pktverify.consts import NM_COMMISSIONER_SESSION_ID_TLV
from pktverify.packet_verifier import PacketVerifier
from test_mlr_multicast_routing_timeout import ROUTER, HOST
from test_mlr_multicast_routing_timeout import TestMlrTimeout, MLR_TIMEOUT, WAIT_REDUNDANCE, MA1
COMMISSIONER_MLR_TIMEOUT = 360
assert COMMISSIONER_MLR_TIMEOUT > MLR_TIMEOUT + WAIT_REDUNDANCE * 3, "COMMISSIONER_MLR_TIMEOUT should be significantly larger than MLR_TIMEOUT"
class TestMlrCommissionerTimeout(TestMlrTimeout):
# Topology (same as TestMlrTimeout):
# --------(eth)---------
# | |
# PBBR(Leader) HOST
# |
# ROUTER
#
def test(self):
self._bootstrap()
# Ping 1: PBBR should not forward to Thread network
self.assertFalse(self.nodes[HOST].ping(MA1, backbone=True, ttl=10))
# Router starts Commissioner
self.nodes[ROUTER].commissioner_start()
self.simulator.go(10)
self.assertEqual('active', self.nodes[ROUTER].commissioner_state())
# Router (Commissioner) registers MA1
self.nodes[ROUTER].register_multicast_listener(MA1, timeout=COMMISSIONER_MLR_TIMEOUT)
self.simulator.go(WAIT_REDUNDANCE)
# Ping 2: PBBR should forward to Thread network
self.nodes[HOST].ping(MA1, backbone=True, ttl=10)
self.simulator.go(MLR_TIMEOUT + WAIT_REDUNDANCE)
# Ping 3: PBBR should forward to Thread network
self.nodes[HOST].ping(MA1, backbone=True, ttl=10)
self.simulator.go(COMMISSIONER_MLR_TIMEOUT - MLR_TIMEOUT)
# Ping 4: PBBR should NOT forward to Thread network
self.assertFalse(self.nodes[HOST].ping(MA1, backbone=True, ttl=10))
def verify(self, pv: PacketVerifier):
pkts = pv.pkts
pv.add_common_vars()
pv.summary.show()
ROUTER = pv.vars['ROUTER']
PBBR = pv.vars['PBBR']
MM = pv.vars['MM_PORT']
HOST_ETH = pv.vars['Host_ETH']
HOST_BGUA = pv.vars['Host_BGUA']
start = pkts.index
# Verify that Router sends MLR.req for MA1 with Commissioner timeout
pkts.filter_wpan_src64(ROUTER).filter_coap_request('/n/mr', port=MM).must_next().must_verify(
"""
thread_meshcop.tlv.ipv6_addr == {ipv6_addr}
and thread_nm.tlv.timeout == {timeout}
and {NM_COMMISSIONER_SESSION_ID_TLV} in thread_nm.tlv.type
""",
ipv6_addr=[MA1],
timeout=COMMISSIONER_MLR_TIMEOUT,
NM_COMMISSIONER_SESSION_ID_TLV=NM_COMMISSIONER_SESSION_ID_TLV)
mr_index = pkts.index
# Ping 1: Host pings Router before MLR.reg
before_mr_pkts = pkts.range(start, mr_index, cascade=False)
ping1 = before_mr_pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should not forward Ping 1 to Thread network
before_mr_pkts.filter_wpan_src64(PBBR).filter_ipv6_dst(MA1).filter_ping_request(
identifier=ping1.icmpv6.echo.identifier).must_not_next()
# Ping 2: Host pings Router after MLR.req
ping2 = pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should forward this ping request to Thread network
pkts.filter_wpan_src64(PBBR).filter_AMPLFMA().filter_ping_request(
identifier=ping2.icmpv6.echo.identifier).must_next()
# Ping 3: Host pings Router after delayed MLR Timeout
ping3 = pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should forward this ping request to Thread network because MA doesn't expire yet
pkts.filter_wpan_src64(PBBR).filter_AMPLFMA().filter_ping_request(
identifier=ping3.icmpv6.echo.identifier).must_next()
# Ping 4: Host pings Router after delayed Commissioner MLR Timeout
ping4 = pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should NOT forward this ping request to Thread network because MLR has timeout
pkts.filter_wpan_src64(PBBR).filter_AMPLFMA().filter_ping_request(
identifier=ping4.icmpv6.echo.identifier).must_not_next()
del TestMlrTimeout
if __name__ == '__main__':
unittest.main()
@@ -1,180 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# This test verifies that the MLR timeout works for multicast routing.
#
import unittest
import config
import thread_cert
from pktverify.packet_verifier import PacketVerifier
PBBR = 1
ROUTER = 2
HOST = 3
MA1 = 'ff05::1234:777a:1'
BBR_REGISTRATION_JITTER = 1
REG_DELAY = 10
MLR_TIMEOUT = 300
WAIT_REDUNDANCE = 3
class TestMlrTimeout(thread_cert.TestCase):
USE_MESSAGE_FACTORY = False
# Topology:
# --------(eth)---------
# | |
# PBBR(Leader) HOST
# |
# ROUTER
#
TOPOLOGY = {
PBBR: {
'name': 'PBBR',
'allowlist': [ROUTER],
'is_otbr': True,
'version': '1.2',
'bbr_registration_jitter': BBR_REGISTRATION_JITTER,
},
ROUTER: {
'name': 'ROUTER',
'allowlist': [PBBR],
'version': '1.2',
},
HOST: {
'name': 'Host',
'is_host': True
},
}
def _bootstrap(self):
# Bring up Host
self.nodes[HOST].start()
# Bring up PBBR
self.nodes[PBBR].start()
self.simulator.go(config.LEADER_STARTUP_DELAY)
self.assertEqual('leader', self.nodes[PBBR].get_state())
self.nodes[PBBR].enable_backbone_router()
self.nodes[PBBR].set_backbone_router(reg_delay=REG_DELAY, mlr_timeout=MLR_TIMEOUT)
self.simulator.go(10)
self.assertTrue(self.nodes[PBBR].is_primary_backbone_router)
self.nodes[PBBR].add_prefix(config.DOMAIN_PREFIX, "parosD")
self.nodes[PBBR].register_netdata()
# Bring up Router
self.nodes[ROUTER].start()
self.simulator.go(5)
self.assertEqual('router', self.nodes[ROUTER].get_state())
self.collect_ipaddrs()
def test(self):
self._bootstrap()
self.assertFalse(self.nodes[HOST].ping(MA1, backbone=True, ttl=10))
# Router subscribes MA
self.nodes[ROUTER].add_ipmaddr(MA1)
self.simulator.go(WAIT_REDUNDANCE)
# Verify MA is reachable from Host
self.assertTrue(self.nodes[HOST].ping(MA1, backbone=True, ttl=10))
self.simulator.go(WAIT_REDUNDANCE)
# Router unsubscribes MA
self.nodes[ROUTER].del_ipmaddr(MA1)
self.simulator.go(WAIT_REDUNDANCE)
# Verify MA is not reachable from Host after unsubscribed
# But PBBR should still forward the Ping Requests to Thread network
self.nodes[HOST].ping(MA1, backbone=True, ttl=10)
self.simulator.go(MLR_TIMEOUT + WAIT_REDUNDANCE)
# Verify MA is not reachable from Host after MLR timeout
# PBBR should not forward the Ping Requests to Thread network
self.assertFalse(self.nodes[HOST].ping(MA1, backbone=True, ttl=10))
def verify(self, pv: PacketVerifier):
pkts = pv.pkts
pv.add_common_vars()
pv.summary.show()
ROUTER = pv.vars['ROUTER']
PBBR = pv.vars['PBBR']
MM = pv.vars['MM_PORT']
HOST_ETH = pv.vars['Host_ETH']
HOST_BGUA = pv.vars['Host_BGUA']
start = pkts.index
# Verify that Router sends MLR.req for MA1
pkts.filter_wpan_src64(ROUTER).filter_coap_request('/n/mr', port=MM).must_next().must_verify(
'thread_meshcop.tlv.ipv6_addr == {ipv6_addr}', ipv6_addr=[MA1])
mr_index = pkts.index
# Host pings Router before MLR.reg
before_mr_pkts = pkts.range(start, mr_index, cascade=False)
ping = before_mr_pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should not forward this ping request to Thread network
before_mr_pkts.filter_wpan_src64(PBBR).filter_ipv6_dst(MA1).filter_ping_request(
identifier=ping.icmpv6.echo.identifier).must_not_next()
# Host pings Router after MLR.req
ping = pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should forward this ping request to Thread network
pkts.filter_wpan_src64(PBBR).filter_AMPLFMA().filter_ping_request(
identifier=ping.icmpv6.echo.identifier).must_next()
# Router should reply
pkts.filter_wpan_src64(ROUTER).filter_ipv6_dst(HOST_BGUA).filter_ping_reply(
identifier=ping.icmpv6.echo.identifier).must_next()
# Host pings Router after Router unsubscribed MA
ping = pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should forward this ping request to Thread network because MA doesn't expire yet
pkts.filter_wpan_src64(PBBR).filter_AMPLFMA().filter_ping_request(
identifier=ping.icmpv6.echo.identifier).must_next()
# Router should NOT reply because it has unsubscribed the multicast address
pkts.filter_wpan_src64(ROUTER).filter_ipv6_dst(HOST_BGUA).filter_ping_reply(
identifier=ping.icmpv6.echo.identifier).must_not_next()
# Host pings Router after MLR timeout
ping = pkts.filter_eth_src(HOST_ETH).filter_ipv6_dst(MA1).filter_ping_request().must_next()
# PBBR should NOT forward this ping request to Thread network because MLR has timeout
pkts.filter_wpan_src64(PBBR).filter_AMPLFMA().filter_ping_request(
identifier=ping.icmpv6.echo.identifier).must_not_next()
if __name__ == '__main__':
unittest.main()
+3 -42
View File
@@ -31,6 +31,7 @@ import multiprocessing
import os
import queue
import subprocess
import sys
import time
import traceback
from collections import Counter, defaultdict
@@ -42,8 +43,6 @@ THREAD_VERSION = os.getenv('THREAD_VERSION')
VIRTUAL_TIME = int(os.getenv('VIRTUAL_TIME', '1'))
MAX_JOBS = int(os.getenv('MAX_JOBS', (multiprocessing.cpu_count() * 2 if VIRTUAL_TIME else 10)))
_BACKBONE_TESTS_DIR = 'tests/scripts/thread-cert/backbone'
_COLOR_PASS = '\033[0;32m'
_COLOR_FAIL = '\033[0;31m'
_COLOR_NONE = '\033[0m'
@@ -98,31 +97,13 @@ def run_cert(iteration_id: int, port_offset: int, script: str, run_directory: st
pool = multiprocessing.Pool(processes=MAX_JOBS)
def cleanup_backbone_env():
logging.info("Cleaning up Backbone testing environment ...")
bash('pkill socat 2>/dev/null || true')
bash('pkill dumpcap 2>/dev/null || true')
bash(f'docker rm -f $(docker ps -a -q -f "name=otbr_") 2>/dev/null || true')
bash(f'docker network rm $(docker network ls -q -f "name=backbone") 2>/dev/null || true')
def setup_backbone_env():
if THREAD_VERSION == '1.1':
raise RuntimeError('Backbone tests do not work with THREAD_VERSION=1.1')
if VIRTUAL_TIME:
raise RuntimeError('Backbone tests only work with VIRTUAL_TIME=0')
bash(f'docker image inspect {config.OTBR_DOCKER_IMAGE} >/dev/null')
def parse_args():
import argparse
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('--multiply', type=int, default=1, help='run each test for multiple times')
parser.add_argument('--timeout', type=int, default=0, help='timeout in seconds per test, zero means no timeout')
parser.add_argument('--run-directory', type=str, default=None, help='run each test in the specified directory')
parser.add_argument("scripts", nargs='+', type=str, help='specify Backbone test scripts')
parser.add_argument("scripts", nargs='+', type=str, help='specify test scripts')
args = parser.parse_args()
logging.info("Max jobs: %d", MAX_JOBS)
@@ -133,15 +114,6 @@ def parse_args():
return args
def check_has_backbone_tests(scripts):
for script in scripts:
relpath = os.path.relpath(script, _BACKBONE_TESTS_DIR)
if not relpath.startswith('..'):
return True
return False
class PortOffsetPool:
def __init__(self, size: int):
@@ -205,19 +177,8 @@ def run_tests(scripts: List[str], multiply: int = 1, run_directory: str = None,
def main():
args = parse_args()
has_backbone_tests = check_has_backbone_tests(args.scripts)
logging.info('Has Backbone tests: %s', has_backbone_tests)
if has_backbone_tests:
cleanup_backbone_env()
setup_backbone_env()
try:
fail_count = run_tests(args.scripts, args.multiply, args.run_directory, args.timeout)
exit(fail_count)
finally:
if has_backbone_tests:
cleanup_backbone_env()
sys.exit(1 if fail_count else 0)
if __name__ == '__main__':