diff --git a/src/cli/README.md b/src/cli/README.md index 21afc0dbf..7cb2689c6 100644 --- a/src/cli/README.md +++ b/src/cli/README.md @@ -1196,9 +1196,9 @@ CSL period is shown in microseconds. ```bash > csl -Channel: 11 -Period: 160000us -Timeout: 1000s +channel: 11 +period: 160000us +timeout: 1000s Done ``` diff --git a/src/cli/cli.cpp b/src/cli/cli.cpp index 16ed8ae66..461f15903 100644 --- a/src/cli/cli.cpp +++ b/src/cli/cli.cpp @@ -7535,7 +7535,7 @@ template <> otError Interpreter::Process(Arg aArgs[]) * @code * networkdiagnostic get ff02::1 0 1 * DIAG_GET.rsp/ans: 00080e336e1c41494e1c01020c00 - * Ext Address: '0e336e1c41494e1c' + * Ext Address: 0e336e1c41494e1c * Rloc16: 0x0c00 * Done * DIAG_GET.rsp/ans: 00083efcdb7e3f9eb0f201021800 diff --git a/tools/otci/README.md b/tools/otci/README.md index b6c45dcf0..0261b2136 100644 --- a/tools/otci/README.md +++ b/tools/otci/README.md @@ -1,6 +1,6 @@ # OpenThread Control Interface -The OpenThread Control Interface (OTCI) is a library which provides uniform python interfaces to connect and control various kinds of devices running OpenThread. +The OpenThread Control Interface (OTCI) is a library which provides uniform Python interfaces to connect and control various kinds of devices running OpenThread. ## Supported device types @@ -22,7 +22,7 @@ node1 = otci.connect_otbr_ssh("192.168.1.101") # Connect to an OpenThread CLI device via Serial node2 = otci.connect_cli_serial("/dev/ttyACM0")) -# Start node1 to become Leader +# Start node1 and wait for it to become the Leader node1.dataset_init_buffer() node1.dataset_set_buffer(network_name='test', network_key='00112233445566778899aabbccddeeff', panid=0xface, channel=11) node1.dataset_commit_buffer('active') @@ -32,7 +32,7 @@ node1.thread_start() node1.wait(5) assert node1.get_state() == "leader" -# Start Commissioner on node1 +# Start the Commissioner role on node1 node1.commissioner_start() node1.wait(3) @@ -42,11 +42,11 @@ node1.commissioner_add_joiner("TEST123",eui64='*') node2.ifconfig_up() node2.set_router_selection_jitter(1) -# Start Joiner on node2 to join the network +# Start the Joiner role on node2 and wait for it to join the network node2.joiner_start("TEST123") node2.wait(10, expect_line="Join success") -# Wait for node 2 to become Router +# Wait for node 2 to become a Router node2.thread_start() node2.wait(5) assert node2.get_state() == "router" diff --git a/tools/otci/otci/__init__.py b/tools/otci/otci/__init__.py index a81fa32e8..68f9a8bb7 100644 --- a/tools/otci/otci/__init__.py +++ b/tools/otci/otci/__init__.py @@ -28,8 +28,9 @@ # from . import errors -from .constants import THREAD_VERSION_1_1, THREAD_VERSION_1_2 +from .constants import THREAD_VERSION_1_1, THREAD_VERSION_1_2, THREAD_VERSION_1_3, THREAD_VERSION_1_4 from .command_handlers import OTCommandHandler +from .connectors import OtCliHandler from .otci import OTCI from .otci import ( connect_cli_sim, @@ -56,6 +57,7 @@ _connectors = [ __all__ = [ 'OTCI', 'OTCommandHandler', + 'OTCliHandler', 'errors', 'Rloc16', 'ChildId', diff --git a/tools/otci/otci/command_handlers.py b/tools/otci/otci/command_handlers.py index 18c9ce9ea..30242b6cb 100644 --- a/tools/otci/otci/command_handlers.py +++ b/tools/otci/otci/command_handlers.py @@ -84,26 +84,30 @@ class OTCommandHandler(ABC): def shell(self, cmd: str, timeout: float) -> List[str]: raise NotImplementedError("shell command is not supported on %s" % self.__class__.__name__) + @classmethod + def set_filter(cls, filter: re.Pattern[str]): + return + class OtCliCommandRunner(OTCommandHandler): __PATTERN_COMMAND_DONE_OR_ERROR = re.compile( r'(Done|Error|Error \d+:.*|.*: command not found)$') # "Error" for spinel-cli.py - __PATTERN_LOG_LINE = re.compile(r'((\[(NONE|CRIT|WARN|NOTE|INFO|DEBG)\])' + __pattern_log_line = re.compile(r'((\[(NONE|CRIT|WARN|NOTE|INFO|DEBG)\])' r'|(-.*-+: )' # e.g. -CLI-----: r'|(\[[DINWC\-]\] (?=[\w\-]{14}:)\w+-*:)' # e.g. [I] Mac-----------: r')') """regex used to filter logs""" - assert __PATTERN_LOG_LINE.match('[I] ChannelMonitor: debug log') - assert __PATTERN_LOG_LINE.match('[I] Mac-----------: info log') - assert __PATTERN_LOG_LINE.match('[N] MeshForwarder-: note log') - assert __PATTERN_LOG_LINE.match('[W] Notifier------: warn log') - assert __PATTERN_LOG_LINE.match('[C] Mle-----------: critical log') - assert __PATTERN_LOG_LINE.match('[-] Settings------: none log') - assert not __PATTERN_LOG_LINE.match('[-] Settings-----: none log') # not enough `-` after module name + assert __pattern_log_line.match('[I] ChannelMonitor: debug log') + assert __pattern_log_line.match('[I] Mac-----------: info log') + assert __pattern_log_line.match('[N] MeshForwarder-: note log') + assert __pattern_log_line.match('[W] Notifier------: warn log') + assert __pattern_log_line.match('[C] Mle-----------: critical log') + assert __pattern_log_line.match('[-] Settings------: none log') + assert not __pattern_log_line.match('[-] Settings-----: none log') # not enough `-` after module name - __ASYNC_COMMANDS = {'scan', 'ping', 'discover'} + __ASYNC_COMMANDS = {'scan', 'ping', 'discover', 'networkdiagnostic get'} def __init__(self, otcli: OtCliHandler, is_spinel_cli: bool = False): self.__otcli: OtCliHandler = otcli @@ -134,7 +138,8 @@ class OtCliCommandRunner(OTCommandHandler): output = self.__expect_line(timeout, OtCliCommandRunner.__PATTERN_COMMAND_DONE_OR_ERROR, - asynchronous=cmd.split()[0] in OtCliCommandRunner.__ASYNC_COMMANDS) + asynchronous=any(cmd.startswith(x) for x in OtCliCommandRunner.__ASYNC_COMMANDS)) + return output def execute_platform_command(self, cmd: str, timeout: float = 10) -> List[str]: @@ -162,6 +167,18 @@ class OtCliCommandRunner(OTCommandHandler): def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]): self.__line_read_callback = callback + @classmethod + def set_filter(cls, filter: re.Pattern[str]): + """Set a different filter for the read routine that still matches the original filter""" + assert filter.match('[I] ChannelMonitor: debug log') + assert filter.match('[I] Mac-----------: info log') + assert filter.match('[N] MeshForwarder-: note log') + assert filter.match('[W] Notifier------: warn log') + assert filter.match('[C] Mle-----------: critical log') + assert filter.match('[-] Settings------: none log') + assert not filter.match('[-] Settings-----: none log') # not enough `-` after module name + cls.__pattern_log_line = filter + # # Private methods # @@ -216,6 +233,8 @@ class OtCliCommandRunner(OTCommandHandler): if line is None: break + line = line.rstrip() + if line.startswith('> '): line = line[2:] @@ -224,8 +243,9 @@ class OtCliCommandRunner(OTCommandHandler): logging.debug('%s: %s', self.__otcli, line) - if not OtCliCommandRunner.__PATTERN_LOG_LINE.match(line): - logging.info('%s: %s', self.__otcli, line) + if not OtCliCommandRunner.__pattern_log_line.match(line): + if line: + logging.info('%s: %s', self.__otcli, line) self.__pending_lines.put(line) diff --git a/tools/otci/otci/connectors.py b/tools/otci/otci/connectors.py index fa511c818..571640210 100644 --- a/tools/otci/otci/connectors.py +++ b/tools/otci/otci/connectors.py @@ -138,12 +138,12 @@ class OtNcpSim(OtCliPopen): class OtCliSerial(OtCliHandler): """Connector for OT CLI SOC devices via Serial.""" - def __init__(self, dev: str, baudrate: int): + def __init__(self, dev: str, baudrate: int, timeout: float = 0.1): self.__dev = dev self.__baudrate = baudrate import serial - self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=0.1, exclusive=True) + self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=timeout, exclusive=True) self.writeline('\r\n') self.__linebuffer = b'' diff --git a/tools/otci/otci/otci.py b/tools/otci/otci/otci.py index 582c04b39..d62885180 100644 --- a/tools/otci/otci/otci.py +++ b/tools/otci/otci/otci.py @@ -38,7 +38,7 @@ from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshComma from .command_handlers import OtbrAdbUsbCommandRunner from .connectors import Simulator from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError -from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix +from .types import ChildId, Rloc16, Ip4Addr, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix from .types import RouterTableEntry, NetifIdentifier from .utils import match_line, constant_property, bits_set @@ -66,6 +66,9 @@ class OTCI(object): """Gets the string representation of the OTCI instance.""" return repr(self.__otcmd) + def set_filter(self, filter: re.Pattern[str]): + self.__otcmd.set_filter(filter) + def wait(self, duration: float, expect_line: Optional[Union[str, Pattern[str], Collection[str]]] = None): """Wait for a given duration. @@ -98,21 +101,27 @@ class OTCI(object): cmd: str, timeout: float = 10, silent: bool = False, - already_is_ok: bool = True) -> List[str]: + already_is_ok: bool = True, + ignore_result: bool = False) -> List[str]: for i in range(self.__exec_command_retry + 1): try: - return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok) - except Exception: + return self.__execute_command(cmd, + timeout, + silent, + already_is_ok=already_is_ok, + ignore_result=ignore_result) + except Exception as e: self.wait(2) if i == self.__exec_command_retry: - raise + raise e from None assert False def __execute_command(self, cmd: str, timeout: float = 10, silent: bool = False, - already_is_ok: bool = True) -> List[str]: + already_is_ok: bool = True, + ignore_result: bool = False) -> List[str]: """Execute the OpenThread CLI command. :param cmd: The command to execute. @@ -129,7 +138,7 @@ class OTCI(object): for line in output: self.log('info', '%s', line) - if cmd in ('reset', 'factoryreset'): + if cmd in ('reset', 'factoryreset') or ignore_result: return output if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'): @@ -206,6 +215,27 @@ class OTCI(object): # which would lead to ValueError. return 0 + # + # Vendor operations + # + def get_vendor_name(self) -> str: + return self.__parse_str(self.execute_command('vendor name')) + + def set_vendor_name(self, name: str): + self.execute_command(f'vendor name {name}') + + def get_vendor_model(self) -> str: + return self.__parse_str(self.execute_command('vendor model')) + + def set_vendor_model(self, model: str): + self.execute_command(f'vendor model {model}') + + def get_vendor_sw_version(self) -> str: + return self.__parse_str(self.execute_command('vendor swversion')) + + def set_vendor_sw_version(self, version: str): + self.execute_command(f'vendor swversion {version}') + # # Basic device operations # @@ -834,10 +864,25 @@ class OTCI(object): _IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)') - def dns_get_config(self) -> Dict[str, Union[Tuple[Ip6Addr, int], int, bool]]: - """Get DNS client query config.""" + def dns_get_config(self) -> Dict[str, Union[Tuple[Ip6Addr, int], int, bool, str]]: + """Get DNS client query config. + """ output = self.execute_command('dns config') - config: Dict[str, Union[Tuple[Ip6Addr, int], int, bool]] = {} + config: Dict[str, Union[Tuple[Ip6Addr, int], int, bool, str]] = {} + + # + # Example output: + # > dns config + # Server: [fd00:0:0:0:0:0:0:1]:1234 + # ResponseTimeout: 5000 ms + # MaxTxAttempts: 2 + # RecursionDesired: no + # ServiceMode: srv_txt_opt + # Nat64Mode: allow + # TransportProtocol: udp + # Done + # + for line in output: k, v = line.split(': ') if k == 'Server': @@ -851,32 +896,50 @@ class OTCI(object): config['max_tx_attempts'] = int(v) elif k == 'RecursionDesired': config['recursion_desired'] = (v == 'yes') + elif k == 'ServiceMode': + config['service_mode'] = v + elif k == 'Nat64Mode': + config['nat64_mode'] = (v == 'allow') + elif k == 'TransportProtocol': + config['transport_protocol'] = v else: logging.warning("dns config ignored: %s", line) return config + @staticmethod + def __add_with_default(l: list[str], var: Optional[Any], default: str = '0'): + if var is not None: + if isinstance(var, bool): + l += ['1' if var else '0'] + else: + l += [f'{var}'] + elif bool(l): + l += [default] + def dns_set_config(self, - server: Tuple[Union[str, ipaddress.IPv6Address], int], + server: Union[Tuple[Union[str, ipaddress.IPv6Address], int], Tuple[()]], response_timeout: Optional[int] = None, max_tx_attempts: Optional[int] = None, - recursion_desired: Optional[bool] = None): + recursion_desired: Optional[bool] = None, + service_mode: Optional[str] = None, + transport_protocol: Optional[str] = None): """Set DNS client query config.""" - cmd = f'dns config {str(server[0])} {server[1]}' - if response_timeout is not None: - cmd += f' {response_timeout}' - - if not (max_tx_attempts is None or response_timeout is not None): - raise AssertionError('must specify `response_timeout` if `max_tx_attempts` is specified.') - if max_tx_attempts is not None: - cmd += f' {max_tx_attempts}' - - if not (recursion_desired is None or max_tx_attempts is not None): - raise AssertionError('must specify `max_tx_attempts` if `recursion_desired` is specified.') - - if recursion_desired is not None: - cmd += f' {1 if recursion_desired else 0}' + # working backwards so we can set defaults when required + cmd_parts: List[str] = [] + self.__add_with_default(cmd_parts, transport_protocol) + self.__add_with_default(cmd_parts, service_mode) + self.__add_with_default(cmd_parts, recursion_desired) + self.__add_with_default(cmd_parts, max_tx_attempts) + self.__add_with_default(cmd_parts, response_timeout) + if server: + self.__add_with_default(cmd_parts, server[1] or None) + self.__add_with_default(cmd_parts, server[0] or '::', '::') + else: + self.__add_with_default(cmd_parts, None) + self.__add_with_default(cmd_parts, None, '::') + cmd = f'dns config {" ".join(cmd_parts[::-1])}' self.execute_command(cmd) def dns_get_compression(self) -> bool: @@ -893,17 +956,27 @@ class OTCI(object): def dns_browse(self, service: str, - server: Optional[Tuple[Union[str, ipaddress.IPv6Address], int]] = None, + server: Optional[Union[Tuple[Union[str, ipaddress.IPv6Address], int], Tuple[()]]] = None, response_timeout: Optional[int] = None, max_tx_attempts: Optional[int] = None, recursion_desired: Optional[bool] = None) -> List[Dict[str, Any]]: """Browse DNS service instances.""" - args: List[Union[int, bool, str, ipaddress.IPv6Address, None]] - if server is None: - args = [service, response_timeout, max_tx_attempts, recursion_desired] + + cmd_parts: List[str] = [] + self.__add_with_default(cmd_parts, recursion_desired) + self.__add_with_default(cmd_parts, max_tx_attempts) + self.__add_with_default(cmd_parts, response_timeout) + if server: + self.__add_with_default(cmd_parts, server[1] or None) + self.__add_with_default(cmd_parts, server[0] or '::', '::') else: - args = [service, *server, response_timeout, max_tx_attempts, recursion_desired] - cmd = f'dns browse {" ".join([str(a) for a in args if a])}' + self.__add_with_default(cmd_parts, None) + self.__add_with_default(cmd_parts, None, '::') + + if cmd_parts: + cmd = f'dns browse {service} {" ".join(cmd_parts)}' + else: + cmd = f'dns browse {service}' output = '\n'.join(self.execute_command(cmd, 30.0)) result: List[Dict[str, Union[str, int, Ip6Addr, Dict[str, Union[bytes, bool]]]]] = [] @@ -926,10 +999,13 @@ class OTCI(object): return result - def dns_resolve(self, hostname: str) -> List[Dict[str, Union[Ip6Addr, int]]]: + def dns_resolve(self, + hostname: str, + ip_address: str = '', + ignore_result: bool = False) -> List[Dict[str, Union[Ip6Addr, int]]]: """Resolve a DNS host name.""" - cmd = f'dns resolve {hostname}' - output = self.execute_command(cmd, 30.0) + cmd = 'dns resolve ' + ' '.join([x for x in [hostname, ip_address] if x]) + output = self.execute_command(cmd, 30.0, ignore_result=ignore_result) dns_resp = output[0] addrs = dns_resp.strip().split(' - ')[1].split(' ') ips = [Ip6Addr(item.strip()) for item in addrs[::2]] @@ -946,17 +1022,29 @@ class OTCI(object): server: Optional[Tuple[Union[str, ipaddress.IPv6Address], int]] = None, response_timeout: Optional[int] = None, max_tx_attempts: Optional[int] = None, - recursion_desired: Optional[bool] = None) -> Dict[str, Any]: + recursion_desired: Optional[bool] = None, + ignore_result: bool = False) -> Dict[str, Any]: """Resolves a service instance.""" + + cmd_parts: List[str] = [] + self.__add_with_default(cmd_parts, recursion_desired) + self.__add_with_default(cmd_parts, max_tx_attempts) + self.__add_with_default(cmd_parts, response_timeout) + if server: + self.__add_with_default(cmd_parts, server[1] or None) + self.__add_with_default(cmd_parts, server[0] or '::') + else: + self.__add_with_default(cmd_parts, None) + self.__add_with_default(cmd_parts, None, '::') + instance = self.__escape_escapable(instance) - args: List[Union[int, bool, str, ipaddress.IPv6Address, None]] - if server is None: - args = [response_timeout, max_tx_attempts, recursion_desired] + if cmd_parts: + cmd = f'dns service {instance} {service} {" ".join(cmd_parts[::-1])}' else: - args = [*server, response_timeout, max_tx_attempts, recursion_desired] - cmd = f'dns service {instance} {service} {" ".join([str(a) for a in args if a])}' - output = self.execute_command(cmd, 30.0) + cmd = f'dns service {instance} {service}' + + output = self.execute_command(cmd, 30.0, ignore_result=ignore_result) m = re.match( r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) ' + @@ -979,6 +1067,23 @@ class OTCI(object): else: raise CommandError(cmd, output) + def dns_resolve4(self, + hostname: str, + ip_address: str = '', + ignore_result: bool = False) -> List[Dict[str, Union[Ip4Addr, int]]]: + """Resolve a DNS host name.""" + cmd = 'dns resolve4 ' + ' '.join([x for x in [hostname, ip_address] if x]) + output = self.execute_command(cmd, 30.0, ignore_result=ignore_result) + dns_resp = output[0] + addrs = dns_resp.strip().split(' - ')[1].split(' ') + ips = [Ip4Addr(item.strip()) for item in addrs[::2]] + ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]] + + return [{ + 'address': ip, + 'ttl': ttl, + } for ip, ttl in zip(ips, ttls)] + # # SRP server & client utilities # @@ -996,12 +1101,12 @@ class OTCI(object): self.execute_command('srp server disable') def srp_server_get_addressmode(self): - """Disable SRP server.""" - return self.__parse_str(self.execute_command(f'srp server addressmode')) + """Get the SRP server address mode.""" + return self.__parse_str(self.execute_command(f'srp server addrmode')) def srp_server_set_addressmode(self, mode: Literal['unicast', 'anycast']): - """Disable SRP server.""" - self.execute_command(f'srp server addressmode {mode}') + """Set the SRP server address mode.""" + self.execute_command(f'srp server addrmode {mode}') def srp_server_get_sequence_number(self) -> int: """Set SRP server sequence number.""" @@ -1485,30 +1590,6 @@ class OTCI(object): """ self.execute_command(f"csl period {period}") - def get_csl_channel(self) -> int: - """Get the channel CSL operates on - - Returns: - int: channel index - """ - return self.__parse_int(self.execute_command("csl channel")) - - def set_csl_channel(self, channel: int): - """Set the CSL channel - - Args: - channel (int): channel on which CSL will operate - """ - self.execute_command(f"csl channel {channel}") - - def get_csl_timeout(self) -> int: - """Get the CSL timeout - - Returns: - int: csl timeout [s] - """ - return self.__parse_int(self.execute_command("csl timeout")) - def set_csl_timeout(self, timeout: int): """Set the CSL timeout @@ -1527,16 +1608,16 @@ class OTCI(object): cfg: Dict[str, int] = {} for line in output: k, v = line.split(': ') - if k == 'Channel': - cfg['channel'] = int(v) - elif k == 'Timeout': + if k == 'channel': + cfg[k] = int(v) + elif k == 'timeout': matched = OTCI._CSL_TIMEOUT_PATTERN.match(v) assert matched is not None - cfg['timeout'] = int(matched.group(1)) - elif k == 'Period': + cfg[k] = int(matched.group(1)) + elif k == 'period': matched = OTCI._CSL_PERIOD_PATTERN.match(v) assert matched is not None - cfg['period'] = int(matched.group(1)) + cfg[k] = int(matched.group(1)) else: logging.warning("Ignore unknown CSL parameter: %s: %s", k, v) @@ -1656,35 +1737,33 @@ class OTCI(object): self.execute_command(f'commissioner announce {channel_mask} {count} {period} {destination}') def commissioner_energy_scan(self, channel_mask: int, count: int, period: int, duration: int, - destination: str | Ip6Addr) -> List[Dict[int, List[int]]]: + destination: str | Ip6Addr) -> Dict[int, List[int]]: """Perform an energy scan on the specified channels.""" - output = self.execute_command(f'commissioner energy {channel_mask} {count} {period} {duration} {destination}') - energy_reports: List[Dict[int, List[int]]] = [] - for line in output: - _mask, _energies = line.split(": ")[1].split(" ", 1) - channels = [b for b in bits_set(int(_mask))] - energies = [int(e) for e in _energies.split(" ")] - energy_reports.append({ch: energies[idx::len(channels)] for (idx, ch) in enumerate(channels)}) - return energy_reports + ch_count = len(list(bits_set(channel_mask))) + self.execute_command(f'commissioner energy {channel_mask} {count} {period} {duration} {destination}') + output = self.__otcmd.wait(ch_count * count * (period + duration + 999) / 1000 + 1) + + if len(output) > 1: + raise UnexpectedCommandOutput(output) + + _mask, _energies = output[0].split(": ")[1].split(" ", 1) + channels = [b for b in bits_set(int(_mask, 16))] + energies = [int(e) for e in _energies.split(" ")] + return {ch: energies[idx::len(channels)] for (idx, ch) in enumerate(channels)} def commissioner_mgmt_get(self, named_tlvs: Optional[Tuple[str, ...]] = None, - hex_tlvs: Optional[Tuple[int, ...]] = None) -> str: + hex_tlvs: Optional[Tuple[int, ...]] = None): """Send a MGMT_GET request.""" - if not named_tlvs and not hex_tlvs: - return "" + _cmd: List[str] = ['commissioner', 'mgmtget'] - if named_tlvs is not None: - _named_tlvs = " " + " ".join(named_tlvs) - else: - _named_tlvs = '' + if named_tlvs: + _cmd += named_tlvs - if hex_tlvs is not None: - _hex_tlvs = f' -x {"".join(f"{x:02x}" for x in hex_tlvs)}' or '' - else: - _hex_tlvs = '' + if hex_tlvs: + _cmd += [self.__detect_binary_cmd(), "".join(f"{x:02x}" for x in hex_tlvs)] - return self.__parse_str(self.execute_command(f'commissioner mgmtget{_named_tlvs}{_hex_tlvs}')) + self.execute_command(' '.join(_cmd)) def commissioner_mgmt_set(self, locator: Optional[str] = None, @@ -1693,7 +1772,7 @@ class OTCI(object): joiner_udp_port: Optional[int] = None, tlvs: Optional[str] = None): """Send a MGMT_SET request.""" - _names = ['locator', 'sessionid', 'steeringdata', 'joinerudpport', '-x'] + _names = ['locator', 'sessionid', 'steeringdata', 'joinerudpport', self.__detect_binary_cmd()] _tlvs: List[Union[int, str, None]] = [locator, session_id, steering_data, joiner_udp_port, tlvs] _cmd = [x for x in zip(_names, _tlvs) if x[1] is not None] @@ -1705,10 +1784,14 @@ class OTCI(object): def commissioner_panid_query(self, panid: int, channel_mask: int, destination: str | Ip6Addr) -> List[int]: """Perform a PAN ID query on the specified channels.""" - output = self.execute_command(f'commissioner panid {panid} {channel_mask} {destination}') - masks = [int(line.split(": ")[1].split(", ", 1)[1], 16) for line in output] - conflict_mask = functools.reduce(lambda x, y: x | y, masks) - return [b for b in bits_set(conflict_mask)] + self.execute_command(f'commissioner panid {panid} {channel_mask} {destination}') + output = self.__otcmd.wait(len(list(bits_set(channel_mask)))) + if output: + masks = [int(line.split(": ")[1].split(", ", 1)[1], 16) for line in output] + conflict_mask = functools.reduce(lambda x, y: x | y, masks) + return [b for b in bits_set(conflict_mask)] + else: + return [] # # Joiner operations @@ -1871,7 +1954,7 @@ class OTCI(object): def get_network_data_bytes(self) -> bytes: """Get the raw Network Data.""" - hexstr = self.__parse_str(self.execute_command('netdata show -x')) + hexstr = self.__parse_str(self.execute_command(f'netdata show {self.__detect_binary_cmd()}')) return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]: @@ -1947,7 +2030,7 @@ class OTCI(object): if dataset in ('active', 'pending'): cmd = f'dataset commit {dataset}' else: - raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') + raise InvalidArgumentsError(f'Unknown dataset: {dataset}') self.execute_command(cmd) @@ -2018,7 +2101,7 @@ class OTCI(object): def get_dataset_bytes(self, dataset: str) -> bytes: if dataset in ('active', 'pending'): - cmd = f'dataset {dataset} -x' + cmd = f'dataset {dataset} {self.__detect_binary_cmd()}' else: raise InvalidArgumentsError(f'Unknown dataset: {dataset}') @@ -2061,7 +2144,7 @@ class OTCI(object): self.execute_command(f'dataset wakeupchannel {wakeupchannel}') if channel_mask is not None: - self.execute_command(f'dataset channelmask {channel_mask}') + self.execute_command(f'dataset channelmask {channel_mask:#08x}') if extpanid is not None: self.execute_command(f'dataset extpanid {extpanid}') @@ -2077,7 +2160,7 @@ class OTCI(object): self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}') if panid is not None: - self.execute_command(f'dataset panid {panid}') + self.execute_command(f'dataset panid {panid:#04x}') if pskc is not None: self.execute_command(f'dataset pskc {pskc}') @@ -2094,7 +2177,7 @@ class OTCI(object): address: Optional[str | Ip6Addr] = None, named_tlvs: Optional[List[Tuple[str, str]]] = None, hex_tlvs: Optional[Tuple[int, ...]] = None): - _cmd: List[str] = ['dataset', 'mgmtget', dataset] + _cmd: List[str] = ['dataset', 'mgmtgetcommand', dataset] if address is not None: _cmd += ['address', str(address)] @@ -2104,25 +2187,21 @@ class OTCI(object): _cmd += [item for sublist in named_tlvs for item in sublist] if hex_tlvs is not None: - _cmd += ['-x', ''.join([f'{tlv:02x}' for tlv in hex_tlvs])] + _cmd += [self.__detect_binary_cmd(), ''.join([f'{tlv:02x}' for tlv in hex_tlvs])] self.execute_command(' '.join(_cmd)) def dataset_mgmt_set_command(self, dataset: str, - address: Optional[str | Ip6Addr] = None, named_tlvs: Optional[List[Tuple[str, Any]]] = None, hex_tlvs: Optional[str] = None): - _cmd = ['dataset', 'mgmtset', dataset] - - if address is not None: - _cmd += ['address', address] + _cmd = ['dataset', 'mgmtsetcommand', dataset] if named_tlvs is not None: _cmd += list(sum(named_tlvs, ())) if hex_tlvs is not None: - _cmd += ['-x', hex_tlvs] + _cmd += [self.__detect_binary_cmd(), hex_tlvs] self.execute_command(' '.join(_cmd)) @@ -2213,6 +2292,9 @@ class OTCI(object): def __detect_networkkey_cmd(self) -> str: return 'networkkey' if self.api_version >= 126 else 'masterkey' + def __detect_binary_cmd(self) -> str: + return '-x' if self.api_version >= 28 else 'binary' + # # Unicast Addresses management # @@ -2484,7 +2566,11 @@ class OTCI(object): def get_dua_iid(self) -> str: """Get the DUA IID for Thread 1.2 device.""" - return self.__parse_iid(self.execute_command('dua iid')) + raw_iid = self.execute_command('dua iid') + if raw_iid: + return self.__parse_iid(raw_iid) + else: + return '' def set_dua_iid(self, iid: str): """Set the DUA IID for Thread 1.2 device.""" @@ -2621,7 +2707,7 @@ class OTCI(object): return results def __parse_numbers(self, line: str) -> List[int]: - values = re.findall(r"\-?\d+", line) + values = re.findall(r"-?\d+", line) return list(map(int, values)) def __valid_flags(self, flags: str, flags_set: str): @@ -2735,7 +2821,8 @@ class OTCI(object): port: Optional[int] = None, text: Optional[str] = None, random_bytes: Optional[int] = None, - hex: Optional[str] = None): + hex: Optional[str] = None, + return_result: bool = True): """Send a few bytes over UDP. ip: the IPv6 destination address. @@ -2764,7 +2851,7 @@ class OTCI(object): self.__validate_hex(hex) cmd += f' -x {hex}' - self.execute_command(cmd) + self.execute_command(cmd, ignore_result=not return_result) def udp_get_link_security(self) -> bool: """Gets whether the link security is enabled or disabled.""" @@ -2861,6 +2948,10 @@ class OTCI(object): # # Diag Utilities # + def diag(self): + """Get the current status of the diagnostics module.""" + return self.__parse_str(self.execute_command('diag')) + def diag_start(self): """Start diagnostics mode.""" self.execute_command('diag start') @@ -2875,8 +2966,7 @@ class OTCI(object): def diag_get_channel(self) -> int: """Get the IEEE 802.15.4 Channel value for diagnostics module.""" - line = self.__parse_str(self.execute_command('diag channel')) - return int(line) + return self.__parse_int(self.execute_command('diag channel')) def diag_set_power(self, power: int): """Set the tx power value(dBm) for diagnostics module.""" @@ -2884,8 +2974,7 @@ class OTCI(object): def diag_get_power(self) -> int: """Get the tx power value(dBm) for diagnostics module.""" - line = self.__parse_str(self.execute_command('diag power')) - return int(line) + return self.__parse_int(self.execute_command('diag power')) def diag_cw_start(self): """Start transmitting continuous carrier wave.""" @@ -2964,7 +3053,7 @@ class OTCI(object): """Set radio to receive mode.""" self.execute_command('diag radio receive') - def diag_radio_receive_number(self, number: int): + def diag_radio_receive_number(self, number: int) -> List[Dict[str, Union[int, str]]]: """Set radio to receive mode and receive specified number of packets.""" # # The `diag radio receive [lpr]` command example: @@ -2983,7 +3072,7 @@ class OTCI(object): if len(output) != number: raise UnexpectedCommandOutput(output) - result = [] + result: List[Dict[str, Union[int, str]]] = [] for line in output: data = line.split(',') @@ -3044,11 +3133,11 @@ class OTCI(object): result['sent_error_invalid_state_packets'] = int(output[4].split(":")[1]) result['sent_error_others_packets'] = int(output[5].split(":")[1]) - values = re.findall("\-?\d+", output[6]) + values = re.findall(r"-?\d+", output[6]) result['first_received_packet_rssi'] = int(values[0]) result['first_received_packet_lqi'] = int(values[1]) - values = re.findall("\-?\d+", output[7]) + values = re.findall(r"-?\d+", output[7]) result['last_received_packet_rssi'] = int(values[0]) result['last_received_packet_lqi'] = int(values[1]) @@ -3223,15 +3312,46 @@ class OTCI(object): # Other TODOs # - def get_network_diagnostics(self, addr: Union[str, Ip6Addr], type: list[int]) -> str: + def get_network_diagnostics(self, addr: Union[str, Ip6Addr], + tlvs: list[int]) -> Dict[str, Union[str, Dict[str, str]]]: """Get the network diagnostic information.""" - output = self.execute_command(f'networkdiagnostic get {addr} {" ".join(map(str, type))}') - return str(output) + args = [str(addr)] + list(map(str, tlvs)) + output = self.execute_command(f'networkdiagnostic get {" ".join(args)}') + # line 0 is the hex representation of the diagnostics + # below that is the human readable/parsed format + result: Dict[str, Union[str, Dict[str, str]]] = {} + diag: Optional[Union[str, Dict[str, str]]] = None + for line in output[1:]: + if not line.startswith(' '): + k, v = line.split(': ', 1) + diag = v or {} + result[line.split(': ', 1)[0]] = diag + else: + assert diag is not None and isinstance(diag, dict) + k, v = line.strip().split(': ') + diag[k] = v + return result + + def get_network_diagnostics_bytes(self, addr: Union[str, Ip6Addr], tlvs: list[int]) -> str: + """Get the network diagnostic information.""" + args = [str(addr)] + list(map(str, tlvs)) + output = self.execute_command(f'networkdiagnostic get {" ".join(args)}') + # line 0 is the hex representation of the diagnostics + # below that is the human readable/parsed format + return output[0].split(': ')[1] def reset_network_diagnostics(self, addr: Union[str, Ip6Addr], type: list[int]): """Reset the network diagnostic information.""" self.execute_command(f'networkdiagnostic reset {addr} {" ".join(map(str, type))}') + def get_network_diagnostics_non_preferred_channels(self) -> int: + """Get the non-preferred channels in the network diagnostics.""" + return self.__parse_int(self.execute_command('networkdiagnostic nonpreferredchannels'), 16) + + def set_network_diagnostics_non_preferred_channels(self, channels: int): + """Set the non-preferred channels in the network diagnostics.""" + self.execute_command(f'networkdiagnostic nonpreferredchannels {channels}') + __PARENT_KEY_MAP = { 'Ext Addr': 'extaddr', 'Rloc': 'rloc16', @@ -3239,17 +3359,24 @@ class OTCI(object): 'Link Quality Out': 'lq_out', 'Age': 'age', 'Version': 'version', + 'CSL clock accuracy': 'csl_clock_accuracy', + 'CSL uncertainty': 'csl_uncertainty', } - def get_parent(self) -> Dict[str, int]: + def get_parent(self) -> Dict[str, Union[int, str]]: """Get the diagnostic information for a Thread Router as parent.""" - data: Dict[str, int] = {} + data: Dict[str, Union[int, str]] = {} output = self.execute_command('parent') try: for line in output: k, v = line.split(': ') - data[OTCI.__PARENT_KEY_MAP[k]] = int(v, base=0) + if OTCI.__PARENT_KEY_MAP[k] == 'extaddr': + data[OTCI.__PARENT_KEY_MAP[k]] = v + elif OTCI.__PARENT_KEY_MAP[k] == 'rloc16': + data[OTCI.__PARENT_KEY_MAP[k]] = Rloc16(v, 16) + else: + data[OTCI.__PARENT_KEY_MAP[k]] = int(v, base=0) except KeyError: raise UnexpectedCommandOutput(output) @@ -3561,12 +3688,12 @@ def connect_otbr_ssh(host: str, return OTCI(cmd_handler) -def connect_otbr_adb_tcp(host: str, port: int = 5555, adb_key: Optional[str] = None): +def connect_otbr_adb_tcp(host: str, port: int = 5555, adb_key: Optional[str] = None) -> OTCI: cmd_handler = OtbrAdbTcpCommandRunner(host, port, adb_key) return OTCI(cmd_handler) -def connect_otbr_adb_usb(serial: str, adb_key: Optional[str] = None): +def connect_otbr_adb_usb(serial: str, adb_key: Optional[str] = None) -> OTCI: cmd_handler = OtbrAdbUsbCommandRunner(serial, adb_key) return OTCI(cmd_handler) diff --git a/tools/otci/otci/types.py b/tools/otci/otci/types.py index 879a3234a..2654dadea 100644 --- a/tools/otci/otci/types.py +++ b/tools/otci/otci/types.py @@ -127,6 +127,22 @@ class Ip6Prefix(ipaddress.IPv6Network): return super().__hash__() +class Ip4Addr(ipaddress.IPv4Address): + """Represents an IPv4 address.""" + + def __eq__(self, other): + if isinstance(other, str): + other = ipaddress.IPv4Address(other) + + return super().__eq__(other) + + def __repr__(self): + return self.compressed + + def __hash__(self): + return super().__hash__() + + SecurityPolicy = namedtuple('SecurityPolicy', ['rotation_time', 'flags']) """Represents a Security Policy configuration.""" diff --git a/tools/otci/otci/utils.py b/tools/otci/otci/utils.py index 39d609d99..2d942fc49 100644 --- a/tools/otci/otci/utils.py +++ b/tools/otci/otci/utils.py @@ -69,5 +69,5 @@ def bits_set(number: int) -> Generator[int, int, None]: while number != 0: if number & 1: yield idx - else: - number >>= 1 + number >>= 1 + idx += 1 diff --git a/tools/otci/tests/test_otci.py b/tools/otci/tests/test_otci.py index 4b94b38cb..19b75271a 100644 --- a/tools/otci/tests/test_otci.py +++ b/tools/otci/tests/test_otci.py @@ -131,6 +131,15 @@ class TestOTCI(unittest.TestCase): self.assertFalse(leader.get_router_eligible()) leader.enable_router_eligible() + leader.set_mesh_local_prefix('fd00:dba::/64') + self.assertEqual('fd00:dba::/64', leader.get_mesh_local_prefix()) + leader.set_mesh_local_prefix(TEST_MESH_LOCAL_PREFIX + '/64') + leader.set_ml_iid('b1a5ed57a71571c5') + leader.set_dua_iid('ad4a011dad4a011d') + self.assertEqual('ad4a011dad4a011d', leader.get_dua_iid()) + leader.clear_dua_iid() + self.assertEqual('', leader.get_dua_iid()) + self.assertFalse(leader.get_ifconfig_state()) # ifconfig up leader.ifconfig_up() @@ -198,6 +207,15 @@ class TestOTCI(unittest.TestCase): leader.set_allowlist([leader.get_extaddr()]) leader.disable_allowlist() + leader.enable_denylist() + leader.add_denylist(leader.get_extaddr()) + leader.remove_denylist(leader.get_extaddr()) + leader.set_denylist([leader.get_extaddr()]) + leader.disable_denylist() + + leader.enable_ccm() + leader.disable_ccm() + self.assertEqual([], leader.backbone_router_get_multicast_listeners()) leader.add_ipmaddr('ff04::1') @@ -255,7 +273,11 @@ class TestOTCI(unittest.TestCase): logging.info("CSL config: %r", leader.get_csl_config()) leader.config_csl(channel=13, period=16000, timeout=200) - logging.info("CSL config: %r", leader.get_csl_config()) + cfg = leader.get_csl_config() + logging.info("CSL config: %r", cfg) + self.assertEqual(13, cfg['channel']) + self.assertEqual(16000, cfg['period']) + self.assertEqual(200, cfg['timeout']) logging.info("EID-to-RLOC cache: %r", leader.get_eidcache()) @@ -331,6 +353,16 @@ class TestOTCI(unittest.TestCase): logging.info('dataset active -x: %r', leader.get_dataset_bytes('active')) logging.info('dataset pending -x: %r', leader.get_dataset_bytes('pending')) + leader.set_vendor_name('OpenThread') + self.assertEqual('OpenThread', leader.get_vendor_name()) + leader.set_vendor_model('some_model') + self.assertEqual('some_model', leader.get_vendor_model()) + leader.set_vendor_sw_version('1.0.0') + self.assertEqual('1.0.0', leader.get_vendor_sw_version()) + + leader.set_minimal_delay_timer(1) + self.assertEqual(1, leader.get_minimal_delay_timer()) + # Test SRP server & client self._test_otci_srp(leader, leader) @@ -355,7 +387,10 @@ class TestOTCI(unittest.TestCase): 'server': (server.get_ipaddr_rloc(), 53), 'response_timeout': 10000, 'max_tx_attempts': 4, - 'recursion_desired': False + 'recursion_desired': False, + 'service_mode': 'srv_txt_opt', + 'transport_protocol': 'udp', + 'nat64_mode': True }, client.dns_get_config()) self.assertTrue(client.dns_get_compression()) @@ -379,6 +414,11 @@ class TestOTCI(unittest.TestCase): server.srp_server_set_domain('default.service.arpa.') self.assertEqual('default.service.arpa.', server.srp_server_get_domain()) + server.srp_server_set_sequence_number(0x55) + self.assertEqual(0x55, server.srp_server_get_sequence_number()) + server.srp_server_set_addressmode('unicast') + self.assertEqual('unicast', server.srp_server_get_addressmode()) + default_leases = server.srp_server_get_lease() self.assertEqual(default_leases, (30, 97200, 30, 680400)) server.srp_server_set_lease(1801, 7201, 86401, 1209601) @@ -508,6 +548,7 @@ class TestOTCI(unittest.TestCase): self.assertEqual('Removed', client.srp_client_get_host()['state']) self.assertEqual([], server.srp_server_get_hosts()) self.assertEqual([], server.srp_server_get_services()) + client.srp_client_clear_host() def _test_otci_example(self, node1: OTCI, node2: OTCI): node1.dataset_init_buffer() @@ -601,6 +642,8 @@ class TestOTCI(unittest.TestCase): commissioner.wait(5) self.assertEqual('active', commissioner.get_commissioner_state()) + logging.info('commissioner.commissioner_get_session_id() = %d', commissioner.get_commissioner_session_id()) + logging.info('commissioner.get_network_id_timeout() = %d', commissioner.get_network_id_timeout()) commissioner.set_network_id_timeout(60) self.assertEqual(60, commissioner.get_network_id_timeout()) @@ -711,7 +754,27 @@ class TestOTCI(unittest.TestCase): rtt: Dict[str, float] = cast(Dict[str, float], statistics['round_trip_time']) self.assertTrue(rtt['min'] - 1e-9 <= rtt['avg'] <= rtt['max'] + 1e-9) + ed_report = commissioner.commissioner_energy_scan(3 << commissioner.get_channel(), 4, 32, 1000, + child1.get_ipaddr_rloc()) + comm_chan = commissioner.get_channel() + self.assertEqual({comm_chan: [-30, -30, -30, -30], comm_chan + 1: [-30, -30, -30, -30]}, ed_report) + + commissioner.commissioner_announce(TEST_CHANNEL_MASK, 1, 32, child1.get_ipaddr_rloc()) + + conflicts = commissioner.commissioner_panid_query(TEST_PANID, TEST_CHANNEL_MASK, child1.get_ipaddr_rloc()) + self.assertEqual([22], conflicts) + + parent = child1.get_parent() + self.assertEqual(parent['extaddr'], commissioner.get_extaddr()) + self.assertEqual(parent['rloc16'], commissioner.get_rloc16()) + + diags = commissioner.get_network_diagnostics(child1.get_ipaddr_rloc(), [0, 1]) + self.assertEqual({'Ext Address': f'{child1.get_extaddr()}', 'Rloc16': str(child1.get_rloc16())}, diags) + diags = commissioner.get_network_diagnostics_bytes(child2.get_ipaddr_rloc(), [0, 1]) + self.assertEqual('0008' + child2.get_extaddr() + '0102' + f'{child2.get_rloc16():04x}', diags) + # Shutdown + commissioner.commissioner_stop() leader.thread_stop() logging.info("node state: %s", leader.get_state()) leader.ifconfig_down()