[thread-cert] Thread 1.2 CI with OTBR and Backbone link (#5489)

Thread 1.2 CI with OTBR and Backbone link:
- Run OTBR in Dockers with Backbone link
- Enhance node.py to work with OTBR Docker
- Add Packet Verification for 5.11.1
  - incomplete, some steps can not pass yet, just to make sure
    Backbone traffic verification works
- Support running multiple tests simultaneously (default: each test
  run 3 instances)
- Build OTBR Docker using OpenThread PR code
- Upload code coverage in Docker

Some implementation details:
- Most existing code of node.py is shared by OTBR Docker
- Backbone related test scripts are found in
  tests/scripts/thread-cert/backbone
- A new script tests/scripts/thread-cert/run_bbr_tests.py is added to
  manage multiple running tests
- Test configuration differs according to PORT_OFFSET
  - Backbone interface name: Backbone{PORT_OFFSET}
  - Backbone network prefix: 91{PORT_OFFSET:02x}::/64
  - Docker instance name: otbr_{PORT_OFFSET}_{nodeid}
  - Output Files:
    - Pcap:
      - Thread: {test_name}_{PORT_OFFSET}.pcap
      - Backbone: {test_name}_{PORT_OFFSET}_backbone.pcap
      - Merged: {test_name}_{PORT_OFFSET}_merged.pcap
    - Log: {test_name}_{PORT_OFFSET}.log
This commit is contained in:
Simon Lin
2020-09-25 11:16:27 +08:00
committed by GitHub
parent 2daa097bb4
commit ce336257ee
14 changed files with 1011 additions and 114 deletions
+49
View File
@@ -119,6 +119,9 @@ jobs:
- name: Build
run: |
./script/test build
- name: Get Thread-Wireshark
run: |
./script/test get_thread_wireshark
- name: Run
run: |
for i in {1..10}
@@ -209,3 +212,49 @@ jobs:
uses: codecov/codecov-action@v1
with:
fail_ci_if_error: true
thread-1-2-backbone:
runs-on: ubuntu-18.04
env:
REFERENCE_DEVICE: 1
VIRTUAL_TIME: 0
PACKET_VERIFICATION: 1
THREAD_VERSION: 1.2
INTER_OP: 1
COVERAGE: 1
MULTIPLY: 3
PYTHONUNBUFFERED: 1
OTBR_COMMIT: "d1671cfa4777d3a1bb565b88856f1ef446fe760f" # Sep 22, 2020
steps:
- uses: actions/checkout@v2
- name: Build OTBR Docker
run: |
./script/test build_otbr_docker
- name: Bootstrap
run: |
sudo rm /etc/apt/sources.list.d/* && sudo apt-get update
sudo apt-get --no-install-recommends install -y python3-setuptools python3-wheel ninja-build socat
python3 -m pip install -r tests/scripts/thread-cert/requirements.txt
- name: Build
run: |
./script/test build
- name: Get Thread-Wireshark
run: |
./script/test get_thread_wireshark
- name: Run
run: |
export CI_ENV="$(bash <(curl -s https://codecov.io/env)) -e GITHUB_ACTIONS -e COVERAGE"
echo "CI_ENV=${CI_ENV}"
sudo -E ./script/test cert_bbr ./tests/scripts/thread-cert/backbone/*.py || (sudo chmod a+r *.log *.json *.pcap && false)
- uses: actions/upload-artifact@v2
if: ${{ failure() }}
with:
name: thread-1-2-backbone-results
path: |
*.pcap
*.json
*.log
- name: Codecov
uses: codecov/codecov-action@v1
with:
fail_ci_if_error: true
+3
View File
@@ -129,6 +129,9 @@ jobs:
- name: Build
run: |
./script/test build
- name: Get Thread-Wireshark
run: |
./script/test get_thread_wireshark
- name: Run
run: |
for i in {1..10}
+74
View File
@@ -229,6 +229,70 @@ do_cert_suite()
fi
}
do_cert_bbr()
{
if [[ ${THREAD_VERSION} != "1.2" ]]; then
echo "cert_bbr only work with THREAD_VERSION=1.2!"
exit 1
fi
if [[ ${VIRTUAL_TIME} != "0" ]]; then
echo "cert_bbr only work with VIRTUAL_TIME=0!"
exit 1
fi
export top_builddir="${OT_BUILDDIR}/cmake/openthread-simulation-1.2"
export top_builddir_1_2_bbr="${OT_BUILDDIR}/cmake/openthread-simulation-1.2-bbr"
export top_builddir_1_1="${OT_BUILDDIR}/cmake/openthread-simulation-1.1"
export PYTHONPATH=tests/scripts/thread-cert
python3 tests/scripts/thread-cert/run_bbr_tests.py --multiply "${MULTIPLY:-1}" "$@"
}
do_get_thread_wireshark()
{
echo "Downloading thread-wireshark from https://github.com/openthread/wireshark/releases ..."
local download_url=https://github.com/openthread/wireshark/releases/download/ot-pktverify-20200727/thread-wireshark.tar.gz
local save_file=/tmp/thread-wireshark.tar.gz
rm -rf /tmp/thread-wireshark || true
rm -rf "${save_file}" || true
curl -L "${download_url}" -o "${save_file}"
tar -C /tmp -xvzf "${save_file}"
LD_LIBRARY_PATH=/tmp/thread-wireshark /tmp/thread-wireshark/tshark -v
LD_LIBRARY_PATH=/tmp/thread-wireshark /tmp/thread-wireshark/dumpcap -v
rm -rf "${save_file}"
}
do_build_otbr_docker()
{
echo "Building OTBR Docker ..."
local otdir
local otbrdir
local otbr_options="-DOTBR_BACKBONE_ROUTER=ON -DOT_DUA=ON -DOT_MLR=ON -DOT_REFERENCE_DEVICE=ON -DOT_COVERAGE=ON"
local otbr_commit=${OTBR_COMMIT:-master}
local otbr_docker_image=${OTBR_DOCKER_IMAGE:-otbr-ot12-backbone-ci}
otbrdir=$(mktemp -d -t otbr_XXXXXX)
otdir=$(pwd)
(
cd "${otbrdir}"
git init
git remote add origin https://github.com/openthread/ot-br-posix.git
git fetch origin "${otbr_commit}" --depth 1
git reset --hard FETCH_HEAD
git submodule update --init --recursive --depth 1
rm -rf third_party/openthread/repo
cp -r "${otdir}" third_party/openthread/repo
rm -rf .git
docker build -t "${otbr_docker_image}" -f etc/docker/Dockerfile . --build-arg REFERENCE_DEVICE=1 --build-arg OT_BACKBONE_CI=1 --build-arg OTBR_OPTIONS="${otbr_options}"
)
rm -rf "${otbrdir}"
}
do_expect()
{
local ot_command
@@ -448,6 +512,16 @@ main()
shift
do_cert_suite "$@"
;;
cert_bbr)
shift
do_cert_bbr "$@"
;;
get_thread_wireshark)
do_get_thread_wireshark
;;
build_otbr_docker)
do_build_otbr_docker
;;
unit)
do_unit
;;
+177
View File
@@ -0,0 +1,177 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import unittest
import config
import thread_cert
# Test description: Here is the test case `5.11.1 DUA-TC-04: DUA re-registration`
#
# Topology:
# -----------(eth)----------------
# | |
# Router_1----BR_1---HOST
# \ /
# Router_2
#
from pktverify.packet_verifier import PacketVerifier
BR_1 = 1
ROUTER1 = 2
ROUTER2 = 3
HOST = 4
class BBR_5_11_01(thread_cert.TestCase):
USE_MESSAGE_FACTORY = False
TOPOLOGY = {
BR_1: {
'name': 'BR_1',
'whitelist': [ROUTER1, ROUTER2],
'is_otbr': True,
'version': '1.2',
'router_selection_jitter': 1,
},
ROUTER1: {
'name': 'Router_1',
'whitelist': [ROUTER2, BR_1],
'version': '1.2',
'router_selection_jitter': 1,
},
ROUTER2: {
'name': 'Router_2',
'whitelist': [ROUTER1, BR_1],
'version': '1.2',
'router_selection_jitter': 1,
},
HOST: {
'name': 'Host',
'is_host': True
},
}
def test(self):
self.nodes[HOST].start()
# P1: Router_1 is configured with leader weight of 72 in case the test is executed on a CCM network
self.nodes[ROUTER1].set_weight(72)
self.nodes[ROUTER1].start()
self.simulator.go(5)
self.assertEqual('leader', self.nodes[ROUTER1].get_state())
self.nodes[ROUTER2].start()
self.simulator.go(5)
self.assertEqual('router', self.nodes[ROUTER2].get_state())
self.nodes[BR_1].start()
self.simulator.go(5)
self.assertEqual('router', self.nodes[BR_1].get_state())
self.nodes[BR_1].enable_backbone_router()
self.simulator.go(3)
self.assertTrue(self.nodes[BR_1].is_primary_backbone_router)
self.nodes[BR_1].add_prefix(config.DOMAIN_PREFIX, "parosD")
self.nodes[BR_1].register_netdata()
self.simulator.go(5)
self.assertIsNotNone(self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.DUA))
self.simulator.go(10) # must wait for DUA_DAD_REPEATS to complete
logging.info("Host addresses: %r", self.nodes[HOST].get_addrs())
self.assertGreaterEqual(len(self.nodes[HOST].get_addrs()), 2)
self.collect_ipaddrs()
self.collect_rloc16s()
Dg = self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.DUA)
self.collect_extra_vars(Dg=Dg)
logging.info("BR_1 addrs: %r", self.nodes[BR_1].get_addrs())
logging.info("Host addrs: %r", self.nodes[HOST].get_addrs())
# BR_1 and Host can ping each other on the Backbone link
self.assertTrue(self.nodes[HOST].ping(self.nodes[BR_1].get_ip6_address(config.ADDRESS_TYPE.BACKBONE_GUA),
backbone=True))
self.assertTrue(self.nodes[BR_1].ping(self.nodes[HOST].get_ip6_address(config.ADDRESS_TYPE.BACKBONE_GUA),
backbone=True))
# Step 23: Host sends ping packet to destination Dg
self.assertFalse(self.nodes[HOST].ping(Dg,
backbone=True)) # Must fail since ND Proxying is not implemented yet
def verify(self, pv: PacketVerifier):
pkts = pv.pkts
pv.add_common_vars()
pv.summary.show()
pv.verify_attached('BR_1')
MM = pv.vars['MM_PORT']
BB = pv.vars['BB_PORT']
BR_1 = pv.vars['BR_1']
BR_1_ETH = pv.vars['BR_1_ETH']
Host_ETH = pv.vars['Host_ETH']
BR_1_BGUA = pv.vars['BR_1_BGUA']
Host_BGUA = pv.vars['Host_BGUA']
Dg = pv.vars['Dg'] # DUA of Router_2
# Step 3: BR_1: Checks received Network Data and determines that it needs to send its BBR Dataset to the
# leader to become primary BBR.
pkts.filter_wpan_src64(BR_1).filter_coap_request('/a/sd', port=MM).must_next().must_verify("""
thread_nwd.tlv.server_16 is not null
and thread_nwd.tlv.service.s_data.seqno is not null
and thread_nwd.tlv.service.s_data.rrdelay is not null
and thread_nwd.tlv.service.s_data.mlrtimeout is not null
""")
# Step 9: BR_1: Responds to the DUA registration.
pkts.filter_wpan_src64(BR_1).filter_coap_ack('/n/dr',
port=MM).must_next().must_verify('thread_nm.tlv.status == 0')
# TODO: (DUA) Step 10: BR_1: Performs DAD on the backbone link.
pkts.filter_eth_src(BR_1_ETH).filter_coap_request('/b/bq', port=BB).must_not_next()
# Verify Host ping BBR
pkts.filter_eth_src(Host_ETH).filter_ipv6_src_dst(Host_BGUA, BR_1_BGUA).filter_ping_request().must_next()
pkts.filter_eth_src(BR_1_ETH).filter_ipv6_src_dst(BR_1_BGUA, Host_BGUA).filter_ping_reply().must_next()
# Verify BR_1 ping Host
pkts.filter_eth_src(BR_1_ETH).filter_ipv6_src_dst(BR_1_BGUA, Host_BGUA).filter_ping_request().must_next()
pkts.filter_eth_src(Host_ETH).filter_ipv6_src_dst(Host_BGUA, BR_1_BGUA).filter_ping_reply().must_next()
# Step 16: Host: Queries DUA, Dg, with ND-NS
# TODO: setup radvd on Host
pkts.filter_eth_src(Host_ETH).filter_icmpv6_nd_ns(Dg).must_not_next()
# Step 17: BR_1: Responds with a neighbor advertisement.
# TODO: (DUA) implement ND proxy on PBBR
pkts.filter_eth_src(BR_1_ETH).filter_icmpv6_nd_na(Dg).must_not_next()
if __name__ == '__main__':
unittest.main()
+28 -8
View File
@@ -26,10 +26,9 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from enum import Enum
import os
from enum import Enum
from tlvs_parsing import SubTlvsFactory
import coap
import dtls
import ipv6
@@ -43,6 +42,7 @@ import network_diag
import network_layer
import simulator
import sniffer
from tlvs_parsing import SubTlvsFactory
# This extended address will generate the MESH_LOCAL_PREFIX
EXTENDED_PANID = '000db80000000000'
@@ -60,8 +60,18 @@ LINK_LOCAL_ALL_NODES_ADDRESS = 'ff02::1'
LINK_LOCAL_ALL_ROUTERS_ADDRESS = 'ff02::2'
DOMAIN_PREFIX = 'fd00:7d03:7d03:7d03::/64'
DOMAIN_PREFIX_REGEX_PATTERN = '^fd00:7d03:7d03:7d03:'
DOMAIN_PREFIX_ALTER = 'fd00:7d04:7d04:7d04::/64'
PORT_OFFSET = int(os.getenv('PORT_OFFSET', '0'))
BACKBONE_PREFIX = f'{0x9100 + PORT_OFFSET:04x}::/64'
BACKBONE_PREFIX_REGEX_PATTERN = f'^{0x9100 + PORT_OFFSET:04x}:'
BACKBONE_DOCKER_NETWORK_NAME = f'backbone{PORT_OFFSET}'
OTBR_DOCKER_IMAGE = os.getenv('OTBR_DOCKER_IMAGE', 'otbr-ot12-backbone-ci')
OTBR_DOCKER_NAME_PREFIX = f'otbr_{PORT_OFFSET}_'
OTBR_COMMIT = os.getenv('OTBR_COMMIT', 'master')
ALL_NETWORK_BBRS_ADDRESS = 'ff32:40:fd00:db8:0:0:0:3'
ALL_DOMAIN_BBRS_ADDRESS = 'ff32:40:fd00:7d03:7d03:7d03:0:3'
@@ -86,7 +96,17 @@ DEFAULT_MASTER_KEY = bytearray([
0xff,
])
ADDRESS_TYPE = Enum('ADDRESS_TYPE', ('LINK_LOCAL', 'GLOBAL', 'RLOC', 'ALOC', 'ML_EID'))
class ADDRESS_TYPE(Enum):
LINK_LOCAL = 'LINK_LOCAL'
GLOBAL = 'GLOBAL'
RLOC = 'RLOC'
ALOC = 'ALOC'
ML_EID = 'ML_EID'
DUA = 'DUA'
BACKBONE_GUA = 'BACKBONE_GUA'
RSSI = {
'LINK_QULITY_0': -100,
'LINK_QULITY_1': -95,
@@ -493,11 +513,11 @@ def create_default_thread_message_factory(master_key=DEFAULT_MASTER_KEY):
return message.MessageFactory(lowpan_parser=lowpan_parser)
def create_default_thread_sniffer():
return sniffer.Sniffer(create_default_thread_message_factory())
def create_default_thread_sniffer(use_message_factory=True):
return sniffer.Sniffer(create_default_thread_message_factory() if use_message_factory else None)
def create_default_simulator():
def create_default_simulator(use_message_factory=True):
if VIRTUAL_TIME:
return simulator.VirtualTime()
return simulator.RealTime()
return simulator.VirtualTime(use_message_factory=use_message_factory)
return simulator.RealTime(use_message_factory=use_message_factory)
+346 -43
View File
@@ -27,27 +27,191 @@
# POSSIBILITY OF SUCH DAMAGE.
#
import config
import binascii
import ipaddress
import logging
import os
import re
import socket
import subprocess
import sys
import time
import traceback
import unittest
from typing import Union, Dict, Optional, List
import pexpect
import pexpect.popen_spawn
import re
import config
import simulator
import socket
import time
import unittest
import binascii
from typing import Union, Dict
PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))
class Node:
class OtbrDocker:
_socat_proc = None
_ot_rcp_proc = None
_docker_proc = None
def __init__(self, nodeid, is_mtd=False, simulator=None, name=None, version=None, is_bbr=False):
self.nodeid = nodeid
self.name = name or ('Node%d' % nodeid)
def __init__(self, nodeid: int, **kwargs):
try:
self._docker_name = config.OTBR_DOCKER_NAME_PREFIX + str(nodeid)
self._prepare_ot_rcp_sim(nodeid)
self._launch_docker()
except Exception:
traceback.print_exc()
self.destroy()
raise
def _prepare_ot_rcp_sim(self, nodeid: int):
self._socat_proc = subprocess.Popen(['socat', '-d', '-d', 'pty,raw,echo=0', 'pty,raw,echo=0'],
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL)
line = self._socat_proc.stderr.readline().decode('ascii').strip()
self._rcp_device_pty = rcp_device_pty = line[line.index('PTY is /dev') + 7:]
line = self._socat_proc.stderr.readline().decode('ascii').strip()
self._rcp_device = rcp_device = line[line.index('PTY is /dev') + 7:]
logging.info(f"socat running: device PTY: {rcp_device_pty}, device: {rcp_device}")
ot_rcp_path = self._get_ot_rcp_path()
self._ot_rcp_proc = subprocess.Popen(f"{ot_rcp_path} {nodeid} > {rcp_device_pty} < {rcp_device_pty}",
shell=True,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
def _get_ot_rcp_path(self) -> str:
srcdir = os.environ['top_builddir']
path = '%s/examples/apps/ncp/ot-rcp' % srcdir
logging.info("ot-rcp path: %s", path)
return path
def _launch_docker(self):
subprocess.check_call(f"docker rm -f {self._docker_name} || true", shell=True)
CI_ENV = os.getenv('CI_ENV', '').split()
self._docker_proc = subprocess.Popen(['docker', 'run'] + CI_ENV + [
'--rm',
'--name',
self._docker_name,
'--network',
config.BACKBONE_DOCKER_NETWORK_NAME,
'-i',
'--sysctl',
'net.ipv6.conf.all.disable_ipv6=0 net.ipv4.conf.all.forwarding=1 net.ipv6.conf.all.forwarding=1',
'--privileged',
'--cap-add=NET_ADMIN',
'--volume',
f'{self._rcp_device}:/dev/ttyUSB0',
config.OTBR_DOCKER_IMAGE,
],
stdin=subprocess.DEVNULL,
stdout=sys.stdout,
stderr=sys.stderr)
launch_docker_deadline = time.time() + 60
launch_ok = False
while time.time() < launch_docker_deadline:
try:
subprocess.check_call(f'docker exec -i {self._docker_name} ot-ctl state', shell=True)
launch_ok = True
logging.info("OTBR Docker %s Is Ready!", self._docker_name)
break
except subprocess.CalledProcessError:
time.sleep(0.2)
continue
assert launch_ok
cmd = f'docker exec -i {self._docker_name} ot-ctl'
self.pexpect = pexpect.popen_spawn.PopenSpawn(cmd, timeout=10)
# Add delay to ensure that the process is ready to receive commands.
timeout = 0.4
while timeout > 0:
self.pexpect.send('\r\n')
try:
self.pexpect.expect('> ', timeout=0.1)
break
except pexpect.TIMEOUT:
timeout -= 0.1
def __repr__(self):
return f'OtbrDocker<{self.nodeid}>'
def destroy(self):
logging.info("Destroying %s", self)
self._shutdown_docker()
self._shutdown_ot_rcp()
self._shutdown_socat()
def _shutdown_docker(self):
if self._docker_proc is not None:
COVERAGE = int(os.getenv('COVERAGE', '0'))
OTBR_COVERAGE = int(os.getenv('OTBR_COVERAGE', '0'))
if COVERAGE or OTBR_COVERAGE:
self.bash('service otbr-agent stop')
self.bash('curl https://codecov.io/bash -o codecov_bash --retry 5')
codecov_cmd = 'bash codecov_bash -Z'
# Upload OTBR code coverage if OTBR_COVERAGE=1, otherwise OpenThread code coverage.
if not OTBR_COVERAGE:
codecov_cmd += ' -R third_party/openthread/repo'
self.bash(codecov_cmd)
subprocess.check_call(f"docker rm -f {self._docker_name}", shell=True)
self._docker_proc.wait()
del self._docker_proc
def _shutdown_ot_rcp(self):
if self._ot_rcp_proc is not None:
self._ot_rcp_proc.kill()
self._ot_rcp_proc.wait()
del self._ot_rcp_proc
def _shutdown_socat(self):
if self._socat_proc is not None:
self._socat_proc.stderr.close()
self._socat_proc.kill()
self._socat_proc.wait()
del self._socat_proc
def bash(self, cmd: str) -> List[str]:
logging.info("%s $ %s", self, cmd)
proc = subprocess.Popen(['docker', 'exec', '-i', self._docker_name, 'bash', '-c', cmd],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=sys.stderr,
encoding='ascii')
with proc:
lines = []
while True:
line = proc.stdout.readline()
if not line:
break
lines.append(line)
logging.info("%s $ %s", self, line.rstrip('\r\n'))
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, cmd, ''.join(lines))
else:
return lines
class OtCli:
def __init__(self, nodeid, is_mtd=False, version=None, is_bbr=False, **kwargs):
self.verbose = int(float(os.getenv('VERBOSE', 0)))
self.node_type = os.getenv('NODE_TYPE', 'sim')
self.env_version = os.getenv('THREAD_VERSION', '1.1')
@@ -59,10 +223,6 @@ class Node:
else:
self.version = self.env_version
self.simulator = simulator
if self.simulator:
self.simulator.add_node(self)
mode = os.environ.get('USE_MTD') == '1' and is_mtd and 'mtd' or 'ftd'
if self.node_type == 'soc':
@@ -78,8 +238,6 @@ class Node:
self._initialized = True
self.set_extpanid(config.EXTENDED_PANID)
def __init_sim(self, nodeid, mode):
""" Initialize a simulation node. """
@@ -216,6 +374,45 @@ class Node:
self._expect('spinel-cli >')
self.debug(int(os.getenv('DEBUG', '0')))
def __init_soc(self, nodeid):
""" Initialize a System-on-a-chip node connected via UART. """
import fdpexpect
serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY))
def __del__(self):
self.destroy()
def destroy(self):
if not self._initialized:
return
if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or
not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()):
print("%d: exit" % self.nodeid)
self.pexpect.send('exit\n')
self.pexpect.expect(pexpect.EOF)
self.pexpect.wait()
self._initialized = False
class NodeImpl:
is_host = False
is_otbr = False
def __init__(self, nodeid, name=None, simulator=None, **kwargs):
self.nodeid = nodeid
self.name = name or ('Node%d' % nodeid)
self.simulator = simulator
if self.simulator:
self.simulator.add_node(self)
super().__init__(nodeid, **kwargs)
self.set_extpanid(config.EXTENDED_PANID)
def _expect(self, pattern, timeout=-1, *args, **kwargs):
""" Process simulator events until expected the pattern. """
if timeout == -1:
@@ -262,7 +459,7 @@ class Node:
The matched line.
"""
results = self._expect_results(pattern, *args, **kwargs)
assert len(results) == 1
assert len(results) == 1, results
return results[0]
def _expect_results(self, pattern, *args, **kwargs):
@@ -312,28 +509,6 @@ class Node:
print(f'_expect_command_output({cmd!r}) returns {lines!r}')
return lines
def __init_soc(self, nodeid):
""" Initialize a System-on-a-chip node connected via UART. """
import fdpexpect
serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY))
def __del__(self):
self.destroy()
def destroy(self):
if not self._initialized:
return
if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or
not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()):
print("%d: exit" % self.nodeid)
self.pexpect.send('exit\n')
self.pexpect.expect(pexpect.EOF)
self.pexpect.wait()
self._initialized = False
def read_cert_messages_in_commissioning_log(self, timeout=-1):
"""Get the log of the traffic after DTLS handshake.
"""
@@ -497,6 +672,10 @@ class Node:
self.send_command('bbr state')
return self._expect_result(states)
@property
def is_primary_backbone_router(self) -> bool:
return self.get_backbone_router_state() == 'Primary'
def get_backbone_router(self):
cmd = 'bbr config'
self.send_command(cmd)
@@ -791,7 +970,7 @@ class Node:
self._expect('Done')
def get_state(self):
states = [r'detached', r'child', r'router', r'leader']
states = [r'detached', r'child', r'router', r'leader', r'disabled']
self.send_command('state')
return self._expect_result(states)
@@ -969,6 +1148,13 @@ class Node:
return None
def __getDua(self) -> Optional[str]:
for ip6Addr in self.get_addrs():
if re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, ip6Addr, re.I):
return ip6Addr
return None
def get_ip6_address(self, address_type):
"""Get specific type of IPv6 address configured on thread device.
@@ -988,11 +1174,13 @@ class Node:
return self.__getAloc()
elif address_type == config.ADDRESS_TYPE.ML_EID:
return self.__getMleid()
elif address_type == config.ADDRESS_TYPE.DUA:
return self.__getDua()
elif address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
return self._getBackboneGua()
else:
return None
return None
def get_context_reuse_delay(self):
self.send_command('contextreusedelay')
return self._expect_result(r'\d+')
@@ -1679,5 +1867,120 @@ class Node:
return router_table
class Node(NodeImpl, OtCli):
pass
class LinuxHost():
PING_RESPONSE_PATTERN = re.compile(r'\d+ bytes from .*:.*')
ETH_DEV = 'eth0'
def get_ether_addrs(self):
output = self.bash(f'ip -6 addr list dev {self.ETH_DEV}')
addrs = []
for line in output:
# line example: "inet6 fe80::42:c0ff:fea8:903/64 scope link"
line = line.strip().split()
if line and line[0] == 'inet6':
addr = line[1]
if '/' in addr:
addr = addr.split('/')[0]
addrs.append(addr)
logging.debug('%s: get_ether_addrs: %r', self, addrs)
return addrs
def get_ether_mac(self):
output = self.bash(f'ip addr list dev {self.ETH_DEV}')
for line in output:
# link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
line = line.strip().split()
if line and line[0] == 'link/ether':
return line[1]
assert False, output
def ping_ether(self, ipaddr, num_responses=1, size=None, timeout=5) -> int:
cmd = f'ping -6 {ipaddr} -I eth0 -c {num_responses} -W {timeout}'
if size is not None:
cmd += f' -s {size}'
resp_count = 0
try:
for line in self.bash(cmd):
if self.PING_RESPONSE_PATTERN.match(line):
resp_count += 1
except subprocess.CalledProcessError:
pass
return resp_count
def _getBackboneGua(self) -> Optional[str]:
for ip6Addr in self.get_addrs():
if re.match(config.BACKBONE_PREFIX_REGEX_PATTERN, ip6Addr, re.I):
return ip6Addr
return None
def ping(self, *args, **kwargs):
backbone = kwargs.pop('backbone', False)
if backbone:
return self.ping_ether(*args, **kwargs)
else:
return super().ping(*args, **kwargs)
class OtbrNode(LinuxHost, NodeImpl, OtbrDocker):
is_otbr = True
is_bbr = True # OTBR is also BBR
def __repr__(self):
return f'Otbr<{self.nodeid}>'
def get_addrs(self) -> List[str]:
return super().get_addrs() + self.get_ether_addrs()
class HostNode(LinuxHost, OtbrDocker):
is_host = True
def __init__(self, nodeid, name=None, **kwargs):
self.nodeid = nodeid
self.name = name or ('Host%d' % nodeid)
super().__init__(nodeid, **kwargs)
def start(self):
# TODO: Use radvd to advertise the Domain Prefix on the Backbone link.
pass
def stop(self):
pass
def get_addrs(self) -> List[str]:
return self.get_ether_addrs()
def __repr__(self):
return f'Host<{self.nodeid}>'
def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
"""Get specific type of IPv6 address configured on thread device.
Args:
address_type: the config.ADDRESS_TYPE type of IPv6 address.
Returns:
IPv6 address string.
"""
assert address_type == config.ADDRESS_TYPE.BACKBONE_GUA
if address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
return self._getBackboneGua()
else:
return None
if __name__ == '__main__':
unittest.main()
+1 -1
View File
@@ -118,7 +118,7 @@ class Ipv6Addr(Bytes):
return self.startswith(consts.DOMAIN_PREFIX)
@property
def is_backbone(self) -> bool:
def is_backbone_gua(self) -> bool:
"""
Returns if the Ip6 address is Backbone address.
"""
@@ -31,7 +31,7 @@ from pktverify.addrs import Ipv6Addr
from pktverify.bytes import Bytes
DOMAIN_PREFIX = Bytes('fd00:7d03:7d03:7d03')
BACKBONE_IPV6_PREFIX = Bytes('2001:0db8:0001:0000')
BACKBONE_IPV6_PREFIX = Bytes('91')
LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS = Ipv6Addr('ff32:40:fd00:db8::1')
REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS = Ipv6Addr('ff33:40:fd00:db8::1')
@@ -514,6 +514,7 @@ _LAYER_FIELDS = {
'thread_nm.tlv.type': _list(_auto),
'thread_nm.tlv.ml_eid': _ext_addr,
'thread_nm.tlv.target_eid': _ipv6_addr,
'thread_nm.tlv.status': _auto,
# thread_meshcop is not a real layer
'thread_meshcop.len_size_mismatch': _str,
'thread_meshcop.tlv.type': _list(_auto),
@@ -44,6 +44,7 @@ class PacketVerifier(object):
NET_NAME = "OpenThread"
MC_PORT = 49191
MM_PORT = 61631
BB_PORT = 61631
LLANMA = 'ff02::1' # Link-Local All Nodes multicast address
LLARMA = 'ff02::2' # Link-Local All Routers multicast address
RLANMA = 'ff03::1' # realm-local all-nodes multicast address
@@ -89,6 +90,7 @@ class PacketVerifier(object):
NET_NAME=PacketVerifier.NET_NAME,
MM_PORT=PacketVerifier.MM_PORT,
MC_PORT=PacketVerifier.MC_PORT,
BB_PORT=PacketVerifier.BB_PORT,
LLANMA=PacketVerifier.LLANMA, # Link-Local All Nodes multicast address
LLARMA=PacketVerifier.LLARMA, # Link-Local All Routers multicast address
RLANMA=PacketVerifier.RLANMA, # realm-local all-nodes multicast address
@@ -121,8 +123,11 @@ class PacketVerifier(object):
for addr in addrs:
if addr.is_dua:
key = name + '_DUA'
elif addr.is_backbone:
key = name + '_BBA'
elif addr.is_backbone_gua:
key = name + '_BGUA'
elif addr.is_link_local and (name + '_BGUA') in self._vars:
# FIXME: assume the link-local address after Backbone GUA is the Backbone Link Local address
key = name + '_BLLA'
elif addr.is_link_local:
key = name + '_LLA'
else:
@@ -171,8 +176,8 @@ class PacketVerifier(object):
:param td: TB's name.
:param bbr: BBR's name.
"""
assert self.is_wpan_device(td)
assert self.is_wpan_device(bbr) and self.is_eth_device(bbr), bbr
assert self.is_thread_device(td)
assert self.is_thread_device(bbr) and self.is_backbone_device(bbr), bbr
if pkts is None:
pkts = self.pkts
@@ -337,7 +342,7 @@ class PacketVerifier(object):
:param name: The device name.
"""
result = VerifyResult()
assert self.is_wpan_device(name), name
assert self.is_thread_device(name), name
pkts = pkts or self.pkts
extaddr = self.vars[name]
@@ -369,9 +374,9 @@ class PacketVerifier(object):
:return: The verification result.
"""
if bbr:
assert not (self.is_wpan_device(src) and self.is_wpan_device(dst)), \
assert not (self.is_thread_device(src) and self.is_thread_device(dst)), \
f"both {src} and {dst} are WPAN devices"
assert not (self.is_eth_device(src) and self.is_eth_device(dst)), \
assert not (self.is_backbone_device(src) and self.is_backbone_device(dst)), \
f"both {src} and {dst} are ETH devices"
if pkts is None:
@@ -385,7 +390,7 @@ class PacketVerifier(object):
result = VerifyResult()
ping_req = pkts.filter_ping_request().filter_ipv6_dst(dst_dua)
if self.is_eth_device(src):
if self.is_backbone_device(src):
p = ping_req.filter_eth_src(self.vars[src + '_ETH']).must_next()
else:
p = ping_req.filter_wpan_src64(self.vars[src]).must_next()
@@ -398,7 +403,7 @@ class PacketVerifier(object):
# BBR unicasts the ping packet to TD.
if bbr:
if self.is_eth_device(src):
if self.is_backbone_device(src):
ping_req.filter_wpan_src64(bbr_ext).must_next()
else:
ping_req.filter_eth_src(bbr_eth).must_next()
@@ -406,7 +411,7 @@ class PacketVerifier(object):
ping_reply = pkts.filter_ping_reply().filter_ipv6_dst(src_dua).filter(
lambda p: p.icmpv6.echo.identifier == ping_id)
# TD receives ping packet and responds back to Host via SBBR.
if self.is_wpan_device(dst):
if self.is_thread_device(dst):
ping_reply.filter_wpan_src64(self.vars[dst]).must_next()
else:
ping_reply.filter_eth_src(self.vars[dst + '_ETH']).must_next()
@@ -415,14 +420,14 @@ class PacketVerifier(object):
if bbr:
# SBBR forwards the ping response packet to Host.
if self.is_wpan_device(dst):
if self.is_thread_device(dst):
ping_reply.filter_eth_src(bbr_eth).must_next()
else:
ping_reply.filter_wpan_src64(bbr_ext).must_next()
return result
def is_wpan_device(self, name: str) -> bool:
def is_thread_device(self, name: str) -> bool:
"""
Returns if the device is an WPAN device.
@@ -432,9 +437,9 @@ class PacketVerifier(object):
"""
assert isinstance(name, str), name
return name in self.test_info.extaddrs
return name in self.vars
def is_eth_device(self, name: str) -> bool:
def is_backbone_device(self, name: str) -> bool:
"""
Returns if the device s an Ethernet device.
@@ -444,7 +449,7 @@ class PacketVerifier(object):
"""
assert isinstance(name, str), name
return name in self.test_info.ethaddrs
return f'{name}_ETH' in self.vars
def max_index(self, *indexes: Tuple[int, int]) -> Tuple[int, int]:
wpan_idx = 0
@@ -80,17 +80,6 @@ def make_filter_func(func: Union[str, Callable], **vars) -> Callable:
return func
def _install_travis_thread_wireshark():
logging.info("downloading thread-wireshark from https://github.com/openthread/wireshark/releases ...")
download_url = 'https://github.com/openthread/wireshark/releases/download/ot-pktverify-20200727/thread-wireshark.tar.gz'
save_file = '/tmp/thread-wireshark.tar.gz'
subprocess.check_call(f'curl -L {download_url} -o {save_file}', shell=True)
subprocess.check_call(f'tar -C /tmp -xvzf {save_file}', shell=True)
assert os.path.isdir('/tmp/thread-wireshark')
def _setup_wireshark_disabled_protos():
home = os.environ['HOME']
wireshark_config_dir = os.path.join(home, '.config', 'wireshark')
@@ -123,11 +112,7 @@ def get_wireshark_dir() -> str:
:return: The path to wireshark directory.
"""
dir = '/tmp/thread-wireshark'
if not os.path.exists(dir):
_install_travis_thread_wireshark()
_setup_wireshark_disabled_protos()
return dir
+136
View File
@@ -0,0 +1,136 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import multiprocessing
import os
import subprocess
import sys
import traceback
from collections import Counter
import config
MULTIPLE_JOBS = 10
logging.basicConfig(level=logging.DEBUG,
format='File "%(pathname)s", line %(lineno)d, in %(funcName)s\n'
'%(asctime)s - %(levelname)s - %(message)s')
def bash(cmd: str, check=True, stdout=None):
subprocess.run(cmd, shell=True, check=check, stdout=stdout)
def run_bbr_test(port_offset: int, script: str):
try:
logging.info("Running BBR test: %s ...", script)
test_name = os.path.splitext(os.path.basename(script))[0] + '_' + str(port_offset)
logfile = test_name + '.log'
env = os.environ.copy()
env['PORT_OFFSET'] = str(port_offset)
env['TEST_NAME'] = test_name
try:
with open(logfile, 'wt') as output:
subprocess.check_call(["python3", script],
stdout=output,
stderr=output,
stdin=subprocess.DEVNULL,
env=env)
except subprocess.CalledProcessError:
bash(f'cat {logfile} 1>&2')
logging.error("Run test %s failed, please check the log file: %s", test_name, logfile)
raise
except Exception:
traceback.print_exc()
raise
pool = multiprocessing.Pool(processes=MULTIPLE_JOBS)
def cleanup_env():
logging.info("Cleaning up Backbone testing environment ...")
bash('pkill socat 2>/dev/null || true')
bash('pkill dumpcap 2>/dev/null || true')
bash(f'docker rm -f $(docker ps -a -q -f "name=otbr_") 2>/dev/null || true')
bash(f'docker network rm $(docker network ls -q -f "name=backbone") 2>/dev/null || true')
def setup_env():
bash(f'docker image inspect {config.OTBR_DOCKER_IMAGE} >/dev/null')
bash('mkdir build || true')
def parse_args():
import argparse
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('--multiply', type=int, default=1, help='run each test for multiple times')
parser.add_argument("scripts", nargs='+', type=str, help='specify Backbone test scripts')
args = parser.parse_args()
logging.info("Multiply: %d", args.multiply)
logging.info("Test scripts: %s", args.scripts)
return args
def main():
args = parse_args()
cleanup_env()
setup_env()
script_fail_count = Counter()
def error_callback(script, err):
logging.error("Test %s failed: %s", script, err)
script_fail_count[script] += 1
# Run each script for multiple times
scripts = args.scripts * args.multiply
for i, script in enumerate(scripts):
pool.apply_async(run_bbr_test, [i, script],
error_callback=lambda err, script=script: error_callback(script, err))
pool.close()
logging.info("Waiting for tests to complete ...")
pool.join()
cleanup_env()
for script in args.scripts:
logging.info("Test %s: %d PASS/%d TOTAL", script, args.multiply - script_fail_count[script], args.multiply)
exit(len(script_fail_count))
if __name__ == '__main__':
main()
+10 -7
View File
@@ -91,9 +91,9 @@ class BaseSimulator(object):
class RealTime(BaseSimulator):
def __init__(self):
def __init__(self, use_message_factory=True):
super(RealTime, self).__init__()
self._sniffer = config.create_default_thread_sniffer()
self._sniffer = config.create_default_thread_sniffer(use_message_factory=use_message_factory)
self._sniffer.start()
def set_lowpan_context(self, cid, prefix):
@@ -147,7 +147,7 @@ class VirtualTime(BaseSimulator):
RADIO_ONLY = os.getenv('RADIO_DEVICE') is not None
NCP_SIM = os.getenv('NODE_TYPE', 'sim') == 'ncp-sim'
def __init__(self):
def __init__(self, use_message_factory=True):
super(VirtualTime, self).__init__()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -169,7 +169,8 @@ class VirtualTime(BaseSimulator):
self.current_nodeid = None
self._pause_time = 0
self._message_factory = config.create_default_thread_message_factory()
if use_message_factory:
self._message_factory = config.create_default_thread_message_factory()
def __del__(self):
if self.sock:
@@ -189,8 +190,9 @@ class VirtualTime(BaseSimulator):
# Ignore any exceptions
try:
messages = self._message_factory.create(io.BytesIO(message_obj))
self.devices[addr]['msgs'] += messages
if self._message_factory is not None:
messages = self._message_factory.create(io.BytesIO(message_obj))
self.devices[addr]['msgs'] += messages
except message.DropPacketException:
print('Drop current packet because it cannot be handled in test scripts')
@@ -200,7 +202,8 @@ class VirtualTime(BaseSimulator):
traceback.print_exc()
def set_lowpan_context(self, cid, prefix):
self._message_factory.set_lowpan_context(cid, prefix)
if self._message_factory is not None:
self._message_factory.set_lowpan_context(cid, prefix)
def get_messages_sent_by(self, nodeid):
""" Get sniffed messages.
+165 -24
View File
@@ -28,20 +28,25 @@
#
import json
import logging
import os
import signal
import subprocess
import sys
import time
import traceback
import unittest
from typing import Optional
import config
import debug
from node import Node
from node import Node, OtbrNode, HostNode
from pktverify import utils as pvutils
PACKET_VERIFICATION = int(os.getenv('PACKET_VERIFICATION', 0))
if PACKET_VERIFICATION:
from pktverify.addrs import ExtAddr
from pktverify.addrs import ExtAddr, EthAddr
from pktverify.packet_verifier import PacketVerifier
PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))
@@ -51,6 +56,8 @@ ENV_THREAD_VERSION = os.getenv('THREAD_VERSION', '1.1')
DEFAULT_PARAMS = {
'is_mtd': False,
'is_bbr': False,
'is_otbr': False,
'is_host': False,
'mode': 'rsdn',
'panid': 0xface,
'allowlist': None,
@@ -82,39 +89,73 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
The `topology` member of sub-class is used to create test topology.
"""
USE_MESSAGE_FACTORY = True
TOPOLOGY = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
self._start_time = None
self._do_packet_verification = PACKET_VERIFICATION and hasattr(self, 'verify')
def setUp(self):
try:
self._setUp()
except:
traceback.print_exc()
for node in list(self.nodes.values()):
try:
node.destroy()
except Exception:
traceback.print_exc()
raise
def _setUp(self):
"""Create simulator, nodes and apply configurations.
"""
self._clean_up_tmp()
self.simulator = config.create_default_simulator()
self.simulator = config.create_default_simulator(use_message_factory=self.USE_MESSAGE_FACTORY)
self.nodes = {}
os.environ['LD_LIBRARY_PATH'] = '/tmp/thread-wireshark'
if self._has_backbone_traffic():
self._prepare_backbone_network()
self._start_backbone_sniffer()
self._initial_topology = initial_topology = {}
for i, params in self.TOPOLOGY.items():
if params:
params = dict(DEFAULT_PARAMS, **params)
else:
params = DEFAULT_PARAMS.copy()
params = self._parse_params(params)
initial_topology[i] = params
self.nodes[i] = Node(
logging.info("Creating node %d: %r", i, params)
if params['is_otbr']:
nodeclass = OtbrNode
elif params['is_host']:
nodeclass = HostNode
else:
nodeclass = Node
node = nodeclass(
i,
params['is_mtd'],
is_mtd=params['is_mtd'],
simulator=self.simulator,
name=params.get('name'),
version=params['version'],
is_bbr=params['is_bbr'],
)
self.nodes[i] = node
if node.is_host:
continue
self.nodes[i].set_panid(params['panid'])
self.nodes[i].set_mode(params['mode'])
@@ -188,22 +229,31 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
"""
if self._do_packet_verification and os.uname().sysname != "Linux":
raise NotImplementedError(
f'{self.testcase_name}: Packet Verification not available on {os.uname().sysname} (Linux only).')
f'{self.test_name}: Packet Verification not available on {os.uname().sysname} (Linux only).')
if self._do_packet_verification:
time.sleep(3)
if self._has_backbone_traffic():
# Stop Backbone sniffer before stopping nodes so that we don't capture Codecov Uploading traffic
self._stop_backbone_sniffer()
for node in list(self.nodes.values()):
node.stop()
node.destroy()
self.simulator.stop()
if self._has_backbone_traffic():
self._remove_backbone_network()
pcap_filename = self._merge_thread_backbone_pcaps()
else:
pcap_filename = self._get_thread_pcap_filename()
if self._do_packet_verification:
self._test_info['pcap'] = self._get_pcap_filename()
self._test_info['pcap'] = pcap_filename
test_info_path = self._output_test_info()
os.environ['LD_LIBRARY_PATH'] = '/tmp/thread-wireshark'
self._verify_packets(test_info_path)
def flush_all(self):
@@ -236,8 +286,8 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
print("Packet verification passed: %s" % test_info_path, file=sys.stderr)
@property
def testcase_name(self):
return os.path.splitext(os.path.basename(sys.argv[0]))[0]
def test_name(self):
return os.getenv('TEST_NAME', 'current')
def collect_ipaddrs(self):
if not self._do_packet_verification:
@@ -248,8 +298,9 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
for i, node in self.nodes.items():
ipaddrs = node.get_addrs()
test_info['ipaddrs'][i] = ipaddrs
mleid = node.get_mleid()
test_info['mleids'][i] = mleid
if not node.is_host:
mleid = node.get_mleid()
test_info['mleids'][i] = mleid
def collect_rloc16s(self):
if not self._do_packet_verification:
@@ -259,7 +310,8 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
test_info['rloc16s'] = {}
for i, node in self.nodes.items():
test_info['rloc16s'][i] = '0x%04x' % node.get_addr16()
if not node.is_host:
test_info['rloc16s'][i] = '0x%04x' % node.get_addr16()
def collect_rlocs(self):
if not self._do_packet_verification:
@@ -296,7 +348,7 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
return
test_info = self._test_info = {
'testcase': self.testcase_name,
'testcase': self.test_name,
'start_time': time.ctime(self._start_time),
'pcap': '',
'extaddrs': {},
@@ -304,24 +356,38 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
'ipaddrs': {},
'mleids': {},
'topology': self._initial_topology,
'backbone': {
'interface': config.BACKBONE_DOCKER_NETWORK_NAME,
'prefix': config.BACKBONE_PREFIX,
},
'otbr_commit': config.OTBR_COMMIT,
'domain_prefix': config.DOMAIN_PREFIX,
'env': {
'PORT_OFFSET': config.PORT_OFFSET,
},
}
for i, node in self.nodes.items():
extaddr = node.get_addr64()
test_info['extaddrs'][i] = ExtAddr(extaddr).format_octets()
if not node.is_host:
extaddr = node.get_addr64()
test_info['extaddrs'][i] = ExtAddr(extaddr).format_octets()
if node.is_host or node.is_otbr:
ethaddr = node.get_ether_mac()
test_info['ethaddrs'][i] = EthAddr(ethaddr).format_octets()
def _output_test_info(self):
"""
Output test info to json file after tearDown
"""
filename = f'{self.testcase_name}.json'
filename = f'{self.test_name}.json'
with open(filename, 'wt') as ofd:
ofd.write(json.dumps(self._test_info, indent=1, sort_keys=True))
return filename
def _get_pcap_filename(self):
current_pcap = os.getenv('TEST_NAME', 'current') + '.pcap'
def _get_thread_pcap_filename(self):
current_pcap = self.test_name + '.pcap'
return os.path.abspath(current_pcap)
def assure_run_ok(self, cmd, shell=False):
@@ -330,3 +396,78 @@ class TestCase(NcpSupportMixin, unittest.TestCase):
proc = subprocess.run(cmd, stdout=sys.stdout, stderr=sys.stderr, shell=shell)
print(">>> %s => %d" % (cmd, proc.returncode), file=sys.stderr)
proc.check_returncode()
def _parse_params(self, params: Optional[dict]) -> dict:
params = params or {}
if params.get('is_bbr') or params.get('is_otbr'):
# BBRs must use thread version 1.2
assert params.get('version', '1.2') == '1.2', params
params['version'] = '1.2'
elif params.get('is_host'):
# Hosts must not specify thread version
assert params.get('version', '') == '', params
params['version'] = ''
if params:
params = dict(DEFAULT_PARAMS, **params)
else:
params = DEFAULT_PARAMS.copy()
return params
def _has_backbone_traffic(self):
for param in self.TOPOLOGY.values():
if param and (param.get('is_otbr') or param.get('is_host')):
return True
return False
def _prepare_backbone_network(self):
network_name = config.BACKBONE_DOCKER_NETWORK_NAME
self.assure_run_ok(
f'docker network create --driver bridge --ipv6 --subnet {config.BACKBONE_PREFIX} -o "com.docker.network.bridge.name"="{network_name}" {network_name} || true',
shell=True)
def _remove_backbone_network(self):
network_name = config.BACKBONE_DOCKER_NETWORK_NAME
self.assure_run_ok(f'docker network rm {network_name}', shell=True)
def _start_backbone_sniffer(self):
# don't know why but I have to create the empty bbr.pcap first, otherwise tshark won't work
# self.assure_run_ok("truncate --size 0 bbr.pcap && chmod 664 bbr.pcap", shell=True)
pcap_file = self._get_backbone_pcap_filename()
try:
os.remove(pcap_file)
except FileNotFoundError:
pass
dumpcap = pvutils.which_dumpcap()
self._dumpcap_proc = subprocess.Popen([dumpcap, '-i', config.BACKBONE_DOCKER_NETWORK_NAME, '-w', pcap_file],
stdout=sys.stdout,
stderr=sys.stderr)
time.sleep(0.2)
assert self._dumpcap_proc.poll() is None, 'tshark terminated unexpectedly'
logging.info('Backbone sniffer launched successfully: pid=%s', self._dumpcap_proc.pid)
def _get_backbone_pcap_filename(self):
backbone_pcap = self.test_name + '_backbone.pcap'
return os.path.abspath(backbone_pcap)
def _get_merged_pcap_filename(self):
backbone_pcap = self.test_name + '_merged.pcap'
return os.path.abspath(backbone_pcap)
def _stop_backbone_sniffer(self):
self._dumpcap_proc.send_signal(signal.SIGTERM)
self._dumpcap_proc.__exit__(None, None, None)
logging.info('Backbone sniffer terminated successfully: pid=%s' % self._dumpcap_proc.pid)
def _merge_thread_backbone_pcaps(self):
thread_pcap = self._get_thread_pcap_filename()
backbone_pcap = self._get_backbone_pcap_filename()
merged_pcap = self._get_merged_pcap_filename()
mergecap = pvutils.which_mergecap()
self.assure_run_ok(f'{mergecap} -w {merged_pcap} {thread_pcap} {backbone_pcap}', shell=True)
return merged_pcap