mirror of
https://github.com/espressif/openthread.git
synced 2026-06-05 21:14:49 +00:00
[otci] add support for IPv4, vendor commands, networkdiagnostics (#11397)
With Thread 1.4 the cli application not can also (dns) resolve IPv4 addresses. This commit adds the same support in otci * dns_resolve4 Implements support for vendor operations in otci get/set * vendor_name * vendor_model * vendor_sw_version Implements network diagnostic commands * get * reset * non_preferred_channels Various other (small changes)" * allow setting read timeout on serial connections * allow replacing read routine filter * expose latest thread versions in the public module api * expand the definition of dns_get_config * replaces mgmtget/mgmtset with the correct mgmtgetcommand and mgmtsetcommand * replaces addressmode with the correct addrmode * adds an `ignore_result` option to `execute_command` * adds a missing `diag` command * removes some unexisting getters
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
# OpenThread Control Interface
|
||||
|
||||
The OpenThread Control Interface (OTCI) is a library which provides uniform python interfaces to connect and control various kinds of devices running OpenThread.
|
||||
The OpenThread Control Interface (OTCI) is a library which provides uniform Python interfaces to connect and control various kinds of devices running OpenThread.
|
||||
|
||||
## Supported device types
|
||||
|
||||
@@ -22,7 +22,7 @@ node1 = otci.connect_otbr_ssh("192.168.1.101")
|
||||
# Connect to an OpenThread CLI device via Serial
|
||||
node2 = otci.connect_cli_serial("/dev/ttyACM0"))
|
||||
|
||||
# Start node1 to become Leader
|
||||
# Start node1 and wait for it to become the Leader
|
||||
node1.dataset_init_buffer()
|
||||
node1.dataset_set_buffer(network_name='test', network_key='00112233445566778899aabbccddeeff', panid=0xface, channel=11)
|
||||
node1.dataset_commit_buffer('active')
|
||||
@@ -32,7 +32,7 @@ node1.thread_start()
|
||||
node1.wait(5)
|
||||
assert node1.get_state() == "leader"
|
||||
|
||||
# Start Commissioner on node1
|
||||
# Start the Commissioner role on node1
|
||||
node1.commissioner_start()
|
||||
node1.wait(3)
|
||||
|
||||
@@ -42,11 +42,11 @@ node1.commissioner_add_joiner("TEST123",eui64='*')
|
||||
node2.ifconfig_up()
|
||||
node2.set_router_selection_jitter(1)
|
||||
|
||||
# Start Joiner on node2 to join the network
|
||||
# Start the Joiner role on node2 and wait for it to join the network
|
||||
node2.joiner_start("TEST123")
|
||||
node2.wait(10, expect_line="Join success")
|
||||
|
||||
# Wait for node 2 to become Router
|
||||
# Wait for node 2 to become a Router
|
||||
node2.thread_start()
|
||||
node2.wait(5)
|
||||
assert node2.get_state() == "router"
|
||||
|
||||
@@ -28,8 +28,9 @@
|
||||
#
|
||||
|
||||
from . import errors
|
||||
from .constants import THREAD_VERSION_1_1, THREAD_VERSION_1_2
|
||||
from .constants import THREAD_VERSION_1_1, THREAD_VERSION_1_2, THREAD_VERSION_1_3, THREAD_VERSION_1_4
|
||||
from .command_handlers import OTCommandHandler
|
||||
from .connectors import OtCliHandler
|
||||
from .otci import OTCI
|
||||
from .otci import (
|
||||
connect_cli_sim,
|
||||
@@ -56,6 +57,7 @@ _connectors = [
|
||||
__all__ = [
|
||||
'OTCI',
|
||||
'OTCommandHandler',
|
||||
'OTCliHandler',
|
||||
'errors',
|
||||
'Rloc16',
|
||||
'ChildId',
|
||||
|
||||
@@ -84,26 +84,30 @@ class OTCommandHandler(ABC):
|
||||
def shell(self, cmd: str, timeout: float) -> List[str]:
|
||||
raise NotImplementedError("shell command is not supported on %s" % self.__class__.__name__)
|
||||
|
||||
@classmethod
|
||||
def set_filter(cls, filter: re.Pattern[str]):
|
||||
return
|
||||
|
||||
|
||||
class OtCliCommandRunner(OTCommandHandler):
|
||||
__PATTERN_COMMAND_DONE_OR_ERROR = re.compile(
|
||||
r'(Done|Error|Error \d+:.*|.*: command not found)$') # "Error" for spinel-cli.py
|
||||
|
||||
__PATTERN_LOG_LINE = re.compile(r'((\[(NONE|CRIT|WARN|NOTE|INFO|DEBG)\])'
|
||||
__pattern_log_line = re.compile(r'((\[(NONE|CRIT|WARN|NOTE|INFO|DEBG)\])'
|
||||
r'|(-.*-+: )' # e.g. -CLI-----:
|
||||
r'|(\[[DINWC\-]\] (?=[\w\-]{14}:)\w+-*:)' # e.g. [I] Mac-----------:
|
||||
r')')
|
||||
"""regex used to filter logs"""
|
||||
|
||||
assert __PATTERN_LOG_LINE.match('[I] ChannelMonitor: debug log')
|
||||
assert __PATTERN_LOG_LINE.match('[I] Mac-----------: info log')
|
||||
assert __PATTERN_LOG_LINE.match('[N] MeshForwarder-: note log')
|
||||
assert __PATTERN_LOG_LINE.match('[W] Notifier------: warn log')
|
||||
assert __PATTERN_LOG_LINE.match('[C] Mle-----------: critical log')
|
||||
assert __PATTERN_LOG_LINE.match('[-] Settings------: none log')
|
||||
assert not __PATTERN_LOG_LINE.match('[-] Settings-----: none log') # not enough `-` after module name
|
||||
assert __pattern_log_line.match('[I] ChannelMonitor: debug log')
|
||||
assert __pattern_log_line.match('[I] Mac-----------: info log')
|
||||
assert __pattern_log_line.match('[N] MeshForwarder-: note log')
|
||||
assert __pattern_log_line.match('[W] Notifier------: warn log')
|
||||
assert __pattern_log_line.match('[C] Mle-----------: critical log')
|
||||
assert __pattern_log_line.match('[-] Settings------: none log')
|
||||
assert not __pattern_log_line.match('[-] Settings-----: none log') # not enough `-` after module name
|
||||
|
||||
__ASYNC_COMMANDS = {'scan', 'ping', 'discover'}
|
||||
__ASYNC_COMMANDS = {'scan', 'ping', 'discover', 'networkdiagnostic get'}
|
||||
|
||||
def __init__(self, otcli: OtCliHandler, is_spinel_cli: bool = False):
|
||||
self.__otcli: OtCliHandler = otcli
|
||||
@@ -134,7 +138,8 @@ class OtCliCommandRunner(OTCommandHandler):
|
||||
|
||||
output = self.__expect_line(timeout,
|
||||
OtCliCommandRunner.__PATTERN_COMMAND_DONE_OR_ERROR,
|
||||
asynchronous=cmd.split()[0] in OtCliCommandRunner.__ASYNC_COMMANDS)
|
||||
asynchronous=any(cmd.startswith(x) for x in OtCliCommandRunner.__ASYNC_COMMANDS))
|
||||
|
||||
return output
|
||||
|
||||
def execute_platform_command(self, cmd: str, timeout: float = 10) -> List[str]:
|
||||
@@ -162,6 +167,18 @@ class OtCliCommandRunner(OTCommandHandler):
|
||||
def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
|
||||
self.__line_read_callback = callback
|
||||
|
||||
@classmethod
|
||||
def set_filter(cls, filter: re.Pattern[str]):
|
||||
"""Set a different filter for the read routine that still matches the original filter"""
|
||||
assert filter.match('[I] ChannelMonitor: debug log')
|
||||
assert filter.match('[I] Mac-----------: info log')
|
||||
assert filter.match('[N] MeshForwarder-: note log')
|
||||
assert filter.match('[W] Notifier------: warn log')
|
||||
assert filter.match('[C] Mle-----------: critical log')
|
||||
assert filter.match('[-] Settings------: none log')
|
||||
assert not filter.match('[-] Settings-----: none log') # not enough `-` after module name
|
||||
cls.__pattern_log_line = filter
|
||||
|
||||
#
|
||||
# Private methods
|
||||
#
|
||||
@@ -216,6 +233,8 @@ class OtCliCommandRunner(OTCommandHandler):
|
||||
if line is None:
|
||||
break
|
||||
|
||||
line = line.rstrip()
|
||||
|
||||
if line.startswith('> '):
|
||||
line = line[2:]
|
||||
|
||||
@@ -224,8 +243,9 @@ class OtCliCommandRunner(OTCommandHandler):
|
||||
|
||||
logging.debug('%s: %s', self.__otcli, line)
|
||||
|
||||
if not OtCliCommandRunner.__PATTERN_LOG_LINE.match(line):
|
||||
logging.info('%s: %s', self.__otcli, line)
|
||||
if not OtCliCommandRunner.__pattern_log_line.match(line):
|
||||
if line:
|
||||
logging.info('%s: %s', self.__otcli, line)
|
||||
self.__pending_lines.put(line)
|
||||
|
||||
|
||||
|
||||
@@ -138,12 +138,12 @@ class OtNcpSim(OtCliPopen):
|
||||
class OtCliSerial(OtCliHandler):
|
||||
"""Connector for OT CLI SOC devices via Serial."""
|
||||
|
||||
def __init__(self, dev: str, baudrate: int):
|
||||
def __init__(self, dev: str, baudrate: int, timeout: float = 0.1):
|
||||
self.__dev = dev
|
||||
self.__baudrate = baudrate
|
||||
|
||||
import serial
|
||||
self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=0.1, exclusive=True)
|
||||
self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=timeout, exclusive=True)
|
||||
self.writeline('\r\n')
|
||||
self.__linebuffer = b''
|
||||
|
||||
|
||||
+262
-135
@@ -38,7 +38,7 @@ from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshComma
|
||||
from .command_handlers import OtbrAdbUsbCommandRunner
|
||||
from .connectors import Simulator
|
||||
from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError
|
||||
from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix
|
||||
from .types import ChildId, Rloc16, Ip4Addr, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix
|
||||
from .types import RouterTableEntry, NetifIdentifier
|
||||
from .utils import match_line, constant_property, bits_set
|
||||
|
||||
@@ -66,6 +66,9 @@ class OTCI(object):
|
||||
"""Gets the string representation of the OTCI instance."""
|
||||
return repr(self.__otcmd)
|
||||
|
||||
def set_filter(self, filter: re.Pattern[str]):
|
||||
self.__otcmd.set_filter(filter)
|
||||
|
||||
def wait(self, duration: float, expect_line: Optional[Union[str, Pattern[str], Collection[str]]] = None):
|
||||
"""Wait for a given duration.
|
||||
|
||||
@@ -98,21 +101,27 @@ class OTCI(object):
|
||||
cmd: str,
|
||||
timeout: float = 10,
|
||||
silent: bool = False,
|
||||
already_is_ok: bool = True) -> List[str]:
|
||||
already_is_ok: bool = True,
|
||||
ignore_result: bool = False) -> List[str]:
|
||||
for i in range(self.__exec_command_retry + 1):
|
||||
try:
|
||||
return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok)
|
||||
except Exception:
|
||||
return self.__execute_command(cmd,
|
||||
timeout,
|
||||
silent,
|
||||
already_is_ok=already_is_ok,
|
||||
ignore_result=ignore_result)
|
||||
except Exception as e:
|
||||
self.wait(2)
|
||||
if i == self.__exec_command_retry:
|
||||
raise
|
||||
raise e from None
|
||||
assert False
|
||||
|
||||
def __execute_command(self,
|
||||
cmd: str,
|
||||
timeout: float = 10,
|
||||
silent: bool = False,
|
||||
already_is_ok: bool = True) -> List[str]:
|
||||
already_is_ok: bool = True,
|
||||
ignore_result: bool = False) -> List[str]:
|
||||
"""Execute the OpenThread CLI command.
|
||||
|
||||
:param cmd: The command to execute.
|
||||
@@ -129,7 +138,7 @@ class OTCI(object):
|
||||
for line in output:
|
||||
self.log('info', '%s', line)
|
||||
|
||||
if cmd in ('reset', 'factoryreset'):
|
||||
if cmd in ('reset', 'factoryreset') or ignore_result:
|
||||
return output
|
||||
|
||||
if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'):
|
||||
@@ -206,6 +215,27 @@ class OTCI(object):
|
||||
# which would lead to ValueError.
|
||||
return 0
|
||||
|
||||
#
|
||||
# Vendor operations
|
||||
#
|
||||
def get_vendor_name(self) -> str:
|
||||
return self.__parse_str(self.execute_command('vendor name'))
|
||||
|
||||
def set_vendor_name(self, name: str):
|
||||
self.execute_command(f'vendor name {name}')
|
||||
|
||||
def get_vendor_model(self) -> str:
|
||||
return self.__parse_str(self.execute_command('vendor model'))
|
||||
|
||||
def set_vendor_model(self, model: str):
|
||||
self.execute_command(f'vendor model {model}')
|
||||
|
||||
def get_vendor_sw_version(self) -> str:
|
||||
return self.__parse_str(self.execute_command('vendor swversion'))
|
||||
|
||||
def set_vendor_sw_version(self, version: str):
|
||||
self.execute_command(f'vendor swversion {version}')
|
||||
|
||||
#
|
||||
# Basic device operations
|
||||
#
|
||||
@@ -834,10 +864,25 @@ class OTCI(object):
|
||||
|
||||
_IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)')
|
||||
|
||||
def dns_get_config(self) -> Dict[str, Union[Tuple[Ip6Addr, int], int, bool]]:
|
||||
"""Get DNS client query config."""
|
||||
def dns_get_config(self) -> Dict[str, Union[Tuple[Ip6Addr, int], int, bool, str]]:
|
||||
"""Get DNS client query config.
|
||||
"""
|
||||
output = self.execute_command('dns config')
|
||||
config: Dict[str, Union[Tuple[Ip6Addr, int], int, bool]] = {}
|
||||
config: Dict[str, Union[Tuple[Ip6Addr, int], int, bool, str]] = {}
|
||||
|
||||
#
|
||||
# Example output:
|
||||
# > dns config
|
||||
# Server: [fd00:0:0:0:0:0:0:1]:1234
|
||||
# ResponseTimeout: 5000 ms
|
||||
# MaxTxAttempts: 2
|
||||
# RecursionDesired: no
|
||||
# ServiceMode: srv_txt_opt
|
||||
# Nat64Mode: allow
|
||||
# TransportProtocol: udp
|
||||
# Done
|
||||
#
|
||||
|
||||
for line in output:
|
||||
k, v = line.split(': ')
|
||||
if k == 'Server':
|
||||
@@ -851,32 +896,50 @@ class OTCI(object):
|
||||
config['max_tx_attempts'] = int(v)
|
||||
elif k == 'RecursionDesired':
|
||||
config['recursion_desired'] = (v == 'yes')
|
||||
elif k == 'ServiceMode':
|
||||
config['service_mode'] = v
|
||||
elif k == 'Nat64Mode':
|
||||
config['nat64_mode'] = (v == 'allow')
|
||||
elif k == 'TransportProtocol':
|
||||
config['transport_protocol'] = v
|
||||
else:
|
||||
logging.warning("dns config ignored: %s", line)
|
||||
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def __add_with_default(l: list[str], var: Optional[Any], default: str = '0'):
|
||||
if var is not None:
|
||||
if isinstance(var, bool):
|
||||
l += ['1' if var else '0']
|
||||
else:
|
||||
l += [f'{var}']
|
||||
elif bool(l):
|
||||
l += [default]
|
||||
|
||||
def dns_set_config(self,
|
||||
server: Tuple[Union[str, ipaddress.IPv6Address], int],
|
||||
server: Union[Tuple[Union[str, ipaddress.IPv6Address], int], Tuple[()]],
|
||||
response_timeout: Optional[int] = None,
|
||||
max_tx_attempts: Optional[int] = None,
|
||||
recursion_desired: Optional[bool] = None):
|
||||
recursion_desired: Optional[bool] = None,
|
||||
service_mode: Optional[str] = None,
|
||||
transport_protocol: Optional[str] = None):
|
||||
"""Set DNS client query config."""
|
||||
cmd = f'dns config {str(server[0])} {server[1]}'
|
||||
if response_timeout is not None:
|
||||
cmd += f' {response_timeout}'
|
||||
|
||||
if not (max_tx_attempts is None or response_timeout is not None):
|
||||
raise AssertionError('must specify `response_timeout` if `max_tx_attempts` is specified.')
|
||||
if max_tx_attempts is not None:
|
||||
cmd += f' {max_tx_attempts}'
|
||||
|
||||
if not (recursion_desired is None or max_tx_attempts is not None):
|
||||
raise AssertionError('must specify `max_tx_attempts` if `recursion_desired` is specified.')
|
||||
|
||||
if recursion_desired is not None:
|
||||
cmd += f' {1 if recursion_desired else 0}'
|
||||
# working backwards so we can set defaults when required
|
||||
cmd_parts: List[str] = []
|
||||
self.__add_with_default(cmd_parts, transport_protocol)
|
||||
self.__add_with_default(cmd_parts, service_mode)
|
||||
self.__add_with_default(cmd_parts, recursion_desired)
|
||||
self.__add_with_default(cmd_parts, max_tx_attempts)
|
||||
self.__add_with_default(cmd_parts, response_timeout)
|
||||
if server:
|
||||
self.__add_with_default(cmd_parts, server[1] or None)
|
||||
self.__add_with_default(cmd_parts, server[0] or '::', '::')
|
||||
else:
|
||||
self.__add_with_default(cmd_parts, None)
|
||||
self.__add_with_default(cmd_parts, None, '::')
|
||||
|
||||
cmd = f'dns config {" ".join(cmd_parts[::-1])}'
|
||||
self.execute_command(cmd)
|
||||
|
||||
def dns_get_compression(self) -> bool:
|
||||
@@ -893,17 +956,27 @@ class OTCI(object):
|
||||
|
||||
def dns_browse(self,
|
||||
service: str,
|
||||
server: Optional[Tuple[Union[str, ipaddress.IPv6Address], int]] = None,
|
||||
server: Optional[Union[Tuple[Union[str, ipaddress.IPv6Address], int], Tuple[()]]] = None,
|
||||
response_timeout: Optional[int] = None,
|
||||
max_tx_attempts: Optional[int] = None,
|
||||
recursion_desired: Optional[bool] = None) -> List[Dict[str, Any]]:
|
||||
"""Browse DNS service instances."""
|
||||
args: List[Union[int, bool, str, ipaddress.IPv6Address, None]]
|
||||
if server is None:
|
||||
args = [service, response_timeout, max_tx_attempts, recursion_desired]
|
||||
|
||||
cmd_parts: List[str] = []
|
||||
self.__add_with_default(cmd_parts, recursion_desired)
|
||||
self.__add_with_default(cmd_parts, max_tx_attempts)
|
||||
self.__add_with_default(cmd_parts, response_timeout)
|
||||
if server:
|
||||
self.__add_with_default(cmd_parts, server[1] or None)
|
||||
self.__add_with_default(cmd_parts, server[0] or '::', '::')
|
||||
else:
|
||||
args = [service, *server, response_timeout, max_tx_attempts, recursion_desired]
|
||||
cmd = f'dns browse {" ".join([str(a) for a in args if a])}'
|
||||
self.__add_with_default(cmd_parts, None)
|
||||
self.__add_with_default(cmd_parts, None, '::')
|
||||
|
||||
if cmd_parts:
|
||||
cmd = f'dns browse {service} {" ".join(cmd_parts)}'
|
||||
else:
|
||||
cmd = f'dns browse {service}'
|
||||
output = '\n'.join(self.execute_command(cmd, 30.0))
|
||||
|
||||
result: List[Dict[str, Union[str, int, Ip6Addr, Dict[str, Union[bytes, bool]]]]] = []
|
||||
@@ -926,10 +999,13 @@ class OTCI(object):
|
||||
|
||||
return result
|
||||
|
||||
def dns_resolve(self, hostname: str) -> List[Dict[str, Union[Ip6Addr, int]]]:
|
||||
def dns_resolve(self,
|
||||
hostname: str,
|
||||
ip_address: str = '',
|
||||
ignore_result: bool = False) -> List[Dict[str, Union[Ip6Addr, int]]]:
|
||||
"""Resolve a DNS host name."""
|
||||
cmd = f'dns resolve {hostname}'
|
||||
output = self.execute_command(cmd, 30.0)
|
||||
cmd = 'dns resolve ' + ' '.join([x for x in [hostname, ip_address] if x])
|
||||
output = self.execute_command(cmd, 30.0, ignore_result=ignore_result)
|
||||
dns_resp = output[0]
|
||||
addrs = dns_resp.strip().split(' - ')[1].split(' ')
|
||||
ips = [Ip6Addr(item.strip()) for item in addrs[::2]]
|
||||
@@ -946,17 +1022,29 @@ class OTCI(object):
|
||||
server: Optional[Tuple[Union[str, ipaddress.IPv6Address], int]] = None,
|
||||
response_timeout: Optional[int] = None,
|
||||
max_tx_attempts: Optional[int] = None,
|
||||
recursion_desired: Optional[bool] = None) -> Dict[str, Any]:
|
||||
recursion_desired: Optional[bool] = None,
|
||||
ignore_result: bool = False) -> Dict[str, Any]:
|
||||
"""Resolves a service instance."""
|
||||
|
||||
cmd_parts: List[str] = []
|
||||
self.__add_with_default(cmd_parts, recursion_desired)
|
||||
self.__add_with_default(cmd_parts, max_tx_attempts)
|
||||
self.__add_with_default(cmd_parts, response_timeout)
|
||||
if server:
|
||||
self.__add_with_default(cmd_parts, server[1] or None)
|
||||
self.__add_with_default(cmd_parts, server[0] or '::')
|
||||
else:
|
||||
self.__add_with_default(cmd_parts, None)
|
||||
self.__add_with_default(cmd_parts, None, '::')
|
||||
|
||||
instance = self.__escape_escapable(instance)
|
||||
|
||||
args: List[Union[int, bool, str, ipaddress.IPv6Address, None]]
|
||||
if server is None:
|
||||
args = [response_timeout, max_tx_attempts, recursion_desired]
|
||||
if cmd_parts:
|
||||
cmd = f'dns service {instance} {service} {" ".join(cmd_parts[::-1])}'
|
||||
else:
|
||||
args = [*server, response_timeout, max_tx_attempts, recursion_desired]
|
||||
cmd = f'dns service {instance} {service} {" ".join([str(a) for a in args if a])}'
|
||||
output = self.execute_command(cmd, 30.0)
|
||||
cmd = f'dns service {instance} {service}'
|
||||
|
||||
output = self.execute_command(cmd, 30.0, ignore_result=ignore_result)
|
||||
|
||||
m = re.match(
|
||||
r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) ' +
|
||||
@@ -979,6 +1067,23 @@ class OTCI(object):
|
||||
else:
|
||||
raise CommandError(cmd, output)
|
||||
|
||||
def dns_resolve4(self,
|
||||
hostname: str,
|
||||
ip_address: str = '',
|
||||
ignore_result: bool = False) -> List[Dict[str, Union[Ip4Addr, int]]]:
|
||||
"""Resolve a DNS host name."""
|
||||
cmd = 'dns resolve4 ' + ' '.join([x for x in [hostname, ip_address] if x])
|
||||
output = self.execute_command(cmd, 30.0, ignore_result=ignore_result)
|
||||
dns_resp = output[0]
|
||||
addrs = dns_resp.strip().split(' - ')[1].split(' ')
|
||||
ips = [Ip4Addr(item.strip()) for item in addrs[::2]]
|
||||
ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
|
||||
|
||||
return [{
|
||||
'address': ip,
|
||||
'ttl': ttl,
|
||||
} for ip, ttl in zip(ips, ttls)]
|
||||
|
||||
#
|
||||
# SRP server & client utilities
|
||||
#
|
||||
@@ -996,12 +1101,12 @@ class OTCI(object):
|
||||
self.execute_command('srp server disable')
|
||||
|
||||
def srp_server_get_addressmode(self):
|
||||
"""Disable SRP server."""
|
||||
return self.__parse_str(self.execute_command(f'srp server addressmode'))
|
||||
"""Get the SRP server address mode."""
|
||||
return self.__parse_str(self.execute_command(f'srp server addrmode'))
|
||||
|
||||
def srp_server_set_addressmode(self, mode: Literal['unicast', 'anycast']):
|
||||
"""Disable SRP server."""
|
||||
self.execute_command(f'srp server addressmode {mode}')
|
||||
"""Set the SRP server address mode."""
|
||||
self.execute_command(f'srp server addrmode {mode}')
|
||||
|
||||
def srp_server_get_sequence_number(self) -> int:
|
||||
"""Set SRP server sequence number."""
|
||||
@@ -1485,30 +1590,6 @@ class OTCI(object):
|
||||
"""
|
||||
self.execute_command(f"csl period {period}")
|
||||
|
||||
def get_csl_channel(self) -> int:
|
||||
"""Get the channel CSL operates on
|
||||
|
||||
Returns:
|
||||
int: channel index
|
||||
"""
|
||||
return self.__parse_int(self.execute_command("csl channel"))
|
||||
|
||||
def set_csl_channel(self, channel: int):
|
||||
"""Set the CSL channel
|
||||
|
||||
Args:
|
||||
channel (int): channel on which CSL will operate
|
||||
"""
|
||||
self.execute_command(f"csl channel {channel}")
|
||||
|
||||
def get_csl_timeout(self) -> int:
|
||||
"""Get the CSL timeout
|
||||
|
||||
Returns:
|
||||
int: csl timeout [s]
|
||||
"""
|
||||
return self.__parse_int(self.execute_command("csl timeout"))
|
||||
|
||||
def set_csl_timeout(self, timeout: int):
|
||||
"""Set the CSL timeout
|
||||
|
||||
@@ -1527,16 +1608,16 @@ class OTCI(object):
|
||||
cfg: Dict[str, int] = {}
|
||||
for line in output:
|
||||
k, v = line.split(': ')
|
||||
if k == 'Channel':
|
||||
cfg['channel'] = int(v)
|
||||
elif k == 'Timeout':
|
||||
if k == 'channel':
|
||||
cfg[k] = int(v)
|
||||
elif k == 'timeout':
|
||||
matched = OTCI._CSL_TIMEOUT_PATTERN.match(v)
|
||||
assert matched is not None
|
||||
cfg['timeout'] = int(matched.group(1))
|
||||
elif k == 'Period':
|
||||
cfg[k] = int(matched.group(1))
|
||||
elif k == 'period':
|
||||
matched = OTCI._CSL_PERIOD_PATTERN.match(v)
|
||||
assert matched is not None
|
||||
cfg['period'] = int(matched.group(1))
|
||||
cfg[k] = int(matched.group(1))
|
||||
else:
|
||||
logging.warning("Ignore unknown CSL parameter: %s: %s", k, v)
|
||||
|
||||
@@ -1656,35 +1737,33 @@ class OTCI(object):
|
||||
self.execute_command(f'commissioner announce {channel_mask} {count} {period} {destination}')
|
||||
|
||||
def commissioner_energy_scan(self, channel_mask: int, count: int, period: int, duration: int,
|
||||
destination: str | Ip6Addr) -> List[Dict[int, List[int]]]:
|
||||
destination: str | Ip6Addr) -> Dict[int, List[int]]:
|
||||
"""Perform an energy scan on the specified channels."""
|
||||
output = self.execute_command(f'commissioner energy {channel_mask} {count} {period} {duration} {destination}')
|
||||
energy_reports: List[Dict[int, List[int]]] = []
|
||||
for line in output:
|
||||
_mask, _energies = line.split(": ")[1].split(" ", 1)
|
||||
channels = [b for b in bits_set(int(_mask))]
|
||||
energies = [int(e) for e in _energies.split(" ")]
|
||||
energy_reports.append({ch: energies[idx::len(channels)] for (idx, ch) in enumerate(channels)})
|
||||
return energy_reports
|
||||
ch_count = len(list(bits_set(channel_mask)))
|
||||
self.execute_command(f'commissioner energy {channel_mask} {count} {period} {duration} {destination}')
|
||||
output = self.__otcmd.wait(ch_count * count * (period + duration + 999) / 1000 + 1)
|
||||
|
||||
if len(output) > 1:
|
||||
raise UnexpectedCommandOutput(output)
|
||||
|
||||
_mask, _energies = output[0].split(": ")[1].split(" ", 1)
|
||||
channels = [b for b in bits_set(int(_mask, 16))]
|
||||
energies = [int(e) for e in _energies.split(" ")]
|
||||
return {ch: energies[idx::len(channels)] for (idx, ch) in enumerate(channels)}
|
||||
|
||||
def commissioner_mgmt_get(self,
|
||||
named_tlvs: Optional[Tuple[str, ...]] = None,
|
||||
hex_tlvs: Optional[Tuple[int, ...]] = None) -> str:
|
||||
hex_tlvs: Optional[Tuple[int, ...]] = None):
|
||||
"""Send a MGMT_GET request."""
|
||||
if not named_tlvs and not hex_tlvs:
|
||||
return ""
|
||||
_cmd: List[str] = ['commissioner', 'mgmtget']
|
||||
|
||||
if named_tlvs is not None:
|
||||
_named_tlvs = " " + " ".join(named_tlvs)
|
||||
else:
|
||||
_named_tlvs = ''
|
||||
if named_tlvs:
|
||||
_cmd += named_tlvs
|
||||
|
||||
if hex_tlvs is not None:
|
||||
_hex_tlvs = f' -x {"".join(f"{x:02x}" for x in hex_tlvs)}' or ''
|
||||
else:
|
||||
_hex_tlvs = ''
|
||||
if hex_tlvs:
|
||||
_cmd += [self.__detect_binary_cmd(), "".join(f"{x:02x}" for x in hex_tlvs)]
|
||||
|
||||
return self.__parse_str(self.execute_command(f'commissioner mgmtget{_named_tlvs}{_hex_tlvs}'))
|
||||
self.execute_command(' '.join(_cmd))
|
||||
|
||||
def commissioner_mgmt_set(self,
|
||||
locator: Optional[str] = None,
|
||||
@@ -1693,7 +1772,7 @@ class OTCI(object):
|
||||
joiner_udp_port: Optional[int] = None,
|
||||
tlvs: Optional[str] = None):
|
||||
"""Send a MGMT_SET request."""
|
||||
_names = ['locator', 'sessionid', 'steeringdata', 'joinerudpport', '-x']
|
||||
_names = ['locator', 'sessionid', 'steeringdata', 'joinerudpport', self.__detect_binary_cmd()]
|
||||
_tlvs: List[Union[int, str, None]] = [locator, session_id, steering_data, joiner_udp_port, tlvs]
|
||||
_cmd = [x for x in zip(_names, _tlvs) if x[1] is not None]
|
||||
|
||||
@@ -1705,10 +1784,14 @@ class OTCI(object):
|
||||
|
||||
def commissioner_panid_query(self, panid: int, channel_mask: int, destination: str | Ip6Addr) -> List[int]:
|
||||
"""Perform a PAN ID query on the specified channels."""
|
||||
output = self.execute_command(f'commissioner panid {panid} {channel_mask} {destination}')
|
||||
masks = [int(line.split(": ")[1].split(", ", 1)[1], 16) for line in output]
|
||||
conflict_mask = functools.reduce(lambda x, y: x | y, masks)
|
||||
return [b for b in bits_set(conflict_mask)]
|
||||
self.execute_command(f'commissioner panid {panid} {channel_mask} {destination}')
|
||||
output = self.__otcmd.wait(len(list(bits_set(channel_mask))))
|
||||
if output:
|
||||
masks = [int(line.split(": ")[1].split(", ", 1)[1], 16) for line in output]
|
||||
conflict_mask = functools.reduce(lambda x, y: x | y, masks)
|
||||
return [b for b in bits_set(conflict_mask)]
|
||||
else:
|
||||
return []
|
||||
|
||||
#
|
||||
# Joiner operations
|
||||
@@ -1871,7 +1954,7 @@ class OTCI(object):
|
||||
|
||||
def get_network_data_bytes(self) -> bytes:
|
||||
"""Get the raw Network Data."""
|
||||
hexstr = self.__parse_str(self.execute_command('netdata show -x'))
|
||||
hexstr = self.__parse_str(self.execute_command(f'netdata show {self.__detect_binary_cmd()}'))
|
||||
return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
|
||||
|
||||
def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]:
|
||||
@@ -1947,7 +2030,7 @@ class OTCI(object):
|
||||
if dataset in ('active', 'pending'):
|
||||
cmd = f'dataset commit {dataset}'
|
||||
else:
|
||||
raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
|
||||
raise InvalidArgumentsError(f'Unknown dataset: {dataset}')
|
||||
|
||||
self.execute_command(cmd)
|
||||
|
||||
@@ -2018,7 +2101,7 @@ class OTCI(object):
|
||||
|
||||
def get_dataset_bytes(self, dataset: str) -> bytes:
|
||||
if dataset in ('active', 'pending'):
|
||||
cmd = f'dataset {dataset} -x'
|
||||
cmd = f'dataset {dataset} {self.__detect_binary_cmd()}'
|
||||
else:
|
||||
raise InvalidArgumentsError(f'Unknown dataset: {dataset}')
|
||||
|
||||
@@ -2061,7 +2144,7 @@ class OTCI(object):
|
||||
self.execute_command(f'dataset wakeupchannel {wakeupchannel}')
|
||||
|
||||
if channel_mask is not None:
|
||||
self.execute_command(f'dataset channelmask {channel_mask}')
|
||||
self.execute_command(f'dataset channelmask {channel_mask:#08x}')
|
||||
|
||||
if extpanid is not None:
|
||||
self.execute_command(f'dataset extpanid {extpanid}')
|
||||
@@ -2077,7 +2160,7 @@ class OTCI(object):
|
||||
self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}')
|
||||
|
||||
if panid is not None:
|
||||
self.execute_command(f'dataset panid {panid}')
|
||||
self.execute_command(f'dataset panid {panid:#04x}')
|
||||
|
||||
if pskc is not None:
|
||||
self.execute_command(f'dataset pskc {pskc}')
|
||||
@@ -2094,7 +2177,7 @@ class OTCI(object):
|
||||
address: Optional[str | Ip6Addr] = None,
|
||||
named_tlvs: Optional[List[Tuple[str, str]]] = None,
|
||||
hex_tlvs: Optional[Tuple[int, ...]] = None):
|
||||
_cmd: List[str] = ['dataset', 'mgmtget', dataset]
|
||||
_cmd: List[str] = ['dataset', 'mgmtgetcommand', dataset]
|
||||
|
||||
if address is not None:
|
||||
_cmd += ['address', str(address)]
|
||||
@@ -2104,25 +2187,21 @@ class OTCI(object):
|
||||
_cmd += [item for sublist in named_tlvs for item in sublist]
|
||||
|
||||
if hex_tlvs is not None:
|
||||
_cmd += ['-x', ''.join([f'{tlv:02x}' for tlv in hex_tlvs])]
|
||||
_cmd += [self.__detect_binary_cmd(), ''.join([f'{tlv:02x}' for tlv in hex_tlvs])]
|
||||
|
||||
self.execute_command(' '.join(_cmd))
|
||||
|
||||
def dataset_mgmt_set_command(self,
|
||||
dataset: str,
|
||||
address: Optional[str | Ip6Addr] = None,
|
||||
named_tlvs: Optional[List[Tuple[str, Any]]] = None,
|
||||
hex_tlvs: Optional[str] = None):
|
||||
_cmd = ['dataset', 'mgmtset', dataset]
|
||||
|
||||
if address is not None:
|
||||
_cmd += ['address', address]
|
||||
_cmd = ['dataset', 'mgmtsetcommand', dataset]
|
||||
|
||||
if named_tlvs is not None:
|
||||
_cmd += list(sum(named_tlvs, ()))
|
||||
|
||||
if hex_tlvs is not None:
|
||||
_cmd += ['-x', hex_tlvs]
|
||||
_cmd += [self.__detect_binary_cmd(), hex_tlvs]
|
||||
|
||||
self.execute_command(' '.join(_cmd))
|
||||
|
||||
@@ -2213,6 +2292,9 @@ class OTCI(object):
|
||||
def __detect_networkkey_cmd(self) -> str:
|
||||
return 'networkkey' if self.api_version >= 126 else 'masterkey'
|
||||
|
||||
def __detect_binary_cmd(self) -> str:
|
||||
return '-x' if self.api_version >= 28 else 'binary'
|
||||
|
||||
#
|
||||
# Unicast Addresses management
|
||||
#
|
||||
@@ -2484,7 +2566,11 @@ class OTCI(object):
|
||||
|
||||
def get_dua_iid(self) -> str:
|
||||
"""Get the DUA IID for Thread 1.2 device."""
|
||||
return self.__parse_iid(self.execute_command('dua iid'))
|
||||
raw_iid = self.execute_command('dua iid')
|
||||
if raw_iid:
|
||||
return self.__parse_iid(raw_iid)
|
||||
else:
|
||||
return ''
|
||||
|
||||
def set_dua_iid(self, iid: str):
|
||||
"""Set the DUA IID for Thread 1.2 device."""
|
||||
@@ -2621,7 +2707,7 @@ class OTCI(object):
|
||||
return results
|
||||
|
||||
def __parse_numbers(self, line: str) -> List[int]:
|
||||
values = re.findall(r"\-?\d+", line)
|
||||
values = re.findall(r"-?\d+", line)
|
||||
return list(map(int, values))
|
||||
|
||||
def __valid_flags(self, flags: str, flags_set: str):
|
||||
@@ -2735,7 +2821,8 @@ class OTCI(object):
|
||||
port: Optional[int] = None,
|
||||
text: Optional[str] = None,
|
||||
random_bytes: Optional[int] = None,
|
||||
hex: Optional[str] = None):
|
||||
hex: Optional[str] = None,
|
||||
return_result: bool = True):
|
||||
"""Send a few bytes over UDP.
|
||||
|
||||
ip: the IPv6 destination address.
|
||||
@@ -2764,7 +2851,7 @@ class OTCI(object):
|
||||
self.__validate_hex(hex)
|
||||
cmd += f' -x {hex}'
|
||||
|
||||
self.execute_command(cmd)
|
||||
self.execute_command(cmd, ignore_result=not return_result)
|
||||
|
||||
def udp_get_link_security(self) -> bool:
|
||||
"""Gets whether the link security is enabled or disabled."""
|
||||
@@ -2861,6 +2948,10 @@ class OTCI(object):
|
||||
#
|
||||
# Diag Utilities
|
||||
#
|
||||
def diag(self):
|
||||
"""Get the current status of the diagnostics module."""
|
||||
return self.__parse_str(self.execute_command('diag'))
|
||||
|
||||
def diag_start(self):
|
||||
"""Start diagnostics mode."""
|
||||
self.execute_command('diag start')
|
||||
@@ -2875,8 +2966,7 @@ class OTCI(object):
|
||||
|
||||
def diag_get_channel(self) -> int:
|
||||
"""Get the IEEE 802.15.4 Channel value for diagnostics module."""
|
||||
line = self.__parse_str(self.execute_command('diag channel'))
|
||||
return int(line)
|
||||
return self.__parse_int(self.execute_command('diag channel'))
|
||||
|
||||
def diag_set_power(self, power: int):
|
||||
"""Set the tx power value(dBm) for diagnostics module."""
|
||||
@@ -2884,8 +2974,7 @@ class OTCI(object):
|
||||
|
||||
def diag_get_power(self) -> int:
|
||||
"""Get the tx power value(dBm) for diagnostics module."""
|
||||
line = self.__parse_str(self.execute_command('diag power'))
|
||||
return int(line)
|
||||
return self.__parse_int(self.execute_command('diag power'))
|
||||
|
||||
def diag_cw_start(self):
|
||||
"""Start transmitting continuous carrier wave."""
|
||||
@@ -2964,7 +3053,7 @@ class OTCI(object):
|
||||
"""Set radio to receive mode."""
|
||||
self.execute_command('diag radio receive')
|
||||
|
||||
def diag_radio_receive_number(self, number: int):
|
||||
def diag_radio_receive_number(self, number: int) -> List[Dict[str, Union[int, str]]]:
|
||||
"""Set radio to receive mode and receive specified number of packets."""
|
||||
#
|
||||
# The `diag radio receive <number> [lpr]` command example:
|
||||
@@ -2983,7 +3072,7 @@ class OTCI(object):
|
||||
if len(output) != number:
|
||||
raise UnexpectedCommandOutput(output)
|
||||
|
||||
result = []
|
||||
result: List[Dict[str, Union[int, str]]] = []
|
||||
|
||||
for line in output:
|
||||
data = line.split(',')
|
||||
@@ -3044,11 +3133,11 @@ class OTCI(object):
|
||||
result['sent_error_invalid_state_packets'] = int(output[4].split(":")[1])
|
||||
result['sent_error_others_packets'] = int(output[5].split(":")[1])
|
||||
|
||||
values = re.findall("\-?\d+", output[6])
|
||||
values = re.findall(r"-?\d+", output[6])
|
||||
result['first_received_packet_rssi'] = int(values[0])
|
||||
result['first_received_packet_lqi'] = int(values[1])
|
||||
|
||||
values = re.findall("\-?\d+", output[7])
|
||||
values = re.findall(r"-?\d+", output[7])
|
||||
result['last_received_packet_rssi'] = int(values[0])
|
||||
result['last_received_packet_lqi'] = int(values[1])
|
||||
|
||||
@@ -3223,15 +3312,46 @@ class OTCI(object):
|
||||
# Other TODOs
|
||||
#
|
||||
|
||||
def get_network_diagnostics(self, addr: Union[str, Ip6Addr], type: list[int]) -> str:
|
||||
def get_network_diagnostics(self, addr: Union[str, Ip6Addr],
|
||||
tlvs: list[int]) -> Dict[str, Union[str, Dict[str, str]]]:
|
||||
"""Get the network diagnostic information."""
|
||||
output = self.execute_command(f'networkdiagnostic get {addr} {" ".join(map(str, type))}')
|
||||
return str(output)
|
||||
args = [str(addr)] + list(map(str, tlvs))
|
||||
output = self.execute_command(f'networkdiagnostic get {" ".join(args)}')
|
||||
# line 0 is the hex representation of the diagnostics
|
||||
# below that is the human readable/parsed format
|
||||
result: Dict[str, Union[str, Dict[str, str]]] = {}
|
||||
diag: Optional[Union[str, Dict[str, str]]] = None
|
||||
for line in output[1:]:
|
||||
if not line.startswith(' '):
|
||||
k, v = line.split(': ', 1)
|
||||
diag = v or {}
|
||||
result[line.split(': ', 1)[0]] = diag
|
||||
else:
|
||||
assert diag is not None and isinstance(diag, dict)
|
||||
k, v = line.strip().split(': ')
|
||||
diag[k] = v
|
||||
return result
|
||||
|
||||
def get_network_diagnostics_bytes(self, addr: Union[str, Ip6Addr], tlvs: list[int]) -> str:
|
||||
"""Get the network diagnostic information."""
|
||||
args = [str(addr)] + list(map(str, tlvs))
|
||||
output = self.execute_command(f'networkdiagnostic get {" ".join(args)}')
|
||||
# line 0 is the hex representation of the diagnostics
|
||||
# below that is the human readable/parsed format
|
||||
return output[0].split(': ')[1]
|
||||
|
||||
def reset_network_diagnostics(self, addr: Union[str, Ip6Addr], type: list[int]):
|
||||
"""Reset the network diagnostic information."""
|
||||
self.execute_command(f'networkdiagnostic reset {addr} {" ".join(map(str, type))}')
|
||||
|
||||
def get_network_diagnostics_non_preferred_channels(self) -> int:
|
||||
"""Get the non-preferred channels in the network diagnostics."""
|
||||
return self.__parse_int(self.execute_command('networkdiagnostic nonpreferredchannels'), 16)
|
||||
|
||||
def set_network_diagnostics_non_preferred_channels(self, channels: int):
|
||||
"""Set the non-preferred channels in the network diagnostics."""
|
||||
self.execute_command(f'networkdiagnostic nonpreferredchannels {channels}')
|
||||
|
||||
__PARENT_KEY_MAP = {
|
||||
'Ext Addr': 'extaddr',
|
||||
'Rloc': 'rloc16',
|
||||
@@ -3239,17 +3359,24 @@ class OTCI(object):
|
||||
'Link Quality Out': 'lq_out',
|
||||
'Age': 'age',
|
||||
'Version': 'version',
|
||||
'CSL clock accuracy': 'csl_clock_accuracy',
|
||||
'CSL uncertainty': 'csl_uncertainty',
|
||||
}
|
||||
|
||||
def get_parent(self) -> Dict[str, int]:
|
||||
def get_parent(self) -> Dict[str, Union[int, str]]:
|
||||
"""Get the diagnostic information for a Thread Router as parent."""
|
||||
data: Dict[str, int] = {}
|
||||
data: Dict[str, Union[int, str]] = {}
|
||||
output = self.execute_command('parent')
|
||||
|
||||
try:
|
||||
for line in output:
|
||||
k, v = line.split(': ')
|
||||
data[OTCI.__PARENT_KEY_MAP[k]] = int(v, base=0)
|
||||
if OTCI.__PARENT_KEY_MAP[k] == 'extaddr':
|
||||
data[OTCI.__PARENT_KEY_MAP[k]] = v
|
||||
elif OTCI.__PARENT_KEY_MAP[k] == 'rloc16':
|
||||
data[OTCI.__PARENT_KEY_MAP[k]] = Rloc16(v, 16)
|
||||
else:
|
||||
data[OTCI.__PARENT_KEY_MAP[k]] = int(v, base=0)
|
||||
except KeyError:
|
||||
raise UnexpectedCommandOutput(output)
|
||||
|
||||
@@ -3561,12 +3688,12 @@ def connect_otbr_ssh(host: str,
|
||||
return OTCI(cmd_handler)
|
||||
|
||||
|
||||
def connect_otbr_adb_tcp(host: str, port: int = 5555, adb_key: Optional[str] = None):
|
||||
def connect_otbr_adb_tcp(host: str, port: int = 5555, adb_key: Optional[str] = None) -> OTCI:
|
||||
cmd_handler = OtbrAdbTcpCommandRunner(host, port, adb_key)
|
||||
return OTCI(cmd_handler)
|
||||
|
||||
|
||||
def connect_otbr_adb_usb(serial: str, adb_key: Optional[str] = None):
|
||||
def connect_otbr_adb_usb(serial: str, adb_key: Optional[str] = None) -> OTCI:
|
||||
cmd_handler = OtbrAdbUsbCommandRunner(serial, adb_key)
|
||||
return OTCI(cmd_handler)
|
||||
|
||||
|
||||
@@ -127,6 +127,22 @@ class Ip6Prefix(ipaddress.IPv6Network):
|
||||
return super().__hash__()
|
||||
|
||||
|
||||
class Ip4Addr(ipaddress.IPv4Address):
|
||||
"""Represents an IPv4 address."""
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, str):
|
||||
other = ipaddress.IPv4Address(other)
|
||||
|
||||
return super().__eq__(other)
|
||||
|
||||
def __repr__(self):
|
||||
return self.compressed
|
||||
|
||||
def __hash__(self):
|
||||
return super().__hash__()
|
||||
|
||||
|
||||
SecurityPolicy = namedtuple('SecurityPolicy', ['rotation_time', 'flags'])
|
||||
"""Represents a Security Policy configuration."""
|
||||
|
||||
|
||||
@@ -69,5 +69,5 @@ def bits_set(number: int) -> Generator[int, int, None]:
|
||||
while number != 0:
|
||||
if number & 1:
|
||||
yield idx
|
||||
else:
|
||||
number >>= 1
|
||||
number >>= 1
|
||||
idx += 1
|
||||
|
||||
@@ -131,6 +131,15 @@ class TestOTCI(unittest.TestCase):
|
||||
self.assertFalse(leader.get_router_eligible())
|
||||
leader.enable_router_eligible()
|
||||
|
||||
leader.set_mesh_local_prefix('fd00:dba::/64')
|
||||
self.assertEqual('fd00:dba::/64', leader.get_mesh_local_prefix())
|
||||
leader.set_mesh_local_prefix(TEST_MESH_LOCAL_PREFIX + '/64')
|
||||
leader.set_ml_iid('b1a5ed57a71571c5')
|
||||
leader.set_dua_iid('ad4a011dad4a011d')
|
||||
self.assertEqual('ad4a011dad4a011d', leader.get_dua_iid())
|
||||
leader.clear_dua_iid()
|
||||
self.assertEqual('', leader.get_dua_iid())
|
||||
|
||||
self.assertFalse(leader.get_ifconfig_state())
|
||||
# ifconfig up
|
||||
leader.ifconfig_up()
|
||||
@@ -198,6 +207,15 @@ class TestOTCI(unittest.TestCase):
|
||||
leader.set_allowlist([leader.get_extaddr()])
|
||||
leader.disable_allowlist()
|
||||
|
||||
leader.enable_denylist()
|
||||
leader.add_denylist(leader.get_extaddr())
|
||||
leader.remove_denylist(leader.get_extaddr())
|
||||
leader.set_denylist([leader.get_extaddr()])
|
||||
leader.disable_denylist()
|
||||
|
||||
leader.enable_ccm()
|
||||
leader.disable_ccm()
|
||||
|
||||
self.assertEqual([], leader.backbone_router_get_multicast_listeners())
|
||||
|
||||
leader.add_ipmaddr('ff04::1')
|
||||
@@ -255,7 +273,11 @@ class TestOTCI(unittest.TestCase):
|
||||
|
||||
logging.info("CSL config: %r", leader.get_csl_config())
|
||||
leader.config_csl(channel=13, period=16000, timeout=200)
|
||||
logging.info("CSL config: %r", leader.get_csl_config())
|
||||
cfg = leader.get_csl_config()
|
||||
logging.info("CSL config: %r", cfg)
|
||||
self.assertEqual(13, cfg['channel'])
|
||||
self.assertEqual(16000, cfg['period'])
|
||||
self.assertEqual(200, cfg['timeout'])
|
||||
|
||||
logging.info("EID-to-RLOC cache: %r", leader.get_eidcache())
|
||||
|
||||
@@ -331,6 +353,16 @@ class TestOTCI(unittest.TestCase):
|
||||
logging.info('dataset active -x: %r', leader.get_dataset_bytes('active'))
|
||||
logging.info('dataset pending -x: %r', leader.get_dataset_bytes('pending'))
|
||||
|
||||
leader.set_vendor_name('OpenThread')
|
||||
self.assertEqual('OpenThread', leader.get_vendor_name())
|
||||
leader.set_vendor_model('some_model')
|
||||
self.assertEqual('some_model', leader.get_vendor_model())
|
||||
leader.set_vendor_sw_version('1.0.0')
|
||||
self.assertEqual('1.0.0', leader.get_vendor_sw_version())
|
||||
|
||||
leader.set_minimal_delay_timer(1)
|
||||
self.assertEqual(1, leader.get_minimal_delay_timer())
|
||||
|
||||
# Test SRP server & client
|
||||
self._test_otci_srp(leader, leader)
|
||||
|
||||
@@ -355,7 +387,10 @@ class TestOTCI(unittest.TestCase):
|
||||
'server': (server.get_ipaddr_rloc(), 53),
|
||||
'response_timeout': 10000,
|
||||
'max_tx_attempts': 4,
|
||||
'recursion_desired': False
|
||||
'recursion_desired': False,
|
||||
'service_mode': 'srv_txt_opt',
|
||||
'transport_protocol': 'udp',
|
||||
'nat64_mode': True
|
||||
}, client.dns_get_config())
|
||||
|
||||
self.assertTrue(client.dns_get_compression())
|
||||
@@ -379,6 +414,11 @@ class TestOTCI(unittest.TestCase):
|
||||
server.srp_server_set_domain('default.service.arpa.')
|
||||
self.assertEqual('default.service.arpa.', server.srp_server_get_domain())
|
||||
|
||||
server.srp_server_set_sequence_number(0x55)
|
||||
self.assertEqual(0x55, server.srp_server_get_sequence_number())
|
||||
server.srp_server_set_addressmode('unicast')
|
||||
self.assertEqual('unicast', server.srp_server_get_addressmode())
|
||||
|
||||
default_leases = server.srp_server_get_lease()
|
||||
self.assertEqual(default_leases, (30, 97200, 30, 680400))
|
||||
server.srp_server_set_lease(1801, 7201, 86401, 1209601)
|
||||
@@ -508,6 +548,7 @@ class TestOTCI(unittest.TestCase):
|
||||
self.assertEqual('Removed', client.srp_client_get_host()['state'])
|
||||
self.assertEqual([], server.srp_server_get_hosts())
|
||||
self.assertEqual([], server.srp_server_get_services())
|
||||
client.srp_client_clear_host()
|
||||
|
||||
def _test_otci_example(self, node1: OTCI, node2: OTCI):
|
||||
node1.dataset_init_buffer()
|
||||
@@ -601,6 +642,8 @@ class TestOTCI(unittest.TestCase):
|
||||
commissioner.wait(5)
|
||||
self.assertEqual('active', commissioner.get_commissioner_state())
|
||||
|
||||
logging.info('commissioner.commissioner_get_session_id() = %d', commissioner.get_commissioner_session_id())
|
||||
|
||||
logging.info('commissioner.get_network_id_timeout() = %d', commissioner.get_network_id_timeout())
|
||||
commissioner.set_network_id_timeout(60)
|
||||
self.assertEqual(60, commissioner.get_network_id_timeout())
|
||||
@@ -711,7 +754,27 @@ class TestOTCI(unittest.TestCase):
|
||||
rtt: Dict[str, float] = cast(Dict[str, float], statistics['round_trip_time'])
|
||||
self.assertTrue(rtt['min'] - 1e-9 <= rtt['avg'] <= rtt['max'] + 1e-9)
|
||||
|
||||
ed_report = commissioner.commissioner_energy_scan(3 << commissioner.get_channel(), 4, 32, 1000,
|
||||
child1.get_ipaddr_rloc())
|
||||
comm_chan = commissioner.get_channel()
|
||||
self.assertEqual({comm_chan: [-30, -30, -30, -30], comm_chan + 1: [-30, -30, -30, -30]}, ed_report)
|
||||
|
||||
commissioner.commissioner_announce(TEST_CHANNEL_MASK, 1, 32, child1.get_ipaddr_rloc())
|
||||
|
||||
conflicts = commissioner.commissioner_panid_query(TEST_PANID, TEST_CHANNEL_MASK, child1.get_ipaddr_rloc())
|
||||
self.assertEqual([22], conflicts)
|
||||
|
||||
parent = child1.get_parent()
|
||||
self.assertEqual(parent['extaddr'], commissioner.get_extaddr())
|
||||
self.assertEqual(parent['rloc16'], commissioner.get_rloc16())
|
||||
|
||||
diags = commissioner.get_network_diagnostics(child1.get_ipaddr_rloc(), [0, 1])
|
||||
self.assertEqual({'Ext Address': f'{child1.get_extaddr()}', 'Rloc16': str(child1.get_rloc16())}, diags)
|
||||
diags = commissioner.get_network_diagnostics_bytes(child2.get_ipaddr_rloc(), [0, 1])
|
||||
self.assertEqual('0008' + child2.get_extaddr() + '0102' + f'{child2.get_rloc16():04x}', diags)
|
||||
|
||||
# Shutdown
|
||||
commissioner.commissioner_stop()
|
||||
leader.thread_stop()
|
||||
logging.info("node state: %s", leader.get_state())
|
||||
leader.ifconfig_down()
|
||||
|
||||
Reference in New Issue
Block a user