[otci] fix parsing of dns_resolve4 and csl get/set methods (#11786)

fix parsing of

* dns_resolve4 returns a synthesized ipv6 address, but parsing expected an ipv4 address

fix csl methods

* add support for getting csl uncertainty/accuracy
* add support for setting the csl channel
* remove the non existing get_csl_period

renames variables/arguments that had a python builtin name
This commit is contained in:
Thomas
2025-08-12 20:14:05 +02:00
committed by GitHub
parent 3ec7f23c5d
commit 09214764ec
2 changed files with 111 additions and 75 deletions
+110 -74
View File
@@ -26,6 +26,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
"""Python wrapper around the OpenThread CLI"""
import functools
import ipaddress
import logging
@@ -38,8 +40,8 @@ from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshComma
from .command_handlers import OtbrAdbUsbCommandRunner
from .connectors import Simulator
from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError
from .types import ChildId, Rloc16, Ip4Addr, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix
from .types import RouterTableEntry, NetifIdentifier
from .types import ChildId, Rloc16, Ip4Addr, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy
from .types import Ip6Prefix, RouterTableEntry, NetifIdentifier
from .utils import match_line, constant_property, bits_set
@@ -66,8 +68,8 @@ class OTCI(object):
"""Gets the string representation of the OTCI instance."""
return repr(self.__otcmd)
def set_filter(self, filter: re.Pattern[str]):
self.__otcmd.set_filter(filter)
def set_filter(self, new_filter: re.Pattern[str]):
self.__otcmd.set_filter(new_filter)
def wait(self, duration: float, expect_line: Optional[Union[str, Pattern[str], Collection[str]]] = None):
"""Wait for a given duration.
@@ -464,11 +466,11 @@ class OTCI(object):
def set_panid(self, panid: int):
"""Get the IEEE 802.15.4 PAN ID value."""
self.execute_command('panid %d' % panid)
self.execute_command(f'panid {panid}')
def set_network_name(self, name: str):
"""Set network name."""
self.execute_command('networkname %s' % self.__escape_escapable(name))
self.execute_command(f'networkname {self.__escape_escapable(name)}')
def get_network_name(self):
"""Get network name."""
@@ -650,7 +652,7 @@ class OTCI(object):
k, v = line.split(': ')
data[OTCI.__LEADER_DATA_KEY_MAP[k]] = int(v)
except KeyError:
raise UnexpectedCommandOutput(output)
raise UnexpectedCommandOutput(output) from None
return data
@@ -746,10 +748,10 @@ class OTCI(object):
def col(col_name: str):
return self.__get_table_col(col_name, headers, fields)
id = col('ID')
rid = col('ID')
table[RouterId(id)] = router = RouterTableEntry({
'id': RouterId(id),
table[RouterId(rid)] = router = RouterTableEntry({
'id': RouterId(rid),
'rloc16': Rloc16(col('RLOC16'), 16),
'next_hop': int(col('Next Hop')),
'path_cost': int(col('Path Cost')),
@@ -763,12 +765,12 @@ class OTCI(object):
router['link'] = int(col('Link'))
else:
# support older version of OT which does not output `Link` field
router['link'] = self.get_router_info(RouterId(id), silent=True)['link']
router['link'] = self.get_router_info(RouterId(rid), silent=True)['link']
return table
def get_router_info(self, id: int, silent: bool = False) -> RouterTableEntry:
cmd = f'router {id}'
def get_router_info(self, rid: int, silent: bool = False) -> RouterTableEntry:
cmd = f'router {rid}'
output = self.execute_command(cmd, silent=silent)
items = [line.strip().split(': ') for line in output]
@@ -779,7 +781,7 @@ class OTCI(object):
return self.__get_table_col(col_name, headers, fields)
return RouterTableEntry({
'id': RouterId(id),
'id': RouterId(rid),
'rloc16': Rloc16(col('Rloc'), 16),
'alloc': int(col('Alloc')),
'next_hop': int(col('Next Hop'), 16) >> 10, # convert RLOC16 to Router ID
@@ -818,7 +820,7 @@ class OTCI(object):
def col(col_name: str):
return self.__get_table_col(col_name, headers, fields)
id = int(col("ID"))
cid = int(col("ID"))
r, d, n = int(col("R")), int(col("D")), int(col("N"))
#
@@ -832,7 +834,7 @@ class OTCI(object):
f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}{"-" if r == d == n == 0 else ""}')
child: Dict[str, Union[ChildId, Rloc16, int, str]] = {
'id': ChildId(id),
'id': ChildId(cid),
'rloc16': Rloc16(col('RLOC16'), 16),
'timeout': int(col('Timeout')),
'age': int(col('Age')),
@@ -854,7 +856,7 @@ class OTCI(object):
if 'Suprvsn' in headers:
child['suprvsn'] = int(col('Suprvsn'))
table[ChildId(id)] = child
table[ChildId(cid)] = child
return table
@@ -895,11 +897,11 @@ class OTCI(object):
elif k == 'MaxTxAttempts':
config['max_tx_attempts'] = int(v)
elif k == 'RecursionDesired':
config['recursion_desired'] = (v == 'yes')
config['recursion_desired'] = v == 'yes'
elif k == 'ServiceMode':
config['service_mode'] = v
elif k == 'Nat64Mode':
config['nat64_mode'] = (v == 'allow')
config['nat64_mode'] = v == 'allow'
elif k == 'TransportProtocol':
config['transport_protocol'] = v
else:
@@ -908,14 +910,14 @@ class OTCI(object):
return config
@staticmethod
def __add_with_default(l: list[str], var: Optional[Any], default: str = '0'):
def __add_with_default(li: list[str], var: Optional[Any], default: str = '0'):
if var is not None:
if isinstance(var, bool):
l += ['1' if var else '0']
li += ['1' if var else '0']
else:
l += [f'{var}']
elif bool(l):
l += [default]
li += [f'{var}']
elif bool(li):
li += [default]
def dns_set_config(self,
server: Union[Tuple[Union[str, ipaddress.IPv6Address], int], Tuple[()]],
@@ -1070,19 +1072,26 @@ class OTCI(object):
def dns_resolve4(self,
hostname: str,
ip_address: str = '',
ignore_result: bool = False) -> List[Dict[str, Union[Ip4Addr, int]]]:
ignore_result: bool = False) -> List[Dict[str, Union[Ip4Addr, Ip6Addr, int]]]:
"""Resolve a DNS host name."""
cmd = 'dns resolve4 ' + ' '.join([x for x in [hostname, ip_address] if x])
output = self.execute_command(cmd, 30.0, ignore_result=ignore_result)
dns_resp = output[0]
addrs = dns_resp.strip().split(' - ')[1].split(' ')
ips = [Ip4Addr(item.strip()) for item in addrs[::2]]
ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
try:
addrs = dns_resp.strip().split(' - ')[1].split(' ')
# OpenThread synthesizes an IPv6 address from the IPv4 answer
ips = [Ip6Addr(item.strip()) for item in addrs[::2]]
ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
return [{
'address': ip,
'ttl': ttl,
} for ip, ttl in zip(ips, ttls)]
return [{
'ipv6': ip,
'ipv4': Ip4Addr(int("".join(ip.exploded.rsplit(':', 2)[1:]), 16)),
'ttl': ttl,
} for ip, ttl in zip(ips, ttls)]
except Exception:
if ignore_result:
return []
raise UnexpectedCommandOutput(output) from None
#
# SRP server & client utilities
@@ -1102,15 +1111,15 @@ class OTCI(object):
def srp_server_get_addressmode(self):
"""Get the SRP server address mode."""
return self.__parse_str(self.execute_command(f'srp server addrmode'))
return self.__parse_str(self.execute_command('srp server addrmode'))
def srp_server_set_addressmode(self, mode: Literal['unicast', 'anycast']):
"""Set the SRP server address mode."""
self.execute_command(f'srp server addrmode {mode}')
def srp_server_get_sequence_number(self) -> int:
"""Set SRP server sequence number."""
return self.__parse_int(self.execute_command(f'srp server seqnum'))
"""Get SRP server sequence number."""
return self.__parse_int(self.execute_command('srp server seqnum'))
def srp_server_set_sequence_number(self, seq: int):
"""Set SRP server sequence number."""
@@ -1147,7 +1156,7 @@ class OTCI(object):
if v not in ('true', 'false'):
raise UnexpectedCommandOutput(output)
info['deleted'] = (v == 'true')
info['deleted'] = v == 'true'
elif k == 'addresses':
if not v.startswith('[') or not v.endswith(']'):
@@ -1174,7 +1183,7 @@ class OTCI(object):
if v not in ('true', 'false'):
raise UnexpectedCommandOutput(output)
info['deleted'] = (v == 'true')
info['deleted'] = v == 'true'
elif k == 'addresses':
if not v.startswith('[') or not v.endswith(']'):
@@ -1572,15 +1581,21 @@ class OTCI(object):
Only for Reference Device."""
self.execute_command(f'pollperiod {poll_period}')
# TODO: csl
def get_csl_period(self) -> int:
"""Get the CSL period
def get_csl_accuracy(self) -> int:
"""Get the CSL accuracy in ppm
Returns:
int: csl period [us] (multiple of 160us)
int: csl accuracy [ppm]
"""
return self.__parse_int(self.execute_command("csl period"))
return self.__parse_int(self.execute_command("csl accuracy"))
def get_csl_uncertainty(self) -> int:
"""Get the CSL uncertainty
Returns:
int: CSL uncertainty [10us]
"""
return self.__parse_int(self.execute_command("csl uncertainty"))
def set_csl_period(self, period: int):
"""Set the CSL timeout
@@ -1590,6 +1605,14 @@ class OTCI(object):
"""
self.execute_command(f"csl period {period}")
def set_csl_channel(self, channel: int):
"""Set the CSL channel
Args:
channel (int): channel on which CSL will operate
"""
self.execute_command(f"csl channel {channel}")
def set_csl_timeout(self, timeout: int):
"""Set the CSL timeout
@@ -1844,7 +1867,7 @@ class OTCI(object):
try:
return Ip6Prefix(str(ipaddress.IPv6Network(output[0])))
except ValueError:
raise UnexpectedCommandOutput(output)
raise UnexpectedCommandOutput(output) from None
def __parse_prefixes(self, output: List[str]) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
prefixes: List[Tuple[Ip6Prefix, str, str, Rloc16]] = []
@@ -2369,15 +2392,16 @@ class OTCI(object):
# Border Router utilities
#
def get_br_omr_prefix(self,
type: Optional[Literal["local", "favored"]] = None) -> Dict[str, Tuple[Ip6Prefix, str]]:
prefix_type: Optional[Literal["local",
"favored"]] = None) -> Dict[str, Tuple[Ip6Prefix, str]]:
"""Get the Border Router On-Mesh Prefix."""
prefixes: Dict[str, Tuple[Ip6Prefix, str]] = {}
types = ('local', 'favored')
cmd = 'br omrprefix'
if type is not None:
if type not in types:
raise InvalidArgumentsError(f"Unknown type: {type}")
cmd += f' {type}'
if prefix_type is not None:
if prefix_type not in types:
raise InvalidArgumentsError(f"Unknown type: {prefix_type}")
cmd += f' {prefix_type}'
output = self.execute_command(cmd)
@@ -2562,7 +2586,7 @@ class OTCI(object):
def set_domain_name(self, name: str):
"""Set the Thread Domain Name for Thread 1.2 device."""
self.execute_command('domainname %s' % self.__escape_escapable(name))
self.execute_command(f'domainname {self.__escape_escapable(name)}')
def get_dua_iid(self) -> str:
"""Get the DUA IID for Thread 1.2 device."""
@@ -2821,7 +2845,7 @@ class OTCI(object):
port: Optional[int] = None,
text: Optional[str] = None,
random_bytes: Optional[int] = None,
hex: Optional[str] = None,
hex_str: Optional[str] = None,
return_result: bool = True):
"""Send a few bytes over UDP.
@@ -2835,7 +2859,7 @@ class OTCI(object):
if (ip is None) != (port is None):
raise InvalidArgumentsError("Please specify both `ip` and `port`.")
if (text is not None) + (random_bytes is not None) + (hex is not None) != 1:
if (text is not None) + (random_bytes is not None) + (hex_str is not None) != 1:
raise InvalidArgumentsError("Please specify `text` or `random_bytes` or `hex`.")
cmd = 'udp send'
@@ -2847,9 +2871,9 @@ class OTCI(object):
cmd += f' -t {text}'
elif random_bytes is not None:
cmd += f' -s {random_bytes}'
elif hex is not None:
self.__validate_hex(hex)
cmd += f' -x {hex}'
elif hex_str is not None:
self.__validate_hex(hex_str)
cmd += f' -x {hex_str}'
self.execute_command(cmd, ignore_result=not return_result)
@@ -2899,28 +2923,40 @@ class OTCI(object):
"""Stops the application coap service."""
self.execute_command('coap stop')
def coap_get(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con"):
cmd = f'coap get {addr} {uri_path} {type}'
def coap_get(self, addr: Union[str, Ip6Addr], uri_path: str, coap_type: str = "con"):
cmd = f'coap get {addr} {uri_path} {coap_type}'
self.execute_command(cmd)
def coap_put(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
cmd = f'coap put {addr} {uri_path} {type}'
def coap_put(self,
addr: Union[str, Ip6Addr],
uri_path: str,
coap_type: str = "con",
payload: Optional[str] = None):
cmd = f'coap put {addr} {uri_path} {coap_type}'
if payload is not None:
cmd += f' {payload}'
self.execute_command(cmd)
def coap_post(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
cmd = f'coap post {addr} {uri_path} {type}'
def coap_post(self,
addr: Union[str, Ip6Addr],
uri_path: str,
coap_type: str = "con",
payload: Optional[str] = None):
cmd = f'coap post {addr} {uri_path} {coap_type}'
if payload is not None:
cmd += f' {payload}'
self.execute_command(cmd)
def coap_delete(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
cmd = f'coap delete {addr} {uri_path} {type}'
def coap_delete(self,
addr: Union[str, Ip6Addr],
uri_path: str,
coap_type: str = "con",
payload: Optional[str] = None):
cmd = f'coap delete {addr} {uri_path} {coap_type}'
if payload is not None:
cmd += f' {payload}'
@@ -2995,7 +3031,7 @@ class OTCI(object):
is_security_processed: Optional[bool] = None,
is_header_updated: Optional[bool] = None):
"""Set the frame (hex encoded) to be used by `diag send` and `diag repeat`."""
command = f'diag frame '
command = 'diag frame '
command += self.__get_optional_int_argument('-b', max_csma_backoffs)
command += self.__get_optional_int_argument('-d', tx_delay)
command += self.__get_optional_int_argument('-C', rx_channel_after_tx_done)
@@ -3186,7 +3222,7 @@ class OTCI(object):
# Done
#
result: List[Dict[str, Union[int, bytes]]] = []
output = self.execute_command(f'diag powersettings')
output = self.execute_command('diag powersettings')
if len(output) < 3:
raise UnexpectedCommandOutput(output)
@@ -3340,9 +3376,9 @@ class OTCI(object):
# below that is the human readable/parsed format
return output[0].split(': ')[1]
def reset_network_diagnostics(self, addr: Union[str, Ip6Addr], type: list[int]):
def reset_network_diagnostics(self, addr: Union[str, Ip6Addr], diag_type: list[int]):
"""Reset the network diagnostic information."""
self.execute_command(f'networkdiagnostic reset {addr} {" ".join(map(str, type))}')
self.execute_command(f'networkdiagnostic reset {addr} {" ".join(map(str, diag_type))}')
def get_network_diagnostics_non_preferred_channels(self) -> int:
"""Get the non-preferred channels in the network diagnostics."""
@@ -3378,7 +3414,7 @@ class OTCI(object):
else:
data[OTCI.__PARENT_KEY_MAP[k]] = int(v, base=0)
except KeyError:
raise UnexpectedCommandOutput(output)
raise UnexpectedCommandOutput(output) from None
return data
@@ -3557,7 +3593,7 @@ class OTCI(object):
try:
self.__validate_network_key(networkkey)
except ValueError:
raise UnexpectedCommandOutput(output)
raise UnexpectedCommandOutput(output) from None
return networkkey
@@ -3573,7 +3609,7 @@ class OTCI(object):
try:
self.__validate_hex64b(extaddr)
except ValueError:
raise UnexpectedCommandOutput(output)
raise UnexpectedCommandOutput(output) from None
return extaddr
@@ -3612,7 +3648,7 @@ class OTCI(object):
def __validate_hex_or_bytes(self, data: Union[str, bytes]) -> str:
if isinstance(data, bytes):
return ''.join('%02x' % c for c in data)
return ''.join(f'{c:02x}' for c in data)
elif isinstance(data, str):
self.__validate_hex(data)
return data
@@ -3624,14 +3660,14 @@ class OTCI(object):
return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
def __bytes_to_hex(self, data: bytes) -> str:
return ''.join('%02x' % b for b in data)
return ''.join(f'{b:02x}' for b in data)
def __escape_escapable(self, s: str) -> str:
"""Escape CLI escapable characters in the given string.
"""
escapable_chars = '\\ \t\r\n'
for char in escapable_chars:
s = s.replace(char, '\\%s' % char)
s = s.replace(char, f'\\{char}')
return s
def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str:
@@ -3652,7 +3688,7 @@ class OTCI(object):
txt_bin += bytes([len(entry)])
txt_bin += entry
return ''.join('%02x' % b for b in txt_bin)
return ''.join(f'{b:02x}' for b in txt_bin)
def __get_optional_int_argument(self, arg_name: str, arg_value: Optional[int] = None):
return arg_name + f' {arg_value} ' if arg_value is not None else ''
+1 -1
View File
@@ -334,7 +334,7 @@ class TestOTCI(unittest.TestCase):
leader.udp_bind("::", 1234, netif=netif)
leader.udp_send(leader.get_ipaddr_rloc(), 1234, text='hello')
leader.udp_send(leader.get_ipaddr_rloc(), 1234, random_bytes=3)
leader.udp_send(leader.get_ipaddr_rloc(), 1234, hex='112233')
leader.udp_send(leader.get_ipaddr_rloc(), 1234, hex_str='112233')
leader.wait(1)
leader.udp_close()