mirror of
https://github.com/espressif/openthread.git
synced 2026-06-05 21:14:49 +00:00
[otci] add diag radio related commands support (#11112)
This commit is contained in:
+84
-2
@@ -2550,9 +2550,29 @@ class OTCI(object):
|
||||
"""Stop transmitting continuous carrier wave."""
|
||||
self.execute_command('diag cw stop')
|
||||
|
||||
def diag_frame(self, frame: str):
|
||||
def diag_frame(self,
|
||||
frame: str,
|
||||
max_csma_backoffs: Optional[int] = None,
|
||||
csma_ca_enabled: Optional[bool] = None,
|
||||
rx_channel_after_tx_done: Optional[int] = None,
|
||||
tx_delay: Optional[int] = None,
|
||||
tx_power: Optional[int] = None,
|
||||
max_frame_retries: Optional[int] = None,
|
||||
is_security_processed: Optional[bool] = None,
|
||||
is_header_updated: Optional[bool] = None):
|
||||
"""Set the frame (hex encoded) to be used by `diag send` and `diag repeat`."""
|
||||
self.execute_command(f'diag frame {frame}')
|
||||
command = f'diag frame '
|
||||
command += self.__get_optional_int_argument('-b', max_csma_backoffs)
|
||||
command += self.__get_optional_int_argument('-d', tx_delay)
|
||||
command += self.__get_optional_int_argument('-C', rx_channel_after_tx_done)
|
||||
command += self.__get_optional_int_argument('-p', tx_power)
|
||||
command += self.__get_optional_int_argument('-r', max_frame_retries)
|
||||
command += self.__get_optional_bool_argument('-c', csma_ca_enabled)
|
||||
command += self.__get_optional_bool_argument('-s', is_security_processed)
|
||||
command += self.__get_optional_bool_argument('-u', is_header_updated)
|
||||
command += f'{frame}'
|
||||
|
||||
self.execute_command(command)
|
||||
|
||||
def diag_stream_start(self):
|
||||
"""Start transmitting a stream of characters."""
|
||||
@@ -2586,10 +2606,66 @@ class OTCI(object):
|
||||
"""Enter radio sleep mode."""
|
||||
self.execute_command('diag radio sleep')
|
||||
|
||||
def diag_radio_enable(self):
|
||||
"""Enable the radio."""
|
||||
self.execute_command('diag radio enable')
|
||||
|
||||
def diag_radio_disable(self):
|
||||
"""Disable the radio."""
|
||||
self.execute_command('diag radio disable')
|
||||
|
||||
def diag_radio_receive(self):
|
||||
"""Set radio to receive mode."""
|
||||
self.execute_command('diag radio receive')
|
||||
|
||||
def diag_radio_receive_number(self, number: int):
|
||||
"""Set radio to receive mode and receive specified number of packets."""
|
||||
#
|
||||
# The `diag radio receive <number> [lpr]` command example:
|
||||
#
|
||||
# > diag radio receive 5 lpr
|
||||
# 0, rssi:-49, lqi:119, len:10, psdu:000102030405060771e
|
||||
# 1, rssi:-51, lqi:112, len:10, psdu:000102030405060771e
|
||||
# 2, rssi:-42, lqi:120, len:10, psdu:000102030405060771e
|
||||
# 3, rssi:-54, lqi:111, len:10, psdu:000102030405060771e
|
||||
# 4, rssi:-56, lqi:108, len:10, psdu:000102030405060771e
|
||||
# Done
|
||||
#
|
||||
|
||||
output = self.execute_command(f'diag radio receive {number} lpr')
|
||||
|
||||
if len(output) != number:
|
||||
raise UnexpectedCommandOutput(output)
|
||||
|
||||
result = []
|
||||
|
||||
for line in output:
|
||||
data = line.split(',')
|
||||
|
||||
if len(data) != 5:
|
||||
raise UnexpectedCommandOutput(data)
|
||||
|
||||
result.append({
|
||||
'rssi': int(data[1].split(":")[1]),
|
||||
'lqi': int(data[2].split(":")[1]),
|
||||
'len': int(data[3].split(":")[1]),
|
||||
'psdu': data[4].split(":")[1],
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
def diag_enable_radio_receive_filter(self):
|
||||
"""Enable the radio receive filter."""
|
||||
self.execute_command('diag radio receive filter enable')
|
||||
|
||||
def diag_disable_radio_receive_filter(self):
|
||||
"""Disable the radio receive filter."""
|
||||
self.execute_command('diag radio receive filter disable')
|
||||
|
||||
def diag_set_radio_receive_filter_dest_mac_address(self, dest_mac_address: str):
|
||||
"""Set the destination mac address of the radio receive filter."""
|
||||
self.execute_command(f'diag radio receive filter {dest_mac_address}')
|
||||
|
||||
def diag_get_radio_state(self) -> str:
|
||||
"""Get the state of the radio."""
|
||||
return self.__parse_str(self.execute_command('diag radio state'))
|
||||
@@ -3036,6 +3112,12 @@ class OTCI(object):
|
||||
|
||||
return ''.join('%02x' % b for b in txt_bin)
|
||||
|
||||
def __get_optional_int_argument(self, arg_name: str, arg_value: Optional[int] = None):
|
||||
return arg_name + f' {arg_value} ' if arg_value is not None else ''
|
||||
|
||||
def __get_optional_bool_argument(self, arg_name: str, arg_value: Optional[bool] = None):
|
||||
return arg_name + ' ' if arg_value is not None and arg_value else ''
|
||||
|
||||
|
||||
def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
|
||||
cli_handler = connectors.OtCliSim(executable, nodeid, simulator=simulator)
|
||||
|
||||
Reference in New Issue
Block a user