mirror of
https://github.com/espressif/openthread.git
synced 2026-06-29 17:00:23 +00:00
580 lines
19 KiB
Python
580 lines
19 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright (c) 2016, The OpenThread Authors.
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
# 1. Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
# 2. Redistributions in binary form must reproduce the above copyright
|
|
# notice, this list of conditions and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
# 3. Neither the name of the copyright holder nor the
|
|
# names of its contributors may be used to endorse or promote products
|
|
# derived from this software without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
#
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import pexpect
|
|
|
|
class otCli:
|
|
def __init__(self, nodeid):
|
|
self.nodeid = nodeid
|
|
self.verbose = int(float(os.getenv('VERBOSE', 0)))
|
|
self.node_type = os.getenv('NODE_TYPE', 'sim')
|
|
|
|
if self.node_type == 'soc':
|
|
self.__init_soc(nodeid)
|
|
elif self.node_type == 'ncp-sim':
|
|
self.__init_ncp_sim(nodeid)
|
|
else:
|
|
self.__init_sim(nodeid)
|
|
|
|
if self.verbose:
|
|
self.pexpect.logfile_read = sys.stdout
|
|
|
|
def __init_sim(self, nodeid):
|
|
""" Initialize a simulation node. """
|
|
if "OT_CLI_PATH" in os.environ.keys():
|
|
cmd = os.environ['OT_CLI_PATH']
|
|
elif "top_builddir" in os.environ.keys():
|
|
srcdir = os.environ['top_builddir']
|
|
cmd = '%s/examples/apps/cli/ot-cli-ftd' % srcdir
|
|
else:
|
|
cmd = './ot-cli-ftd'
|
|
cmd += ' %d' % nodeid
|
|
print ("%s" % cmd)
|
|
|
|
self.pexpect = pexpect.spawn(cmd, timeout=4)
|
|
|
|
# Add delay to ensure that the process is ready to receive commands.
|
|
time.sleep(0.2)
|
|
|
|
|
|
def __init_ncp_sim(self, nodeid):
|
|
""" Initialize an NCP simulation node. """
|
|
if "top_builddir" in os.environ.keys():
|
|
builddir = os.environ['top_builddir']
|
|
cmd = 'spinel-cli.py -p %s/examples/apps/ncp/ot-ncp-ftd -n' % (builddir)
|
|
else:
|
|
cmd = './ot-ncp-ftd'
|
|
cmd += ' %d' % nodeid
|
|
print ("%s" % cmd)
|
|
|
|
self.pexpect = pexpect.spawn(cmd, timeout=4)
|
|
time.sleep(0.2)
|
|
self.pexpect.expect('spinel-cli >')
|
|
self.debug(int(os.getenv('DEBUG', '0')))
|
|
|
|
def __init_soc(self, nodeid):
|
|
""" Initialize a System-on-a-chip node connected via UART. """
|
|
import fdpexpect
|
|
serialPort = '/dev/ttyUSB%d' % ((nodeid-1)*2)
|
|
self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR|os.O_NONBLOCK|os.O_NOCTTY))
|
|
|
|
def __del__(self):
|
|
if self.pexpect.isalive():
|
|
self.send_command('exit')
|
|
self.pexpect.expect(pexpect.EOF)
|
|
self.pexpect.terminate()
|
|
self.pexpect.close(force=True)
|
|
|
|
def send_command(self, cmd):
|
|
print ("%d: %s" % (self.nodeid, cmd))
|
|
self.pexpect.sendline(cmd)
|
|
|
|
def get_commands(self):
|
|
self.send_command('?')
|
|
self.pexpect.expect('Commands:')
|
|
commands = []
|
|
while True:
|
|
i = self.pexpect.expect(['Done', '(\S+)'])
|
|
if i != 0:
|
|
commands.append(self.pexpect.match.groups()[0])
|
|
else:
|
|
break
|
|
return commands
|
|
|
|
def set_mode(self, mode):
|
|
cmd = 'mode ' + mode
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def debug(self, level):
|
|
self.send_command('debug '+str(level))
|
|
|
|
def interface_up(self):
|
|
self.send_command('ifconfig up')
|
|
self.pexpect.expect('Done')
|
|
|
|
def interface_down(self):
|
|
self.send_command('ifconfig down')
|
|
self.pexpect.expect('Done')
|
|
|
|
def thread_start(self):
|
|
self.send_command('thread start')
|
|
self.pexpect.expect('Done')
|
|
|
|
def thread_stop(self):
|
|
self.send_command('thread stop')
|
|
self.pexpect.expect('Done')
|
|
|
|
def commissioner_start(self):
|
|
cmd = 'commissioner start'
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def commissioner_add_joiner(self, addr, psk):
|
|
cmd = 'commissioner joiner add ' + addr + ' ' + psk
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def joiner_start(self, pskd='', provisioning_url=''):
|
|
cmd = 'joiner start ' + pskd + ' ' + provisioning_url
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def clear_whitelist(self):
|
|
self.send_command('whitelist clear')
|
|
self.pexpect.expect('Done')
|
|
|
|
def enable_whitelist(self):
|
|
self.send_command('whitelist enable')
|
|
self.pexpect.expect('Done')
|
|
|
|
def disable_whitelist(self):
|
|
self.send_command('whitelist disable')
|
|
self.pexpect.expect('Done')
|
|
|
|
def add_whitelist(self, addr, rssi=None):
|
|
cmd = 'whitelist add ' + addr
|
|
if rssi != None:
|
|
cmd += ' ' + str(rssi)
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def remove_whitelist(self, addr):
|
|
cmd = 'whitelist remove ' + addr
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_addr16(self):
|
|
self.send_command('rloc16')
|
|
i = self.pexpect.expect('([0-9a-fA-F]{4})')
|
|
if i == 0:
|
|
addr16 = int(self.pexpect.match.groups()[0], 16)
|
|
self.pexpect.expect('Done')
|
|
return addr16
|
|
|
|
def get_addr64(self):
|
|
self.send_command('extaddr')
|
|
i = self.pexpect.expect('([0-9a-fA-F]{16})')
|
|
if i == 0:
|
|
addr64 = self.pexpect.match.groups()[0].decode("utf-8")
|
|
self.pexpect.expect('Done')
|
|
return addr64
|
|
|
|
def get_hashmacaddr(self):
|
|
self.send_command('hashmacaddr')
|
|
i = self.pexpect.expect('([0-9a-fA-F]{16})')
|
|
if i == 0:
|
|
addr = self.pexpect.match.groups()[0].decode("utf-8")
|
|
self.pexpect.expect('Done')
|
|
return addr
|
|
|
|
def get_channel(self):
|
|
self.send_command('channel')
|
|
i = self.pexpect.expect('(\d+)\r\n')
|
|
if i == 0:
|
|
channel = int(self.pexpect.match.groups()[0])
|
|
self.pexpect.expect('Done')
|
|
return channel
|
|
|
|
def set_channel(self, channel):
|
|
cmd = 'channel %d' % channel
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_masterkey(self):
|
|
self.send_command('masterkey')
|
|
i = self.pexpect.expect('([0-9a-fA-F]{32})')
|
|
if i == 0:
|
|
masterkey = self.pexpect.match.groups()[0].decode("utf-8")
|
|
self.pexpect.expect('Done')
|
|
return masterkey
|
|
|
|
def set_masterkey(self, masterkey):
|
|
cmd = 'masterkey ' + masterkey
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_key_sequence_counter(self):
|
|
self.send_command('keysequence counter')
|
|
i = self.pexpect.expect('(\d+)\r\n')
|
|
if i == 0:
|
|
key_sequence_counter = int(self.pexpect.match.groups()[0])
|
|
self.pexpect.expect('Done')
|
|
return key_sequence_counter
|
|
|
|
def set_key_sequence_counter(self, key_sequence_counter):
|
|
cmd = 'keysequence counter %d' % key_sequence_counter
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def set_key_switch_guardtime(self, key_switch_guardtime):
|
|
cmd = 'keysequence guardtime %d' % key_switch_guardtime
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def set_network_id_timeout(self, network_id_timeout):
|
|
cmd = 'networkidtimeout %d' % network_id_timeout
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_network_name(self):
|
|
self.send_command('networkname')
|
|
while True:
|
|
i = self.pexpect.expect(['Done', '(\S+)'])
|
|
if i != 0:
|
|
network_name = self.pexpect.match.groups()[0].decode('utf-8')
|
|
else:
|
|
break
|
|
return network_name
|
|
|
|
def set_network_name(self, network_name):
|
|
cmd = 'networkname ' + network_name
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_panid(self):
|
|
self.send_command('panid')
|
|
i = self.pexpect.expect('([0-9a-fA-F]{4})')
|
|
if i == 0:
|
|
panid = int(self.pexpect.match.groups()[0], 16)
|
|
self.pexpect.expect('Done')
|
|
return panid
|
|
|
|
def set_panid(self, panid):
|
|
cmd = 'panid %d' % panid
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_partition_id(self):
|
|
self.send_command('leaderpartitionid')
|
|
i = self.pexpect.expect('(\d+)\r\n')
|
|
if i == 0:
|
|
weight = self.pexpect.match.groups()[0]
|
|
self.pexpect.expect('Done')
|
|
return weight
|
|
|
|
def set_partition_id(self, partition_id):
|
|
cmd = 'leaderpartitionid %d' % partition_id
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def set_router_upgrade_threshold(self, threshold):
|
|
cmd = 'routerupgradethreshold %d' % threshold
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def set_router_downgrade_threshold(self, threshold):
|
|
cmd = 'routerdowngradethreshold %d' % threshold
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def release_router_id(self, router_id):
|
|
cmd = 'releaserouterid %d' % router_id
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_state(self):
|
|
states = ['detached', 'child', 'router', 'leader']
|
|
self.send_command('state')
|
|
match = self.pexpect.expect(states)
|
|
self.pexpect.expect('Done')
|
|
return states[match]
|
|
|
|
def set_state(self, state):
|
|
cmd = 'state ' + state
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_timeout(self):
|
|
self.send_command('childtimeout')
|
|
i = self.pexpect.expect('(\d+)\r\n')
|
|
if i == 0:
|
|
timeout = self.pexpect.match.groups()[0]
|
|
self.pexpect.expect('Done')
|
|
return timeout
|
|
|
|
def set_timeout(self, timeout):
|
|
cmd = 'childtimeout %d' % timeout
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def set_max_children(self, number):
|
|
cmd = 'childmax %d' % number
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_weight(self):
|
|
self.send_command('leaderweight')
|
|
i = self.pexpect.expect('(\d+)\r\n')
|
|
if i == 0:
|
|
weight = self.pexpect.match.groups()[0]
|
|
self.pexpect.expect('Done')
|
|
return weight
|
|
|
|
def set_weight(self, weight):
|
|
cmd = 'leaderweight %d' % weight
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def add_ipaddr(self, ipaddr):
|
|
cmd = 'ipaddr add ' + ipaddr
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def get_addrs(self):
|
|
addrs = []
|
|
self.send_command('ipaddr')
|
|
|
|
while True:
|
|
i = self.pexpect.expect(['(\S+:\S+)\r\n', 'Done'])
|
|
if i == 0:
|
|
addrs.append(self.pexpect.match.groups()[0].decode("utf-8"))
|
|
elif i == 1:
|
|
break
|
|
|
|
return addrs
|
|
|
|
def get_context_reuse_delay(self):
|
|
self.send_command('contextreusedelay')
|
|
i = self.pexpect.expect('(\d+)\r\n')
|
|
if i == 0:
|
|
timeout = self.pexpect.match.groups()[0]
|
|
self.pexpect.expect('Done')
|
|
return timeout
|
|
|
|
def set_context_reuse_delay(self, delay):
|
|
cmd = 'contextreusedelay %d' % delay
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def add_prefix(self, prefix, flags, prf = 'med'):
|
|
cmd = 'prefix add ' + prefix + ' ' + flags + ' ' + prf
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def remove_prefix(self, prefix):
|
|
cmd = 'prefix remove ' + prefix
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def add_route(self, prefix, prf = 'med'):
|
|
cmd = 'route add ' + prefix + ' ' + prf
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def remove_route(self, prefix):
|
|
cmd = 'route remove ' + prefix
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def register_netdata(self):
|
|
self.send_command('netdataregister')
|
|
self.pexpect.expect('Done')
|
|
|
|
def energy_scan(self, mask, count, period, scan_duration, ipaddr):
|
|
cmd = 'commissioner energy ' + str(mask) + ' ' + str(count) + ' ' + str(period) + ' ' + str(scan_duration) + ' ' + ipaddr
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Energy:', timeout=8)
|
|
|
|
def panid_query(self, panid, mask, ipaddr):
|
|
cmd = 'commissioner panid ' + str(panid) + ' ' + str(mask) + ' ' + ipaddr
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Conflict:', timeout=8)
|
|
|
|
def scan(self):
|
|
self.send_command('scan')
|
|
|
|
results = []
|
|
while True:
|
|
i = self.pexpect.expect(['\|\s(\S+)\s+\|\s(\S+)\s+\|\s([0-9a-fA-F]{4})\s\|\s([0-9a-fA-F]{16})\s\|\s(\d+)\r\n',
|
|
'Done'])
|
|
if i == 0:
|
|
results.append(self.pexpect.match.groups())
|
|
else:
|
|
break
|
|
|
|
return results
|
|
|
|
def ping(self, ipaddr, num_responses=1, size=None, timeout=5000):
|
|
cmd = 'ping ' + ipaddr
|
|
if size != None:
|
|
cmd += ' ' + str(size)
|
|
|
|
self.send_command(cmd)
|
|
|
|
result = True
|
|
try:
|
|
responders = {}
|
|
while len(responders) < num_responses:
|
|
i = self.pexpect.expect(['from (\S+):'])
|
|
if i == 0:
|
|
responders[self.pexpect.match.groups()[0]] = 1
|
|
self.pexpect.expect('\n')
|
|
except pexpect.TIMEOUT:
|
|
result = False
|
|
|
|
return result
|
|
|
|
def reset(self):
|
|
self.send_command('reset')
|
|
|
|
def set_router_selection_jitter(self, jitter):
|
|
cmd = 'routerselectionjitter %d' % jitter
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def set_active_dataset(self, timestamp, panid=None, channel=None, channel_mask=None, master_key=None):
|
|
self.send_command('dataset clear')
|
|
self.pexpect.expect('Done')
|
|
|
|
cmd = 'dataset activetimestamp %d' % timestamp
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
if panid != None:
|
|
cmd = 'dataset panid %d' % panid
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
if channel != None:
|
|
cmd = 'dataset channel %d' % channel
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
if channel_mask != None:
|
|
cmd = 'dataset channelmask %d' % channel_mask
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
if master_key != None:
|
|
cmd = 'dataset masterkey ' + master_key
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
self.send_command('dataset commit active')
|
|
self.pexpect.expect('Done')
|
|
|
|
def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None):
|
|
self.send_command('dataset clear')
|
|
self.pexpect.expect('Done')
|
|
|
|
cmd = 'dataset pendingtimestamp %d' % pendingtimestamp
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
cmd = 'dataset activetimestamp %d' % activetimestamp
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
if panid != None:
|
|
cmd = 'dataset panid %d' % panid
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
if channel != None:
|
|
cmd = 'dataset channel %d' % channel
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
self.send_command('dataset commit pending')
|
|
self.pexpect.expect('Done')
|
|
|
|
def announce_begin(self, mask, count, period, ipaddr):
|
|
cmd = 'commissioner announce ' + str(mask) + ' ' + str(count) + ' ' + str(period) + ' ' + ipaddr
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def send_mgmt_active_set(self, active_timestamp=None, channel=None, channel_mask=None, extended_panid=None,
|
|
panid=None, master_key=None, mesh_local=None, network_name=None, binary=None):
|
|
cmd = 'dataset mgmtsetcommand active '
|
|
|
|
if active_timestamp != None:
|
|
cmd += 'activetimestamp %d ' % active_timestamp
|
|
|
|
if channel != None:
|
|
cmd += 'channel %d ' % channel
|
|
|
|
if channel_mask != None:
|
|
cmd += 'channelmask %d ' % channel_mask
|
|
|
|
if extended_panid != None:
|
|
cmd += 'extpanid ' + extended_panid + ' '
|
|
|
|
if panid != None:
|
|
cmd += 'panid %d ' % panid
|
|
|
|
if master_key != None:
|
|
cmd += 'masterkey ' + master_key + ' '
|
|
|
|
if mesh_local != None:
|
|
cmd += 'localprefix ' + mesh_local + ' '
|
|
|
|
if network_name != None:
|
|
cmd += 'networkname ' + network_name + ' '
|
|
|
|
if binary != None:
|
|
cmd += 'binary ' + binary + ' '
|
|
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|
|
|
|
def send_mgmt_pending_set(self, pending_timestamp=None, active_timestamp=None, delay_timer=None, channel=None,
|
|
panid=None, master_key=None, mesh_local=None, network_name=None):
|
|
cmd = 'dataset mgmtsetcommand pending '
|
|
|
|
if pending_timestamp != None:
|
|
cmd += 'pendingtimestamp %d ' % pending_timestamp
|
|
|
|
if active_timestamp != None:
|
|
cmd += 'activetimestamp %d ' % active_timestamp
|
|
|
|
if delay_timer != None:
|
|
cmd += 'delaytimer %d ' % delay_timer
|
|
|
|
if channel != None:
|
|
cmd += 'channel %d ' % channel
|
|
|
|
if panid != None:
|
|
cmd += 'panid %d ' % panid
|
|
|
|
if master_key != None:
|
|
cmd += 'masterkey ' + master_key + ' '
|
|
|
|
if mesh_local != None:
|
|
cmd += 'localprefix ' + mesh_local + ' '
|
|
|
|
if network_name != None:
|
|
cmd += 'networkname ' + network_name + ' '
|
|
|
|
self.send_command(cmd)
|
|
self.pexpect.expect('Done')
|