[otci] add OpenThread Controller Interface library (#5779)

This commit introduces the Proof of Concept implementation of the
OpenThread Controller Interface (OTCI) which can be used to connect
and manage various kinds of OT devices.

This commit also adds the tests to verify that OTCI works for both
simulation and real OT devices.

- Device Connectors
  - CLI Virtual Time Simulation
  - CLI Real Time Simulation
  - CLI SOC Device
  - NCP Virtual Time Simulation (not all APIs work)
  - Real device via SSH
- OTCI tests of simulation devices are executed in Github Actions
- OTCI tests of real devices can be executed like below:

# Install otci python library
cd tests/scripts/thread-cert/otci && python3 setup.py install --user
# Test OT CLI SOC device at /dev/ttyACM0
REAL_DEVICE=1 OT_CLI_SERIAL=/dev/ttyACM0 python3 tests/test_otci.py
# Test OTBR device via SSH
REAL_DEVICE=1 OTBR_SSH=172.16.243.151 python3 tests/test_otci.py
This commit is contained in:
Simon Lin
2020-12-31 03:29:27 +08:00
committed by GitHub
parent 069e26b51e
commit 22b043bf62
14 changed files with 3327 additions and 0 deletions
+70
View File
@@ -0,0 +1,70 @@
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
name: OTCI
on: [push, pull_request]
jobs:
cancel-previous-runs:
runs-on: ubuntu-20.04
steps:
- uses: rokroskar/workflow-run-cleanup-action@master
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
if: "github.ref != 'refs/heads/master'"
cli-sim:
runs-on: ubuntu-20.04
strategy:
matrix:
virtual_time: [0, 1]
env:
REFERENCE_DEVICE: 1
VIRTUAL_TIME: ${{ matrix.virtual_time }}
REAL_DEVICE: 0
steps:
- uses: actions/checkout@v2
- name: Bootstrap
run: |
sudo rm /etc/apt/sources.list.d/* && sudo apt-get update
sudo apt-get --no-install-recommends install -y g++-multilib python3-setuptools python3-wheel
python3 -m pip install -r tests/scripts/thread-cert/requirements.txt
- name: Build
run: |
./bootstrap
make -f examples/Makefile-simulation THREAD_VERSION=1.2 DUA=1 MLR=1 BACKBONE_ROUTER=1 CSL_RECEIVER=1
- name: Install OTCI Python Library
run: |
(cd tools/otci && python3 setup.py install --user)
- name: Run
run: |
export PYTHONPATH=./tests/scripts/thread-cert/
export OT_CLI=./output/simulation/bin/ot-cli-ftd
python3 tools/otci/tests/test_otci.py
+2
View File
@@ -147,6 +147,8 @@ class VirtualTime(BaseSimulator):
RADIO_ONLY = os.getenv('RADIO_DEVICE') is not None
NCP_SIM = os.getenv('NODE_TYPE', 'sim') == 'ncp-sim'
_message_factory = None
def __init__(self, use_message_factory=True):
super(VirtualTime, self).__init__()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+25
View File
@@ -0,0 +1,25 @@
Copyright (c) 2020, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
+53
View File
@@ -0,0 +1,53 @@
# OpenThread Control Interface
The OpenThread Control Interface (OTCI) is a library which provides uniform python interfaces to connect and control various kinds of devices running OpenThread.
## Supported device types
- OpenThread CLI
- SOC device via Serial
- OpenThread NCP (limited support via [pyspinel](https://pypi.org/project/pyspinel/))
- SOC device via Serial
- [OpenThread Border Router](https://github.com/openthread/ot-br-posix)
- OTBR device via SSH
## Example
```python
import otci
# Connect to an OTBR device via SSH
node1 = otci.connect_otbr_ssh("192.168.1.101")
# Connect to an OpenThread CLI device via Serial
node2 = otci.connect_cli_serial("/dev/ttyACM0"))
# Start node1 to become Leader
node1.dataset_init_buffer()
node1.dataset_set_buffer(network_name='test', master_key='00112233445566778899aabbccddeeff', panid=0xface, channel=11)
node1.dataset_commit_buffer('active')
node1.ifconfig_up()
node1.thread_start()
node1.wait(5)
assert node1.get_state() == "leader"
# Start Commissioner on node1
node1.commissioner_start()
node1.wait(3)
node1.commissioner_add_joiner("TEST123",eui64='*')
# Start node2
node2.ifconfig_up()
node2.set_router_selection_jitter(1)
# Start Joiner on node2 to join the network
node2.joiner_start("TEST123")
node2.wait(10, expect_line="Join success")
# Wait for node 2 to become Router
node2.thread_start()
node2.wait(5)
assert node2.get_state() == "router"
```
+49
View File
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from . import errors
from .constants import THREAD_VERSION_1_1, THREAD_VERSION_1_2
from .otci import OTCI
from .otci import \
connect_cli_sim, \
connect_cli_serial, \
connect_ncp_sim, \
connect_cmd_handler, \
connect_otbr_ssh
from .types import Rloc16, ChildId
_connectors = [
'connect_cli_sim',
'connect_cli_serial',
'connect_ncp_sim',
'connect_otbr_ssh',
'connect_cmd_handler',
]
__all__ = ['OTCI', 'errors', 'Rloc16', 'ChildId', 'THREAD_VERSION_1_1', 'THREAD_VERSION_1_2'] + _connectors
+211
View File
@@ -0,0 +1,211 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import queue
import re
import threading
import time
from abc import abstractmethod
from typing import Union, List, Pattern
from .connectors import OtCliHandler
from .errors import ExpectLineTimeoutError, CommandError
from .utils import match_line
class OTCommandHandler:
"""This abstract class defines interfaces of a OT Command Handler."""
@abstractmethod
def execute_command(self, cmd: str, timeout: float) -> List[str]:
"""Method execute_command should execute the OT CLI command within a timeout (in seconds) and return the
command output as a list of lines.
Note: each line SHOULD NOT contain '\r\n' at the end. The last line of output should be 'Done' or
'Error <code>: <msg>' following OT CLI conventions.
"""
pass
@abstractmethod
def close(self):
"""Method close should close the OT Command Handler."""
pass
@abstractmethod
def wait(self, duration: float) -> List[str]:
"""Method wait should wait for a given duration and return the OT CLI output during this period.
Normally, OT CLI does not output when it's not executing any command. But OT CLI can also output
asynchronously in some cases (e.g. `Join Success` when Joiner joins successfully).
"""
pass
class OtCliCommandRunner(OTCommandHandler):
__PATTERN_COMMAND_DONE_OR_ERROR = re.compile(
r'(Done|Error|Error \d+:.*|.*: command not found)$') # "Error" for spinel-cli.py
__PATTERN_LOG_LINE = re.compile(r'((\[(NONE|CRIT|WARN|NOTE|INFO|DEBG)\])'
r'|(-.*-+: )' # e.g. -CLI-----:
r')')
"""regex used to filter logs"""
__ASYNC_COMMANDS = {
'scan',
}
def __init__(self, otcli: OtCliHandler, is_spinel_cli=False):
self.__otcli: OtCliHandler = otcli
self.__is_spinel_cli = is_spinel_cli
self.__expect_command_echoback = not self.__is_spinel_cli
self.__pending_lines = queue.Queue()
self.__should_close = threading.Event()
self.__otcli_reader = threading.Thread(target=self.__otcli_read_routine)
self.__otcli_reader.setDaemon(True)
self.__otcli_reader.start()
def __repr__(self):
return repr(self.__otcli)
def execute_command(self, cmd, timeout=10) -> None:
self.__otcli.writeline(cmd)
if cmd in {'reset', 'factoryreset'}:
return []
if self.__expect_command_echoback:
self.__expect_line(timeout, cmd)
output = self.__expect_line(timeout,
OtCliCommandRunner.__PATTERN_COMMAND_DONE_OR_ERROR,
asynchronous=cmd.split()[0] in OtCliCommandRunner.__ASYNC_COMMANDS)
return output
def wait(self, duration: float) -> List[str]:
self.__otcli.wait(duration)
output = []
try:
while True:
line = self.__pending_lines.get_nowait()
output.append(line)
except queue.Empty:
pass
return output
def close(self):
self.__should_close.set()
self.__otcli.close()
#
# Private methods
#
def __expect_line(self, timeout: float, expect_line: Union[str, Pattern], asynchronous=False) -> List[str]:
output = []
if not asynchronous:
while True:
try:
line = self.__pending_lines.get(timeout=timeout)
except queue.Empty:
raise ExpectLineTimeoutError(expect_line)
output.append(line)
if match_line(line, expect_line):
break
else:
done = False
while not done and timeout > 0:
lines = self.wait(1)
timeout -= 1
for line in lines:
output.append(line)
if match_line(line, expect_line):
done = True
break
if not done:
raise ExpectLineTimeoutError(expect_line)
return output
def __otcli_read_routine(self):
while not self.__should_close.isSet():
line = self.__otcli.readline()
if line.startswith('> '):
line = line[2:]
logging.debug('%s: %s', self.__otcli, line)
if not OtCliCommandRunner.__PATTERN_LOG_LINE.match(line):
self.__pending_lines.put(line)
class OtbrSshCommandRunner(OTCommandHandler):
def __init__(self, host, port, username, password):
import paramiko
self.__host = host
self.__port = port
self.__ssh = paramiko.SSHClient()
self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.__ssh.connect(host,
port=port,
username=username,
password=password,
allow_agent=False,
look_for_keys=False)
def __repr__(self):
return f'{self.__host}:{self.__port}'
def execute_command(self, cmd: str, timeout: float) -> List[str]:
sh_cmd = f'sudo ot-ctl "{cmd}"'
cmd_in, cmd_out, cmd_err = self.__ssh.exec_command(sh_cmd, timeout=int(timeout), bufsize=1024)
err = cmd_err.read().decode('utf-8')
if err:
raise CommandError(cmd, [err])
output = [l.rstrip('\r\n') for l in cmd_out.readlines()]
return output
def close(self):
self.__ssh.close()
def wait(self, duration: float) -> List[str]:
time.sleep(duration)
return []
+161
View File
@@ -0,0 +1,161 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import subprocess
import time
from abc import abstractmethod
class OtCliHandler:
"""This abstract class defines interfaces for a OT CLI Handler."""
@abstractmethod
def readline(self) -> str:
"""Method readline should return the next line read from OT CLI."""
pass
@abstractmethod
def writeline(self, s: str) -> None:
"""Method writeline should write a line to the OT CLI.
It should block until all characters are written to OT CLI.
"""
pass
@abstractmethod
def wait(self, duration: float) -> None:
"""Method wait should wait for a given duration.
A normal implementation should just call `time.sleep(duration)`. This is intended for proceeding Virtual Time
Simulation instances.
"""
pass
@abstractmethod
def close(self) -> None:
"""Method close should close the OT CLI Handler."""
pass
class Simulator:
"""This abstract class defines interfaces for a Virtual Time Simulator."""
@abstractmethod
def go(self, duration: float):
"""Proceed the simulator for a given duration (in seconds)."""
pass
class OtCliPopen(OtCliHandler):
"""Connector for OT CLI process (a Popen instance)."""
def __init__(self, proc: subprocess.Popen, nodeid: int, simulator: Simulator):
self.__otcli_proc = proc
self.__nodeid = nodeid
self.__simulator = simulator
def __repr__(self):
return 'OTCli<%d>' % self.__nodeid
def readline(self) -> str:
return self.__otcli_proc.stdout.readline().rstrip('\r\n')
def writeline(self, s: str):
self.__otcli_proc.stdin.write(s + '\n')
self.__otcli_proc.stdin.flush()
def wait(self, duration: float):
if self.__simulator is not None:
# Virtual time simulation
self.__simulator.go(duration)
else:
# Real time simulation
time.sleep(duration)
def close(self):
self.__otcli_proc.stdin.close()
self.__otcli_proc.stdout.close()
self.__otcli_proc.wait()
class OtCliSim(OtCliPopen):
"""Connector for OT CLI Simulation instances."""
def __init__(self, executable: str, nodeid: int, simulator: Simulator):
logging.info('%s: executable=%s', self.__class__.__name__, executable)
proc = subprocess.Popen(args=[executable, str(nodeid)],
executable=executable,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
encoding='utf-8',
bufsize=1024)
super().__init__(proc, nodeid, simulator)
class OtNcpSim(OtCliHandler):
"""Connector for OT NCP Simulation instances."""
def __init__(self, executable: str, nodeid: int, simulator: Simulator):
logging.info('%s: executable=%s', self.__class__.__name__, executable)
proc = subprocess.Popen(args=f'spinel-cli.py -p "{executable}" -n {nodeid} 2>&1',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
encoding='utf-8',
bufsize=1024,
shell=True)
super().__init__(proc, nodeid, simulator)
class OtCliSerial(OtCliHandler):
"""Connector for OT CLI SOC devices via Serial."""
def __init__(self, dev: str, baudrate: int):
self.__dev = dev
self.__baudrate = baudrate
import serial
self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=None, exclusive=True)
def __repr__(self):
return self.__dev
def readline(self) -> str:
line = self.__serial.readline().decode('utf-8').rstrip('\r\n')
return line
def writeline(self, s: str):
self.__serial.write((s + '\n').encode('utf-8'))
def wait(self, duration: float):
time.sleep(duration)
def close(self):
self.__serial.close()
+32
View File
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Thread versions
THREAD_VERSION_1_1 = 2
THREAD_VERSION_1_2 = 3
+64
View File
@@ -0,0 +1,64 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from typing import List
class OTCIError(Exception):
"""Base class for OTCI Errors."""
pass
class ExpectLineTimeoutError(OTCIError):
"""OTCI failed to find an expected line before timeout."""
def __init__(self, line):
super(ExpectLineTimeoutError, self).__init__("Expected line %r, but timed out" % line)
class CommandError(OTCIError):
"""OTCI failed to execute a command."""
def __init__(self, cmd: str, output: List[str]):
self.__output = output
super(CommandError, self).__init__("Command error while executing %r:\n%s\n" % (cmd, '\n'.join(output)))
def error(self) -> str:
return self.__output[-1]
class UnexpectedCommandOutput(OTCIError):
"""OTCI got unexpected command output."""
def __init__(self, output: List[str]):
super(UnexpectedCommandOutput, self).__init__("Unexpected command output:\n%s\n" % '\n'.join(output))
class InvalidArgumentsError(OTCIError):
"""Invalid arguments."""
pass
File diff suppressed because it is too large Load Diff
+131
View File
@@ -0,0 +1,131 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import ipaddress
from collections import namedtuple
class ChildId(int):
"""Represents a Child ID."""
pass
class RouterId(int):
"""Represents a Router ID."""
pass
class Rloc16(int):
"""Represents a RLOC16."""
def __repr__(self):
return hex(self)
class PartitionId(int):
"""Represents a Thread Network Partition ID."""
pass
class DeviceMode(str):
"""Represents a device mode."""
def __new__(cls, o: str):
ins = str.__new__(cls, o)
if ins != '-':
for c in ins:
if c not in 'rdn':
raise ValueError(o)
# check for empty mode (SED should use "-")
if not ins:
raise ValueError(o)
# check for duplicate chars
if len(ins) != len(set(ins)):
raise ValueError(o)
return ins
class ThreadState(str):
"""Represents a Thread state."""
_VALID_VALUES = {'disabled', 'detached', 'child', 'router', 'leader'}
def __new__(cls, o: str):
ins = str.__new__(cls, o)
if ins not in ThreadState._VALID_VALUES:
raise ValueError(o)
return ins
class Ip6Addr(ipaddress.IPv6Address):
"""Represents an IPv6 address."""
def __eq__(self, other):
if isinstance(other, str):
other = ipaddress.IPv6Address(other)
return super().__eq__(other)
def __repr__(self):
return self.compressed
def __hash__(self):
return super().__hash__()
class Ip6Prefix(ipaddress.IPv6Network):
"""Represents an IPv6 prefix."""
def __eq__(self, other):
if isinstance(other, str):
other = ipaddress.IPv6Network(other)
return super().__eq__(other)
def __repr__(self):
return self.compressed
def __hash__(self):
return super().__hash__()
SecurityPolicy = namedtuple('SecurityPolicy', ['rotation_time', 'flags'])
"""Represents a Security Policy configuration."""
if __name__ == '__main__':
assert Ip6Addr('2001:0:0:0:0:0:0:1') == '2001::1'
assert repr(Ip6Addr('2001:0:0:0:0:0:0:1')) == '2001::1'
assert str(Ip6Addr('2001:0:0:0:0:0:0:1')) == '2001::1'
assert Ip6Prefix('2001:0:0:0::/64') == '2001::/64'
assert repr(Ip6Prefix('2001:0:0:0::/64')) == '2001::/64'
assert str(Ip6Prefix('2001:0:0:0::/64')) == '2001::/64'
+63
View File
@@ -0,0 +1,63 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import functools
from typing import Union, Collection, Any, Pattern
def match_line(line: str, expect_line: Union[str, Pattern, Collection[Any]]) -> bool:
"""Checks if a line is expected (matched by one of the given patterns)."""
if isinstance(expect_line, Pattern):
match = expect_line.match(line)
elif isinstance(expect_line, str):
match = (line == expect_line)
else:
match = any(match_line(line, x) for x in expect_line)
return match
def cached(func):
"""Decorator cached makes the function to cache its result and return it in duplicate calls."""
prop_name = '__cached_' + func.__name__
@functools.wraps(func)
def _cached_func(self):
try:
return getattr(self, prop_name)
except AttributeError:
val = func(self)
setattr(self, prop_name, val)
return val
return _cached_func
def constant_property(func):
"""A constant property is a property that only evaluated once."""
return property(cached(func))
+50
View File
@@ -0,0 +1,50 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="otci-openthread",
version="0.0.1",
author="The OpenThread Authors",
description="OpenThread Controller Interface",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/openthread/openthread",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: BSD 3-Clause License",
"Operating System :: OS Independent",
],
python_requires='>=3.6',
install_requires=['pySerial', 'paramiko', 'pyspinel'],
)
+515
View File
@@ -0,0 +1,515 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import json
import logging
import os
import subprocess
import unittest
import otci
from otci.errors import CommandError
logging.basicConfig(level=logging.INFO)
TEST_CHANNEL = 22
TEST_NETWORK_NAME = 'OT CI'
TEST_PANID = 0xeeee
TEST_MASTERKEY = 'ffeeddccbbaa99887766554433221100'
REAL_DEVICE = int(os.getenv('REAL_DEVICE', '0'))
class TestOTCI(unittest.TestCase):
def testCliRealDevice(self):
if not REAL_DEVICE:
self.skipTest('not for virtual device')
if os.getenv('OTBR_SSH'):
node = otci.connect_otbr_ssh(os.getenv('OTBR_SSH'))
elif os.getenv('OT_CLI_SERIAL'):
node = otci.connect_cli_serial(os.getenv('OT_CLI_SERIAL'))
else:
self.fail("Please set OT_CLI_SERIAL or OTBR_SSH to test the real device.")
node.factory_reset()
self._test_otci_single_node(node)
def testCliSimRealTime(self):
if REAL_DEVICE:
self.skipTest('not for real device')
subprocess.check_call('rm -rf tmp/', shell=True)
VIRTUAL_TIME = int(os.getenv('VIRTUAL_TIME', "1"))
import simulator
if VIRTUAL_TIME:
sim = simulator.VirtualTime(use_message_factory=False)
else:
sim = None
if os.getenv('OT_CLI'):
executable = os.getenv('OT_CLI')
connector = otci.connect_cli_sim
elif os.getenv('OT_NCP'):
executable = os.getenv('OT_NCP')
connector = otci.connect_ncp_sim
else:
self.fail("Please set OT_CLI to test virtual device")
node1 = connector(executable, 1, simulator=sim)
self._test_otci_single_node(node1)
node1.factory_reset()
node2 = connector(executable, 2, simulator=sim)
node3 = connector(executable, 3, simulator=sim)
node4 = connector(executable, 4, simulator=sim)
self._test_otci_example(node1, node2)
node1.factory_reset()
node2.factory_reset()
self._test_otci_multi_nodes(node1, node2, node3, node4)
def _test_otci_single_node(self, leader):
logging.info('leader version: %r', leader.version)
logging.info('leader thread version: %r', leader.thread_version)
logging.info('API version: %r', leader.api_version)
logging.info('log level: %r', leader.get_log_level())
leader.enable_promiscuous()
self.assertTrue(leader.get_promiscuous())
leader.disable_promiscuous()
self.assertFalse(leader.get_promiscuous())
try:
logging.info("RCP version: %r", leader.get_rcp_version())
except CommandError:
pass
self.assertTrue(leader.get_router_eligible())
leader.disable_router_eligible()
self.assertFalse(leader.get_router_eligible())
leader.enable_router_eligible()
self.assertFalse(leader.get_ifconfig_state())
# ifconfig up
leader.ifconfig_up()
self.assertTrue(leader.get_ifconfig_state())
logging.info('leader eui64 = %r', leader.get_eui64())
logging.info('leader extpanid = %r', leader.get_extpanid())
logging.info('leader masterkey = %r', leader.get_master_key())
extaddr = leader.get_extaddr()
self.assertEqual(16, len(extaddr))
int(extaddr, 16)
new_extaddr = 'aabbccddeeff0011'
leader.set_extaddr(new_extaddr)
self.assertEqual(new_extaddr, leader.get_extaddr())
leader.set_network_name(TEST_NETWORK_NAME)
leader.set_master_key(TEST_MASTERKEY)
self.assertEqual(TEST_MASTERKEY, leader.get_master_key())
leader.set_panid(TEST_PANID)
self.assertEqual(TEST_PANID, leader.get_panid())
leader.set_channel(TEST_CHANNEL)
self.assertEqual(TEST_CHANNEL, leader.get_channel())
leader.set_network_name(TEST_NETWORK_NAME)
self.assertEqual(TEST_NETWORK_NAME, leader.get_network_name())
self.assertEqual('rdn', leader.get_mode())
leader.set_mode('-')
self.assertEqual('-', leader.get_mode())
leader.set_mode('rdn')
self.assertEqual('rdn', leader.get_mode())
logging.info('leader weight: %d', leader.get_leader_weight())
leader.set_leader_weight(72)
logging.info('domain name: %r', leader.get_domain_name())
leader.set_domain_name("DefaultDomain2")
self.assertEqual("DefaultDomain2", leader.get_domain_name())
self.assertEqual(leader.get_preferred_partition_id(), 0)
leader.set_preferred_partition_id(0xabcddead)
self.assertEqual(leader.get_preferred_partition_id(), 0xabcddead)
leader.thread_start()
leader.wait(5)
self.assertEqual('leader', leader.get_state())
self.assertEqual(0xabcddead, leader.get_leader_data()['partition_id'])
logging.info('leader key sequence counter = %d', leader.get_key_sequence_counter())
rloc16 = leader.get_rloc16()
leader_id = leader.get_router_id()
self.assertEqual(rloc16, leader_id << 10)
self.assertFalse(leader.get_child_list())
self.assertEqual({}, leader.get_child_table())
leader.enable_allowlist()
leader.add_allowlist(leader.get_extaddr())
leader.remove_allowlist(leader.get_extaddr())
leader.set_allowlist([leader.get_extaddr()])
leader.disable_allowlist()
leader.add_ipmaddr('ff04::1')
leader.del_ipmaddr('ff04::1')
leader.add_ipmaddr('ff04::2')
logging.info('leader ipmaddrs: %r', leader.get_ipmaddrs())
self.assertFalse(leader.has_ipmaddr('ff04::1'))
self.assertTrue(leader.has_ipmaddr('ff04::2'))
self.assertTrue(leader.get_ipaddr_rloc())
self.assertTrue(leader.get_ipaddr_linklocal())
self.assertTrue(leader.get_ipaddr_mleid())
leader.add_ipaddr('2001::1')
leader.del_ipaddr('2001::1')
leader.add_ipaddr('2001::2')
logging.info('leader ipaddrs: %r', leader.get_ipaddrs())
self.assertFalse(leader.has_ipaddr('2001::1'))
self.assertTrue(leader.has_ipaddr('2001::2'))
logging.info('leader bbr state: %r', leader.get_backbone_router_state())
bbr_config = leader.get_backbone_router_config()
logging.info('leader bbr config: %r', bbr_config)
logging.info('leader PBBR: %r', leader.get_primary_backbone_router_info())
leader.set_backbone_router_config(seqno=bbr_config['seqno'] + 1, delay=10, timeout=301)
self.assertEqual({
'seqno': bbr_config['seqno'] + 1,
'delay': 10,
'timeout': 301
}, leader.get_backbone_router_config())
leader.enable_backbone_router()
leader.wait(3)
logging.info('leader bbr state: %r', leader.get_backbone_router_state())
logging.info('leader bbr config: %r', leader.get_backbone_router_config())
logging.info('leader PBBR: %r', leader.get_primary_backbone_router_info())
logging.info('leader bufferinfo: %r', leader.get_message_buffer_info())
logging.info('child ipaddrs: %r', leader.get_child_ipaddrs())
logging.info('child ipmax: %r', leader.get_child_ip_max())
leader.set_child_ip_max(2)
self.assertEqual(2, leader.get_child_ip_max())
logging.info('childmax: %r', leader.get_max_children())
logging.info('counter names: %r', leader.counter_names)
for counter_name in leader.counter_names:
logging.info('counter %s: %r', counter_name, leader.get_counter(counter_name))
leader.reset_counter(counter_name)
self.assertTrue(all(x == 0 for x in leader.get_counter(counter_name).values()))
logging.info("CSL config: %r", leader.get_csl_config())
leader.config_csl(channel=13, period=100, timeout=200)
logging.info("CSL config: %r", leader.get_csl_config())
logging.info("EID-to-RLOC cache: %r", leader.get_eidcache())
logging.info("ipmaddr promiscuous: %r", leader.get_ipmaddr_promiscuous())
leader.enable_ipmaddr_promiscuous()
self.assertTrue(leader.get_ipmaddr_promiscuous())
leader.disable_ipmaddr_promiscuous()
self.assertFalse(leader.get_ipmaddr_promiscuous())
logging.info("leader data: %r", leader.get_leader_data())
logging.info("leader neighbor list: %r", leader.get_neighbor_list())
logging.info("leader neighbor table: %r", leader.get_neighbor_table())
logging.info("Leader external routes: %r", leader.get_routes())
leader.add_route('2002::/64')
leader.register_network_data()
logging.info("Leader external routes: %r", leader.get_routes())
self.assertEqual(1, len(leader.get_router_list()))
self.assertEqual(1, len(leader.get_router_table()))
logging.info("Leader router table: %r", leader.get_router_table())
logging.info('scan: %r', leader.scan())
logging.info('scan energy: %r', leader.scan_energy())
leader.add_service(44970, '112233', 'aabbcc')
leader.register_network_data()
leader.add_service(44971, b'\x11\x22\x33', b'\xaa\xbb\xcc\xdd')
leader.add_prefix("2001::/64")
logging.info("network data: %r", leader.get_network_data())
logging.info("network data raw: %r", leader.get_network_data_bytes())
logging.info('txpower %r', leader.get_txpower())
leader.set_txpower(-10)
self.assertEqual(-10, leader.get_txpower())
self.assertTrue(leader.is_singleton())
leader.coap_start()
leader.coap_set_test_resource_path('test')
leader.coap_test_set_resource_content('helloworld')
leader.coap_get(leader.get_ipaddr_rloc(), 'test')
leader.coap_put(leader.get_ipaddr_rloc(), 'test', 'con', 'xxx')
leader.coap_post(leader.get_ipaddr_rloc(), 'test', 'con', 'xxx')
leader.coap_delete(leader.get_ipaddr_rloc(), 'test', 'con', 'xxx')
leader.wait(1)
leader.coap_stop()
leader.udp_open()
leader.udp_bind("::", 1234)
leader.udp_send(leader.get_ipaddr_rloc(), 1234, text='hello')
leader.udp_send(leader.get_ipaddr_rloc(), 1234, random_bytes=3)
leader.udp_send(leader.get_ipaddr_rloc(), 1234, hex='112233')
leader.wait(1)
leader.udp_close()
logging.info('dataset: %r', leader.get_dataset())
logging.info('dataset active: %r', leader.get_dataset('active'))
leader.dataset_init_buffer()
leader.dataset_commit_buffer('pending')
leader.dataset_init_buffer(get_active_dataset=True)
leader.dataset_init_buffer(get_pending_dataset=True)
logging.info('dataset: %r', leader.get_dataset())
logging.info('dataset active: %r', leader.get_dataset('active'))
logging.info('dataset pending: %r', leader.get_dataset('pending'))
logging.info('dataset active -x: %r', leader.get_dataset_bytes('active'))
logging.info('dataset pending -x: %r', leader.get_dataset_bytes('pending'))
def _test_otci_example(self, node1, node2):
node1.dataset_init_buffer()
node1.dataset_set_buffer(network_name='test',
master_key='00112233445566778899aabbccddeeff',
panid=0xface,
channel=11)
node1.dataset_commit_buffer('active')
node1.ifconfig_up()
node1.thread_start()
node1.wait(5)
assert node1.get_state() == "leader"
node1.commissioner_start()
node1.wait(3)
node1.commissioner_add_joiner("TEST123", eui64='*')
node2.ifconfig_up()
node2.set_router_selection_jitter(1)
node2.joiner_start("TEST123")
node2.wait(10, expect_line="Join success")
node2.thread_start()
node2.wait(5)
assert node2.get_state() == "router"
def _test_otci_multi_nodes(self, leader, commissioner, child1, child2):
self.assertFalse(leader.get_ifconfig_state())
# ifconfig up
leader.ifconfig_up()
self.assertTrue(leader.get_ifconfig_state())
logging.info('leader eui64 = %r', leader.get_eui64())
logging.info('leader extpanid = %r', leader.get_extpanid())
logging.info('leader masterkey = %r', leader.get_master_key())
extaddr = leader.get_extaddr()
self.assertEqual(16, len(extaddr))
int(extaddr, 16)
new_extaddr = 'aabbccddeeff0011'
leader.set_extaddr(new_extaddr)
self.assertEqual(new_extaddr, leader.get_extaddr())
leader.set_network_name(TEST_NETWORK_NAME)
leader.set_master_key(TEST_MASTERKEY)
self.assertEqual(TEST_MASTERKEY, leader.get_master_key())
leader.set_panid(TEST_PANID)
self.assertEqual(TEST_PANID, leader.get_panid())
leader.set_channel(TEST_CHANNEL)
self.assertEqual(TEST_CHANNEL, leader.get_channel())
leader.set_network_name(TEST_NETWORK_NAME)
self.assertEqual(TEST_NETWORK_NAME, leader.get_network_name())
self.assertEqual('rdn', leader.get_mode())
leader.thread_start()
leader.wait(5)
self.assertEqual('leader', leader.get_state())
logging.info('leader key sequence counter = %d', leader.get_key_sequence_counter())
rloc16 = leader.get_rloc16()
leader_id = leader.get_router_id()
self.assertEqual(rloc16, leader_id << 10)
commissioner.ifconfig_up()
commissioner.set_channel(TEST_CHANNEL)
commissioner.set_panid(TEST_PANID)
commissioner.set_network_name(TEST_NETWORK_NAME)
commissioner.set_router_selection_jitter(1)
commissioner.set_master_key(TEST_MASTERKEY)
commissioner.thread_start()
commissioner.wait(5)
self.assertEqual('router', commissioner.get_state())
for dst_ip in leader.get_ipaddrs():
commissioner.ping(dst_ip, size=10, count=1, interval=2, hoplimit=3)
self.assertEqual('disabled', commissioner.get_commissioiner_state())
commissioner.commissioner_start()
commissioner.wait(5)
self.assertEqual('active', commissioner.get_commissioiner_state())
logging.info('commissioner.get_network_id_timeout() = %d', commissioner.get_network_id_timeout())
commissioner.set_network_id_timeout(60)
self.assertEqual(60, commissioner.get_network_id_timeout())
commissioner.commissioner_add_joiner('TEST123', eui64='*')
commissioner.wait(3)
child1.ifconfig_up()
logging.info("child1 scan: %r", child1.scan())
logging.info("child1 scan energy: %r", child1.scan_energy())
child1.set_mode('rn')
child1.set_router_selection_jitter(1)
child1.joiner_start('TEST123')
logging.info('joiner id = %r', child1.get_joiner_id())
child1.wait(10, expect_line="Join success")
child1.enable_allowlist()
child1.disable_allowlist()
child1.add_allowlist(commissioner.get_extaddr())
child1.remove_allowlist(commissioner.get_extaddr())
child1.set_allowlist([commissioner.get_extaddr()])
child1.thread_start()
child1.wait(3)
self.assertEqual('child', child1.get_state())
child1.thread_stop()
child1.set_mode('n')
child1.set_poll_period(1000)
self.assertEqual(1000, child1.get_poll_period())
child1.thread_start()
child1.wait(3)
self.assertEqual('child', child1.get_state())
child2.ifconfig_up()
child2.set_mode('rn')
child2.set_router_selection_jitter(1)
child2.joiner_start('TEST123')
logging.info('joiner id = %r', child2.get_joiner_id())
child2.wait(10, expect_line="Join success")
child2.enable_allowlist()
child2.disable_allowlist()
child2.add_allowlist(commissioner.get_extaddr())
child2.remove_allowlist(commissioner.get_extaddr())
child2.set_allowlist([commissioner.get_extaddr()])
child2.thread_start()
child2.wait(3)
self.assertEqual('child', child2.get_state())
child_table = commissioner.get_child_table()
logging.info('commissioiner child table: \n%s\n', json.dumps(child_table, indent=True))
child_list = commissioner.get_child_list()
logging.info('commissioiner child list: %r', child_list)
for child_id in child_list:
logging.info('child %s info: %r', child_id, commissioner.get_child_info(child_id))
logging.info('child1 info: %r', commissioner.get_child_info(child1.get_rloc16()))
logging.info('child2 info: %r', commissioner.get_child_info(child2.get_rloc16()))
self.assertEqual(set(commissioner.get_child_list()), set(commissioner.get_child_table().keys()))
child1.add_ipmaddr('ff04::1')
child1.del_ipmaddr('ff04::1')
child1.add_ipmaddr('ff04::2')
logging.info('child1 ipmaddrs: %r', child1.get_ipmaddrs())
self.assertFalse(child1.has_ipmaddr('ff04::1'))
self.assertTrue(child1.has_ipmaddr('ff04::2'))
child1.add_ipaddr('2001::1')
child1.del_ipaddr('2001::1')
child1.add_ipaddr('2001::2')
logging.info('child1 ipaddrs: %r', child1.get_ipaddrs())
self.assertFalse(child1.has_ipaddr('2001::1'))
self.assertTrue(child1.has_ipaddr('2001::2'))
logging.info('child ipaddrs: %r', commissioner.get_child_ipaddrs())
logging.info("EID-to-RLOC cache: %r", leader.get_eidcache())
logging.info("leader neighbor list: %r", leader.get_neighbor_list())
logging.info("leader neighbor table: %r", leader.get_neighbor_table())
logging.info("prefixes: %r", commissioner.get_prefixes())
commissioner.add_prefix('2001::/64')
commissioner.register_network_data()
commissioner.wait(1)
logging.info("prefixes: %r", commissioner.get_prefixes())
self.assertEqual(2, len(leader.get_router_list()))
self.assertEqual(2, len(leader.get_router_table()))
self.assertFalse(leader.is_singleton())
# Shutdown
leader.thread_stop()
logging.info("node state: %s", leader.get_state())
leader.ifconfig_down()
self.assertFalse(leader.get_ifconfig_state())
leader.close()
if __name__ == '__main__':
unittest.main()