[tcat] enable TCAT Commissioner to receive Alerts/TLV events over TLS and improve connection mgmt (#12011)

This enables the TCAT Commissioner to receive data such as TLS Alerts,
or asynchronously sent 'event' TLVs, over TLS.  Processing TLS Alert
is required to detect the sending of Alert by the TCAT Device, which
is a requirement to be verified in cert tests. An async background
process is started to receive and log the received events.

Also some minor improvements in connection state management: when
certain commands are given after the TCAT link is disconnected, or
when a TCAT link could not be established, a message will be printed
to clearly say it's disconnected, instead of a cryptic error. Error
messages are now clearly prefixed with 'Error:'.

The CA certificate store for CommCert3 is extended with an additional
CA certificate, so that it can be verified in cert tests that a TCAT
Device rejects a wrong Commissioner with a TLS Alert (previously this
couldn't be tested).

Also includes a fix of the pyproject.toml such that Poetry does not
display the long warning on installation.

Also includes an improvement of TLV displaying to the user with a
STRING field, if the value is a string.

Also includes some syntax fixes that were flagged by the IDE, such as
missing return types for methods, or member variables that were not
initialized in the __init__().
This commit is contained in:
Esko Dijk
2026-01-27 23:24:48 +01:00
committed by GitHub
parent 179f77021d
commit f121ebcffa
17 changed files with 626 additions and 278 deletions
+1 -1
View File
@@ -19,7 +19,7 @@ If pipx is not available, it can be installed for Linux/Windows/MacOS following
Then, install this project using Poetry:
```
poetry install --no-root
poetry install
```
This will install all the required modules to a virtual environment, which can be used by calling `poetry run <COMMAND>` from the project directory.
@@ -12,3 +12,17 @@ A1UdDgQWBBTgTKehLD14MpkRU+S6azYhYsAkKjAKBggqhkjOPQQDAgNIADBFAiA2
Wp9JGbwiqbW0l0fTS+AKdp6xFXkmuePftuUTsnMKcgIhAPdC1zdx8fHPoTnRLpiH
Pt2/QkcSashR9zOp9MrBnRPb
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIICOzCCAeGgAwIBAgIJAKOc2hehOGoBMAoGCCqGSM49BAMCMHExJjAkBgNVBAMM
HVRocmVhZCBDZXJ0aWZpY2F0aW9uIERldmljZUNBMRkwFwYDVQQKDBBUaHJlYWQg
R3JvdXAgSW5jMRIwEAYDVQQHDAlTYW4gUmFtb24xCzAJBgNVBAgMAkNBMQswCQYD
VQQGEwJVUzAeFw0yNDA1MDMyMDAyMThaFw00NDA0MjgyMDAyMThaMHExJjAkBgNV
BAMMHVRocmVhZCBDZXJ0aWZpY2F0aW9uIERldmljZUNBMRkwFwYDVQQKDBBUaHJl
YWQgR3JvdXAgSW5jMRIwEAYDVQQHDAlTYW4gUmFtb24xCzAJBgNVBAgMAkNBMQsw
CQYDVQQGEwJVUzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABGy850VBIPTkN3oL
x++zIUsZk2k26w4fuieFz9oNvjdb5W14+Yf3mvGWsl4NHyLxqhmamVAR4h7zWRlZ
0XyMVpKjYjBgMB4GA1UdEQQXMBWBE3RvbUB0aHJlYWRncm91cC5vcmcwDgYDVR0P
AQH/BAQDAgGGMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFF+rGyloiKHUtDGo
hmHn52ZZ7fgZMAoGCCqGSM49BAMCA0gAMEUCIQCTq1qjPZs9fAJB6ppTXs588Pnu
eVFOwC8bd//D99KiHAIgU84kwFHIyDvFqu6y+u1hFqBGsiuTmKwZ2PHhVe/xK1k=
-----END CERTIFICATE-----
+64 -56
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -29,24 +29,26 @@
import asyncio
import argparse
import logging
import os
from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \
BBTC_RX_CHAR_UUID
from ble.ble_stream import BleStream
from bleak import BLEDevice
from ble.ble_stream_secure import BleStreamSecure
from ble.udp_stream import UdpStream
from ble import ble_scanner
from cli.cli import CLI
from dataset.dataset import ThreadDataset
from cli.command import CommandResult
from utils import select_device_by_user_input, quit_with_reason
from tlv.tcat_tlv import TcatTLVType
from tlv.tlv import TLV
from utils import hexdump_ot, select_device_by_user_input, quit_with_reason
logger = logging.getLogger(__name__)
logged_modules = ['ble', 'cli', 'dataset', 'tlv', 'utils']
async def main():
logging.basicConfig(level=logging.WARNING)
log_level = logging.WARNING
logging.basicConfig(level=log_level)
parser = argparse.ArgumentParser(description='Device parameters')
parser.add_argument('-a', '--adapter', help='Select HCI adapter')
@@ -61,80 +63,86 @@ async def main():
args = parser.parse_args()
if args.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('ble.ble_stream').setLevel(logging.DEBUG)
logging.getLogger('ble.ble_stream_secure').setLevel(logging.DEBUG)
logging.getLogger('ble.udp_stream').setLevel(logging.DEBUG)
log_level = logging.DEBUG
elif args.info:
logger.setLevel(logging.INFO)
logging.getLogger('ble.ble_stream').setLevel(logging.INFO)
logging.getLogger('ble.ble_stream_secure').setLevel(logging.INFO)
logging.getLogger('ble.udp_stream').setLevel(logging.INFO)
log_level = logging.INFO
logger.setLevel(log_level)
for module in logged_modules:
logging.getLogger(module).setLevel(log_level)
is_debug = logger.getEffectiveLevel() <= logging.DEBUG
device = await get_device_by_args(args)
ble_sstream = None
if device is not None:
print(f'Connecting to {device}')
ble_sstream = BleStreamSecure(device)
ble_sstream.load_cert(
certfile=os.path.join(args.cert_path, 'commissioner_cert.pem'),
keyfile=os.path.join(args.cert_path, 'commissioner_key.pem'),
cafile=os.path.join(args.cert_path, 'ca_cert.pem'),
)
logger.info(f"Certificates and key loaded from '{args.cert_path}'")
print('Setting up secure TLS channel..', end='')
ok = False
try:
cb = None
if not is_debug:
cb = handshake_progress_bar
ok = await ble_sstream.do_handshake(progress_callback=cb)
except Exception as e:
logger.error(e)
if ok:
print('Done')
else:
print('Failed')
quit_with_reason('TLS handshake failure')
# create CLI and (if selected) connect to TCAT device
ds = ThreadDataset()
cli = CLI(ds, args, ble_sstream)
loop = asyncio.get_running_loop()
cli = CLI(ds, args)
if device is not None:
if not await cli.connect(device):
quit_with_reason('Failed to connect to TCAT device: TLS handshake failed.')
# Task 1: run a receiver that gets unsolicited event data or TLS Alerts from TLS server.
receiver_task = asyncio.create_task(receive_loop(cli.context))
# Task 2: run the CLI
print('Enter \'help\' to see available commands or \'exit\' to exit the application.')
loop = asyncio.get_running_loop()
while True:
user_input = await loop.run_in_executor(None, lambda: input('> '))
if user_input.lower() == 'exit':
break
try:
result: CommandResult = await cli.evaluate_input(user_input)
if result:
result.pretty_print()
result.pretty_print()
except Exception as e:
logger.error(e)
logger.debug(e, exc_info=True)
print('Disconnecting...')
if ble_sstream is not None:
await ble_sstream.close()
# Stop Task 1
receiver_task.cancel()
try:
await receiver_task
except asyncio.CancelledError:
# CancelledError is expected when awaiting the canceled task - not an error.
pass
# Disconnect from TCAT device (if still needed)
await cli.disconnect()
async def get_device_by_args(args):
async def receive_loop(cli_context: dict):
while True:
bless: BleStreamSecure = cli_context['ble_sstream']
if bless is not None:
data = await bless.recv_unsolicited_event()
if data:
logger.info('Received event data from TCAT Device:\n' + hexdump_ot("Event", data))
tlv = TLV.from_bytes(data)
validate_unsolicited_tlv(tlv)
continue
await asyncio.sleep(0.100)
def validate_unsolicited_tlv(tlv: TLV):
if tlv.type in [
TcatTLVType.APPLICATION_DATA_1.value, TcatTLVType.APPLICATION_DATA_2.value,
TcatTLVType.APPLICATION_DATA_3.value, TcatTLVType.APPLICATION_DATA_4.value
]:
num = tlv.type - TcatTLVType.APPLICATION_DATA_1.value + 1
logger.info(f" - Send Application Data {num} {hex(tlv.type)}")
elif tlv.type in [TcatTLVType.RESPONSE_EVENT.value]:
logger.info(f" - Response Event {hex(tlv.type)}")
else:
logger.error(f"Error: Illegal unsolicited TLV type sent by TCAT Device: {hex(tlv.type)}")
async def get_device_by_args(args) -> BLEDevice | UdpStream | None:
device = None
if args.mac:
device = await ble_scanner.find_first_by_mac(args.mac)
device = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
elif args.name:
device = await ble_scanner.find_first_by_name(args.name)
device = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
elif args.scan:
tcat_devices = await ble_scanner.scan_tcat_devices(adapter=args.adapter)
device = select_device_by_user_input(tcat_devices)
if device:
device = await BleStream.create(device, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
elif args.simulation:
device = UdpStream("127.0.0.1", int(args.simulation))
+9 -7
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -26,29 +26,31 @@
POSSIBILITY OF SUCH DAMAGE.
"""
from typing import Optional
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.uuids import normalize_uuid_str
from bbtc import BBTC_SERVICE_UUID
from typing import Optional
from ble.ble_connection_constants import BBTC_SERVICE_UUID
from ble.ble_advertisement_data import AdvertisedData
async def find_first_by_name(name):
async def find_first_by_name(name) -> BLEDevice:
match_name = lambda dev, adv_data: name == dev.name
device = await BleakScanner.find_device_by_filter(match_name)
return device
async def find_first_by_mac(mac):
async def find_first_by_mac(mac) -> BLEDevice:
match_mac = lambda dev, adv_data: mac.upper() == dev.address
device = await BleakScanner.find_device_by_filter(match_mac)
return device
async def scan_tcat_devices(adapter: Optional[str] = None):
async def scan_tcat_devices(adapter: Optional[str] = None) -> list[tuple[BLEDevice, Optional[AdvertisedData]]]:
scanner = BleakScanner()
tcat_devices: list[BLEDevice] = []
tcat_devices: list[tuple[BLEDevice, Optional[AdvertisedData]]] = []
service_uuids = [normalize_uuid_str(BBTC_SERVICE_UUID)]
discovered_devices = await scanner.discover(return_adv=True, service_uuids=service_uuids, adapter=adapter)
for _, (device, adv) in discovered_devices.items():
+7 -8
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2026, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -26,11 +26,10 @@
POSSIBILITY OF SUCH DAMAGE.
"""
import asyncio
from itertools import count, takewhile
from typing import Iterator, Union
import logging
import time
from asyncio import sleep
from typing import Iterator, Union
from bleak import BleakClient
from bleak.backends.device import BLEDevice
@@ -62,7 +61,7 @@ class BleStream:
def __handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
logger.debug(f'rx {len(data)} bytes')
self.__receive_buffer += data
self.__last_recv_time = time.time()
self.__last_recv_time = asyncio.get_running_loop().time()
@staticmethod
def __sliced(data: bytes, n: int) -> Iterator[bytes]:
@@ -84,12 +83,12 @@ class BleStream:
await self.client.write_gatt_char(rx_char, s)
return len(data)
async def recv(self, bufsize, recv_timeout=0.2):
async def recv(self, bufsize, recv_timeout=0.200):
if not self.__receive_buffer:
return b''
while time.time() - self.__last_recv_time <= recv_timeout:
await sleep(0.1)
while asyncio.get_running_loop().time() - self.__last_recv_time <= recv_timeout:
await asyncio.sleep(0.020)
data = self.__receive_buffer[:bufsize]
self.__receive_buffer = self.__receive_buffer[bufsize:]
+191 -45
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2026, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -30,11 +30,11 @@ import _ssl
import asyncio
import logging
import ssl
from time import time
from typing import Optional, Callable
from cryptography.x509 import load_der_x509_certificate
from cryptography.hazmat.primitives.serialization import (Encoding, PublicFormat)
from tlv.tlv import TLV
from tlv.tcat_tlv import TcatTLVType
import utils
@@ -53,6 +53,10 @@ class BleStreamSecure:
self.cert = ''
self.peer_challenge = None
self._peer_public_key = None
self._recv_lock = asyncio.Lock()
self._close_notify_sent = False
self._close_notify_received = False
self._async_events_queue = asyncio.Queue()
def load_cert(self, certfile='', keyfile='', cafile=''):
if certfile and keyfile:
@@ -65,7 +69,10 @@ class BleStreamSecure:
if cafile:
self.ssl_context.load_verify_locations(cafile=cafile)
async def do_handshake(self, timeout=30.0, progress_callback: Optional[Callable[[bool], None]] = None) -> bool:
async def do_handshake(self,
buffersize: int = 4096,
timeout: float = 30.0,
progress_callback: Optional[Callable[[bool], None]] = None) -> bool:
"""
Performs a TLS handshake with a TCAT Device, reporting progress via an optional callback.
@@ -79,6 +86,9 @@ class BleStreamSecure:
Returns:
True if the TLS handshake was successful, False otherwise.
"""
self._close_notify_sent = False
self._close_notify_received = False
self._peer_public_key = None
self.ssl_object = self.ssl_context.wrap_bio(
incoming=self.incoming,
outgoing=self.outgoing,
@@ -87,8 +97,8 @@ class BleStreamSecure:
)
try:
start = time()
while (time() - start) < timeout:
start = asyncio.get_running_loop().time()
while (asyncio.get_running_loop().time() - start) < timeout:
try:
if progress_callback:
progress_callback(False)
@@ -96,26 +106,26 @@ class BleStreamSecure:
break
# SSLWantWrite means ssl wants to send data over the link,
# but might need a receive first
# but might need to receive first
except ssl.SSLWantWriteError:
output = await self.stream.recv(4096)
if output:
self.incoming.write(output)
data = self.outgoing.read()
if data:
await self.stream.send(data)
await asyncio.sleep(0.02)
recv_data = await self.stream.recv(buffersize)
if recv_data:
self.incoming.write(recv_data)
send_data = self.outgoing.read()
if send_data:
await self.stream.send(send_data)
await asyncio.sleep(0.020)
# SSLWantRead means ssl wants to receive data from the link,
# but might need to send first
except ssl.SSLWantReadError:
data = self.outgoing.read()
if data:
await self.stream.send(data)
output = await self.stream.recv(4096)
if output:
self.incoming.write(output)
await asyncio.sleep(0.02)
send_data = self.outgoing.read()
if send_data:
await self.stream.send(send_data)
recv_data = await self.stream.recv(buffersize)
if recv_data:
self.incoming.write(recv_data)
await asyncio.sleep(0.020)
except ssl.SSLCertVerificationError as err:
if progress_callback:
@@ -151,50 +161,184 @@ class BleStreamSecure:
self.log_cert_identities()
return True
async def send(self, bytes):
logger.debug(f"tx {len(bytes)} bytes\n" + utils.hexdump_ot("Tx", bytes))
self.ssl_object.write(bytes)
encode = self.outgoing.read(4096)
await self.stream.send(encode)
# Precondition: caller must handle all exceptions raised
async def _send(self, data: bytes, buffersize: int = 4096) -> None:
hexdump_str = utils.hexdump_ot("Tx", data) if len(data) > 0 else ''
logger.debug(f"tx {len(data)} bytes\n{hexdump_str}")
self.ssl_object.write(data)
while self.outgoing.pending > 0:
encrypted_chunk = self.outgoing.read(buffersize)
await self.stream.send(encrypted_chunk)
async def recv(self, buffersize, timeout=1):
end_time = asyncio.get_event_loop().time() + timeout
# Precondition: must only be called by _recv()
async def _on_alert_from_peer(self):
self._close_notify_received = True
if not self._close_notify_sent:
logger.warning('TLS connection closed by peer.')
else:
logger.debug('TLS connection closed by local, close-notify alert received from peer.')
await self._close_tls_gracefully()
# Precondition: caller must acquire _recv_lock before calling this method
# Precondition: caller must handle all exceptions raised
async def _recv(self, buffersize: int = 4096, timeout: float = 0.0) -> bytes:
if self._close_notify_received and self._close_notify_sent:
return b''
slp_time = 0.020
end_time = asyncio.get_running_loop().time() + timeout
data = await self.stream.recv(buffersize)
while not data and asyncio.get_event_loop().time() < end_time:
await asyncio.sleep(0.1)
while not data and asyncio.get_running_loop().time() < end_time:
await asyncio.sleep(slp_time)
data = await self.stream.recv(buffersize)
if not data:
logger.warning('No response when response expected.')
return b''
self.incoming.write(data)
while True:
try:
decode = self.ssl_object.read(4096)
decode = self.ssl_object.read(buffersize)
break
# if recv called before entire message was received from the link
# if _recv called before entire message was received from the link
except ssl.SSLWantReadError:
more = await self.stream.recv(buffersize)
while not more:
await asyncio.sleep(0.1)
await asyncio.sleep(slp_time)
more = await self.stream.recv(buffersize)
self.incoming.write(more)
logger.debug(f"rx {len(decode)} bytes\n" + utils.hexdump_ot("Rx", decode))
hexdump_str = utils.hexdump_ot("Rx", decode) if len(decode) > 0 else ''
logger.debug(f"rx {len(decode)} bytes\n{hexdump_str}")
# ssl_object.read returns 0 bytes when peer side sent TLS Alert close/notify.
if len(decode) == 0:
await self._on_alert_from_peer()
return decode
async def send_with_resp(self, bytes):
await self.send(bytes)
res = await self.recv(buffersize=4096, timeout=5)
return res
async def send_with_resp(self, data: bytes, timeout: float = 5.0) -> bytes:
"""
Send data to the server over the secure TLS connection and wait for response data.
async def close(self):
if self.ssl_object.session is not None:
logger.debug('sending Disconnect command TLV')
data = TLV(TcatTLVType.DISCONNECT.value, bytes()).to_bytes()
self.peer_challenge = None
Args:
data: The data to send to the server.
timeout: The maximum time in seconds to wait for the response data. Defaults to 5.0 seconds.
Returns:
bytes: The received response data as bytes, or empty b'' if no data TLV/line was received
within the timeout.
"""
async with self._recv_lock:
# first receive any pending unsolicited events and store in FIFO queue
while True:
pend_data = await self._recv(timeout=0.0)
if len(pend_data) == 0:
break
await self._async_events_queue.put(pend_data)
# then send the data (command) and wait for the response (1 TLV or line)
await self._send(data)
res = await self._recv(timeout=timeout)
if len(res) == 0:
logger.error(f'No response when response TLV/line expected (timeout={timeout}s).')
return res
async def recv_unsolicited_event(self) -> bytes:
"""
Receive unsolicited event data, if any, from the server over the secure TLS connection.
This method is non-blocking and returns immediately if no unsolicited event data is available.
Events are returned in FIFO order of reception. To receive multiple events, call this method
repeatedly until no more data is available.
Returns:
bytes: The received event data as bytes, or empty b'' if no data is available.
"""
# dequeue the next event, if any
try:
data = self._async_events_queue.get_nowait()
return data
except asyncio.QueueEmpty:
pass
# when connected, receive any pending unsolicited events incoming from the peer (and queue these)
while self.is_connected:
async with self._recv_lock:
data = await self._recv(timeout=0.0)
if len(data) == 0:
break
await self._async_events_queue.put(data)
# dequeue the next event, if any
try:
data = self._async_events_queue.get_nowait()
return data
except asyncio.QueueEmpty:
return b''
async def close(self, timeout: float = 5.0):
try:
if self.is_connected:
logger.debug('sending Disconnect command TLV')
data = TLV(TcatTLVType.DISCONNECT.value, bytes()).to_bytes()
try:
await self._send(data)
except asyncio.CancelledError:
raise
except Exception as err:
logger.warning(f'Failed to send Disconnect command TLV: {err}')
logger.debug(err, exc_info=True)
async with self._recv_lock:
await self._close_tls_gracefully(timeout=timeout) # send out Alert after the command.
finally:
self._peer_public_key = None
await self.send(data)
self.peer_challenge = None
self.ssl_object = None
# Closes the TLS connection by repeated calls to unwrap() until it succeeds or times out.
# Precondition: must only be called by close() with _recv_lock acquired, or by _closed_by_peer()
async def _close_tls_gracefully(self, timeout: float = 5.0, buffersize: int = 4096):
try:
async with asyncio.timeout(timeout):
while True:
try:
self.ssl_object.unwrap()
send_data = self.outgoing.read() # send final Alert (if any)
if send_data:
await self.stream.send(send_data)
self._close_notify_sent = True
self._close_notify_received = True
break
except ssl.SSLWantReadError:
recv_data = await self.stream.recv(buffersize)
if recv_data:
self.incoming.write(recv_data)
else:
await asyncio.sleep(0.020) # small pause to allow asyncio.timeout to occur
except ssl.SSLWantWriteError:
send_data = self.outgoing.read()
if send_data:
await self.stream.send(send_data)
except asyncio.TimeoutError:
logger.warning(f'TLS closing procedure timed out (timeout={timeout} s).')
except Exception as err:
logger.warning(f'TLS closing procedure incomplete: {err}')
logger.debug(err, exc_info=True)
finally:
self.ssl_object = None
@property
def is_connected(self):
return not self._close_notify_sent and not self._close_notify_received and \
self._peer_public_key is not None and self.ssl_object is not None
@property
def peer_public_key(self):
@@ -215,7 +359,7 @@ class BleStreamSecure:
try:
cc = self.ssl_object._sslobj.get_unverified_chain()
if cc is None:
logger.info('No TCAT Device cert chain was received (yet).')
logger.warning('No TCAT Device cert chain was received (yet).')
return
logger.info(f'TCAT Device cert chain: {len(cc)} certificates received.')
for cert in cc:
@@ -224,5 +368,7 @@ class BleStreamSecure:
logger.info(f' base64: (paste in https://lapo.it/asn1js/ to decode)\n{peer_cert_der_hex}')
logger.info(f'TCAT Commissioner cert, PEM:\n{self.cert}')
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning('Could not display TCAT client cert info (check Python version is >= 3.10?)')
+7 -3
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -35,7 +35,7 @@ logger = logging.getLogger(__name__)
class UdpStream:
BASE_PORT = 10000
MAX_SERVER_TIMEOUT_SEC = 10
MAX_SERVER_TIMEOUT_SEC = 0.010
def __init__(self, address, node_id):
self.__receive_buffer = b''
@@ -58,4 +58,8 @@ class UdpStream:
logger.debug(f'rx {len(data)} bytes')
return data
else:
raise socket.timeout('simulation UdpStream recv timeout - likely, TCAT is stopped on TCAT Device')
return b''
async def disconnect(self):
if self.socket is not None:
self.socket.close()
+174 -82
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2026, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -27,33 +27,38 @@
"""
from abc import abstractmethod
from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \
BBTC_RX_CHAR_UUID
import asyncio
from hashlib import sha256
import hmac
import logging
from os import path
from secrets import token_bytes
from bleak import BLEDevice
from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID
from ble.ble_stream import BleStream
from ble.ble_stream_secure import BleStreamSecure
from ble import ble_scanner
from ble.udp_stream import UdpStream
from cli.command import Command, CommandResultNone, CommandResultTLV, CommandResult, CommandResultError
from dataset.dataset import ThreadDataset
from tlv.tlv import TLV
from tlv.diagnostic_tlv import DiagnosticTLVType
from tlv.tcat_tlv import TcatTLVType
from cli.command import Command, CommandResultNone, CommandResultTLV
from dataset.dataset import ThreadDataset
from utils import select_device_by_user_input
from os import path
from time import time
from secrets import token_bytes
from hashlib import sha256
import hmac
import binascii
CHALLENGE_SIZE = 8
logger = logging.getLogger(__name__)
class HelpCommand(Command):
def get_help_string(self) -> str:
return 'Display help and return.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
commands = context['commands']
for name, command in commands.items():
print(f'{name}')
@@ -72,13 +77,12 @@ class BleCommand(Command):
pass
@abstractmethod
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
pass
async def execute_default(self, args, context):
if 'ble_sstream' not in context or context['ble_sstream'] is None:
print("TCAT Device not connected.")
return CommandResultNone()
async def execute_default(self, args, context) -> CommandResult:
if context['ble_sstream'] is None or not context['ble_sstream'].is_connected:
return CommandResultError("TCAT Device not connected.")
bless: BleStreamSecure = context['ble_sstream']
print(self.get_log_string())
@@ -86,13 +90,12 @@ class BleCommand(Command):
data = self.prepare_data(args, context)
response = await bless.send_with_resp(data)
if not response:
return
return CommandResultNone()
tlv_response = TLV.from_bytes(response)
self.process_response(tlv_response, context)
return CommandResultTLV(tlv_response)
except DataNotPrepared as err:
print('Command failed', err)
return CommandResultNone()
return CommandResultError(f'Command failed: {err}')
def process_response(self, tlv_response, context):
pass
@@ -106,7 +109,7 @@ class HelloCommand(BleCommand):
def get_help_string(self) -> str:
return 'Send round trip "Hello world!" message.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.VENDOR_APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes()
@@ -118,7 +121,7 @@ class GetApplicationLayersCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get supported application layer service names from device.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_APPLICATION_LAYERS.value, bytes()).to_bytes()
def process_response(self, tlv_response, context):
@@ -148,7 +151,7 @@ class SendApplicationData1(BleCommand):
def get_help_string(self) -> str:
return 'Send hex encoded data to application layer 1.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
payload = bytes.fromhex(args[0])
return TLV(TcatTLVType.APPLICATION_DATA_1.value, payload).to_bytes()
@@ -161,7 +164,7 @@ class SendApplicationData2(BleCommand):
def get_help_string(self) -> str:
return 'Send hex encoded data to application layer 2.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
payload = bytes.fromhex(args[0])
return TLV(TcatTLVType.APPLICATION_DATA_2.value, payload).to_bytes()
@@ -174,7 +177,7 @@ class SendApplicationData3(BleCommand):
def get_help_string(self) -> str:
return 'Send hex encoded data to application layer 3.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
payload = bytes.fromhex(args[0])
return TLV(TcatTLVType.APPLICATION_DATA_3.value, payload).to_bytes()
@@ -187,7 +190,7 @@ class SendApplicationData4(BleCommand):
def get_help_string(self) -> str:
return 'Send hex encoded data to application layer 4.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
payload = bytes.fromhex(args[0])
return TLV(TcatTLVType.APPLICATION_DATA_4.value, payload).to_bytes()
@@ -200,7 +203,7 @@ class SendVendorData(BleCommand):
def get_help_string(self) -> str:
return 'Send hex encoded data to vendor specific application layer.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
payload = bytes.fromhex(args[0])
return TLV(TcatTLVType.VENDOR_APPLICATION.value, payload).to_bytes()
@@ -213,7 +216,7 @@ class CommissionCommand(BleCommand):
def get_help_string(self) -> str:
return 'Update the connected device with current dataset.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
dataset: ThreadDataset = context['dataset']
dataset_bytes = dataset.to_bytes()
return TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes()
@@ -227,7 +230,7 @@ class DecommissionCommand(BleCommand):
def get_help_string(self) -> str:
return 'Stop Thread interface and decommission device from current network.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes()
@@ -236,11 +239,8 @@ class DisconnectCommand(Command):
def get_help_string(self) -> str:
return 'Disconnect client from TCAT device'
async def execute_default(self, args, context):
if 'ble_sstream' not in context or context['ble_sstream'] is None:
print("TCAT Device not connected.")
return CommandResultNone()
await context['ble_sstream'].close()
async def execute_default(self, args, context) -> CommandResult:
await disconnect_helper(context)
return CommandResultNone()
@@ -252,10 +252,10 @@ class ExtractDatasetCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get active dataset from device.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_ACTIVE_DATASET.value, bytes()).to_bytes()
def process_response(self, tlv_response, context):
def process_response(self, tlv_response, context) -> None:
if tlv_response.type == TcatTLVType.RESPONSE_W_PAYLOAD.value:
dataset = ThreadDataset()
dataset.set_from_bytes(tlv_response.value)
@@ -270,9 +270,9 @@ class GetCommissionerCertificate(BleCommand):
return 'Getting commissioner certificate.'
def get_help_string(self) -> str:
return 'Get commissioner certificate from device.'
return 'Get last-store TCAT commissioner certificate from TCAT device.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_COMMISSIONER_CERTIFICATE.value, bytes()).to_bytes()
@@ -284,7 +284,7 @@ class GetDeviceIdCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get unique identifier for the TCAT device.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_DEVICE_ID.value, bytes()).to_bytes()
@@ -296,7 +296,7 @@ class GetExtPanIDCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get extended PAN ID that is commissioned in the active dataset.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_EXT_PAN_ID.value, bytes()).to_bytes()
@@ -308,7 +308,7 @@ class GetProvisioningUrlCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get a URL for an application suited to commission the TCAT device.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_PROVISIONING_URL.value, bytes()).to_bytes()
@@ -320,19 +320,23 @@ class GetNetworkNameCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get the Thread network name that is commissioned in the active dataset.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes()
class GetPskdHash(BleCommand):
def __init__(self):
super().__init__()
self.digest = None
def get_log_string(self) -> str:
return 'Retrieving peer PSKd hash.'
def get_help_string(self) -> str:
return 'Get calculated PSKd hash.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
bless: BleStreamSecure = context['ble_sstream']
if bless.peer_public_key is None:
raise DataNotPrepared("Peer certificate not present.")
@@ -348,7 +352,7 @@ class GetPskdHash(BleCommand):
self.digest = hash.digest()
return data
def process_response(self, tlv_response, context):
def process_response(self, tlv_response, context) -> None:
if tlv_response.value == self.digest:
print('Requested hash is valid.')
else:
@@ -363,17 +367,16 @@ class GetRandomNumberChallenge(BleCommand):
def get_help_string(self) -> str:
return 'Get the device random number challenge.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.GET_RANDOM_NUMBER_CHALLENGE.value, bytes()).to_bytes()
def process_response(self, tlv_response, context):
def process_response(self, tlv_response, context) -> None:
bless: BleStreamSecure = context['ble_sstream']
if tlv_response.value != None:
if tlv_response.value is not None:
if len(tlv_response.value) == CHALLENGE_SIZE:
bless.peer_challenge = tlv_response.value
else:
print('Challenge format invalid.')
return CommandResultNone()
class PingCommand(Command):
@@ -381,26 +384,27 @@ class PingCommand(Command):
def get_help_string(self) -> str:
return 'Send echo request to TCAT device.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
bless: BleStreamSecure = context['ble_sstream']
if bless is None or not bless.is_connected:
return CommandResultError("TCAT Device not connected.")
payload_size = 10
max_payload = 512
if len(args) > 0:
payload_size = int(args[0])
if payload_size > max_payload:
print(f'Payload size too large. Maximum supported value is {max_payload}')
return CommandResultNone()
return CommandResultError(f'Payload size too large. Maximum supported value is {max_payload}')
to_send = token_bytes(payload_size)
data = TLV(TcatTLVType.PING.value, to_send).to_bytes()
elapsed_time = time()
start_time = asyncio.get_running_loop().time()
response = await bless.send_with_resp(data)
elapsed_time = 1e3 * (time() - elapsed_time)
elapsed_time = 1e3 * (asyncio.get_running_loop().time() - start_time)
if not response:
return CommandResultNone()
tlv_response = TLV.from_bytes(response)
if tlv_response.value != to_send:
print("Received malformed response.")
print("Error: Ping response payload mismatch.")
print(f"Roundtrip time: {elapsed_time} ms")
@@ -415,7 +419,7 @@ class PresentHash(BleCommand):
def get_help_string(self) -> str:
return 'Present calculated hash.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
type = args[0]
code = None
tlv_type = None
@@ -445,40 +449,127 @@ class PresentHash(BleCommand):
return data
def _handshake_progress_bar(is_concluded: bool):
if is_concluded:
print('')
else:
print('.', end='', flush=True)
async def connect_helper(device: BLEDevice | UdpStream,
context: dict,
timeout_ble=30.0,
timeout_simulation=5.0) -> bool:
"""Helper function for CLI and commands to establish a new secure connection with a TCAT device.
Handles both BLE and simulated UDP connections. Loads certificates and performs handshake
to establish a secure channel. Connection objects are stored in the context dictionary.
Args:
device: BLEDevice object, or UdpStream (for simulation)
context: Dictionary containing application context including command line arguments
timeout_ble: Timeout in seconds for handshake with real TCAT device (default: 30.0)
timeout_simulation: Timeout in seconds for handshake for simulated TCAT device (default: 5.0)
Returns:
True if connection was successful, False otherwise.
Raises:
Exception: If certificates cannot be loaded
"""
is_simulation = isinstance(device, UdpStream)
is_debug = logger.getEffectiveLevel() <= logging.DEBUG
print(f'Connecting to {device}')
if not is_simulation:
ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
else:
ble_stream = device
ble_sstream = BleStreamSecure(ble_stream)
context['ble_sstream'] = ble_sstream
context['ble_stream'] = ble_stream
cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth'
ble_sstream.load_cert(
certfile=path.join(cert_path, 'commissioner_cert.pem'),
keyfile=path.join(cert_path, 'commissioner_key.pem'),
cafile=path.join(cert_path, 'ca_cert.pem'),
)
logger.info(f"Certificates and key loaded from '{cert_path}'")
print('Setting up secure channel...')
ok = False
try:
cb = None
if not is_debug:
cb = _handshake_progress_bar
ok = await ble_sstream.do_handshake(progress_callback=cb,
timeout=timeout_simulation if is_simulation else timeout_ble)
except Exception as e:
logger.error(e)
if ok:
print('Done')
return True
else:
if ble_stream is not None:
await ble_stream.disconnect()
context['ble_stream'] = None
context['ble_sstream'] = None
return False
async def disconnect_helper(context: dict) -> None:
"""Helper function for CLI and commands to disconnect from a TCAT device."""
bless: BleStreamSecure = context['ble_sstream']
doing_disconn = False
if bless is not None and bless.is_connected:
print('Disconnecting...')
doing_disconn = True
logger.debug('Closing TLS connection.')
await bless.close(timeout=5.0)
context['ble_sstream'] = None
bles = context['ble_stream']
if bles is not None:
logger.debug('Closing BLE connection.')
await bles.disconnect()
context['ble_stream'] = None
if doing_disconn:
print('Done')
class ScanCommand(Command):
def get_help_string(self) -> str:
return 'Perform scan for TCAT devices.'
async def execute_default(self, args, context):
if 'ble_sstream' in context and context['ble_sstream'] is not None:
context['ble_sstream'].close()
del context['ble_sstream']
async def execute_default(self, args, context) -> CommandResult:
if context['ble_sstream'] is not None and context['ble_sstream'].is_connected:
return CommandResultError('already connected to a TCAT device. Use \'disconnect\' first.')
print('Scanning for BLE TCAT devices...')
tcat_devices = await ble_scanner.scan_tcat_devices()
device = select_device_by_user_input(tcat_devices)
if device is not None:
await connect_helper(device, context)
if device is None:
return CommandResultNone()
return CommandResultNone()
ble_sstream = None
print(f'Connecting to {device}')
ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
ble_sstream = BleStreamSecure(ble_stream)
cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth'
ble_sstream.load_cert(
certfile=path.join(cert_path, 'commissioner_cert.pem'),
keyfile=path.join(cert_path, 'commissioner_key.pem'),
cafile=path.join(cert_path, 'ca_cert.pem'),
)
print('Setting up secure channel...')
if await ble_sstream.do_handshake():
print('Done')
context['ble_sstream'] = ble_sstream
else:
print('Secure channel not established.')
await ble_stream.disconnect()
class SimulationCommand(Command):
def get_help_string(self) -> str:
return 'Connect to a simulated TCAT device over UDP.'
async def execute_default(self, args, context) -> CommandResult:
if len(args) != 1:
return CommandResultError('need index number of simulated TCAT device as first argument.')
if context['ble_sstream'] is not None and context['ble_sstream'].is_connected:
return CommandResultError('already connected to a TCAT device. Use \'disconnect\' first.')
device = UdpStream("127.0.0.1", int(args[0]))
await connect_helper(device, context)
return CommandResultNone()
@@ -490,7 +581,7 @@ class DiagnosticTlvsCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get diagnostic TLVs from the TCAT device.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
num_args = DiagnosticTLVType.names_to_numbers(args)
try:
if not num_args:
@@ -515,7 +606,7 @@ class ThreadStartCommand(BleCommand):
def get_help_string(self) -> str:
return 'Enable thread interface.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()
@@ -527,18 +618,19 @@ class ThreadStopCommand(BleCommand):
def get_help_string(self) -> str:
return 'Disable thread interface.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()
class ThreadStateCommand(Command):
def __init__(self):
super().__init__()
self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}
def get_help_string(self) -> str:
return 'Manipulate state of the Thread interface of the connected device.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
print('Invalid usage. Provide a subcommand.')
return CommandResultNone()
+31 -14
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -25,28 +25,31 @@
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
from argparse import Namespace
import logging
import readline
import shlex
from argparse import Namespace
from ble.ble_stream_secure import BleStreamSecure
from typing import Optional
from cli.base_commands import (DisconnectCommand, HelpCommand, HelloCommand, CommissionCommand, DecommissionCommand,
ExtractDatasetCommand, GetCommissionerCertificate, GetDeviceIdCommand, GetPskdHash,
GetExtPanIDCommand, GetNetworkNameCommand, GetProvisioningUrlCommand, PingCommand,
GetRandomNumberChallenge, ThreadStateCommand, ScanCommand, PresentHash,
DiagnosticTlvsCommand, GetApplicationLayersCommand, SendVendorData,
SendApplicationData1, SendApplicationData2, SendApplicationData3, SendApplicationData4)
SendApplicationData1, SendApplicationData2, SendApplicationData3, SendApplicationData4,
SimulationCommand, connect_helper, disconnect_helper)
from .command import CommandResultNone, CommandResult
from .tlv_commands import TlvCommand
from cli.dataset_commands import (DatasetCommand)
from dataset.dataset import ThreadDataset
from typing import Optional
logger = logging.getLogger(__name__)
class CLI:
def __init__(self,
dataset: ThreadDataset,
cmd_args: Optional[Namespace] = None,
ble_sstream: Optional[BleStreamSecure] = None):
def __init__(self, dataset: ThreadDataset, cmd_args: Optional[Namespace] = None):
self._commands = {
'help': HelpCommand(),
'hello': HelloCommand(),
@@ -68,6 +71,7 @@ class CLI:
'get_dataset': ExtractDatasetCommand(),
'thread': ThreadStateCommand(),
'scan': ScanCommand(),
'simulation': SimulationCommand(),
'random_challenge': GetRandomNumberChallenge(),
'present_hash': PresentHash(),
'peer_pskd_hash': GetPskdHash(),
@@ -75,8 +79,9 @@ class CLI:
'get_comm_cert': GetCommissionerCertificate(),
'diagnostic_tlvs': DiagnosticTlvsCommand()
}
self._context = {
'ble_sstream': ble_sstream,
self.context = {
'ble_sstream': None, # BleStreamSecure | None
'ble_stream': None, # BleStream | None
'dataset': dataset,
'commands': self._commands,
'cmd_args': cmd_args
@@ -116,10 +121,10 @@ class CLI:
else:
return None
async def evaluate_input(self, user_input):
async def evaluate_input(self, user_input) -> CommandResult:
# do not parse empty commands
if not user_input.strip():
return
return CommandResultNone()
command_parts = shlex.split(user_input)
command = command_parts[0]
@@ -128,4 +133,16 @@ class CLI:
if command not in self._commands.keys():
raise Exception('Invalid command: {}'.format(command))
return await self._commands[command].execute(args, self._context)
return await self._commands[command].execute(args, self.context)
async def connect(self, device) -> bool:
"""
Connect with TLS to the BLE/simulation device.
:param device: the BLE device object or simulation UdpStream object
:return: True if connection was successful, False otherwise
"""
return await connect_helper(device, self.context)
async def disconnect(self):
""" Disconnect from the BLE/simulation device. """
await disconnect_helper(self.context)
+23 -4
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -26,11 +26,12 @@
POSSIBILITY OF SUCH DAMAGE.
"""
from abc import ABC, abstractmethod
from tlv.tlv import TLV
from tlv.tcat_tlv import TcatTLVType
from ble.ble_stream_secure import BleStreamSecure
from abc import ABC, abstractmethod
from utils import is_printable_ascii
class CommandResult(ABC):
@@ -58,7 +59,7 @@ class Command(ABC):
return await self._subcommands[args[0]].execute(args[1:], context)
@abstractmethod
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
pass
@abstractmethod
@@ -91,9 +92,27 @@ class CommandResultTLV(CommandResult):
print(f'\tTYPE:\tunknown: {hex(tlv.type)} ({tlv.type})')
print(f'\tLEN:\t{len(tlv.value)}')
print(f'\tVALUE:\t0x{tlv.value.hex()}')
if is_printable_ascii(tlv.value):
print(f'\tSTRING:\t{tlv.value.decode("ascii")}')
class CommandResultNone(CommandResult):
def pretty_print(self):
pass
class CommandResultDone(CommandResult):
def pretty_print(self):
print("Done")
class CommandResultError(CommandResult):
def __init__(self, message):
super().__init__()
self.message = message
def pretty_print(self):
print(f"Error: {self.message}")
+25 -24
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -26,21 +26,21 @@
POSSIBILITY OF SUCH DAMAGE.
"""
from cli.command import Command, CommandResultNone
from dataset.dataset import ThreadDataset, initial_dataset
from tlv.dataset_tlv import MeshcopTlvType
from copy import deepcopy
from cli.command import Command, CommandResult, CommandResultNone, CommandResultDone
from dataset.dataset import ThreadDataset, initial_dataset
from tlv.dataset_tlv import MeshcopTlvType
def handle_dataset_entry_command(type: MeshcopTlvType, args, context):
def handle_dataset_entry_command(type: MeshcopTlvType, args, context) -> CommandResult:
ds: ThreadDataset = context['dataset']
if len(args) == 0:
ds.get_entry(type).print_content()
return CommandResultNone()
ds.set_entry(type, args)
print('Done.')
return CommandResultNone()
return CommandResultDone()
class DatasetClearCommand(Command):
@@ -48,7 +48,7 @@ class DatasetClearCommand(Command):
def get_help_string(self) -> str:
return 'Clear dataset.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
ds: ThreadDataset = context['dataset']
ds.clear()
return CommandResultNone()
@@ -59,7 +59,7 @@ class DatasetHelpCommand(Command):
def get_help_string(self) -> str:
return 'Display help message and return.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
indent_width = 4
indentation = ' ' * indent_width
commands: ThreadDataset = context['commands']
@@ -77,7 +77,7 @@ class DatasetHexCommand(Command):
def get_help_string(self) -> str:
return 'Get or set dataset as hex-encoded TLVs.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
ds: ThreadDataset = context['dataset']
if args:
try:
@@ -98,7 +98,7 @@ class ReloadDatasetCommand(Command):
def get_help_string(self) -> str:
return 'Reset dataset to the initial value.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
context['dataset'].set_from_bytes(initial_dataset)
return CommandResultNone()
@@ -108,7 +108,7 @@ class ActiveTimestampCommand(Command):
def get_help_string(self) -> str:
return 'View and set ActiveTimestamp seconds. Arguments: [seconds (int)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.ACTIVETIMESTAMP, args, context)
@@ -117,7 +117,7 @@ class PendingTimestampCommand(Command):
def get_help_string(self) -> str:
return 'View and set PendingTimestamp seconds. Arguments: [seconds (int)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.PENDINGTIMESTAMP, args, context)
@@ -126,7 +126,7 @@ class NetworkKeyCommand(Command):
def get_help_string(self) -> str:
return 'View and set NetworkKey. Arguments: [nk (hexstring, len=32)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.NETWORKKEY, args, context)
@@ -135,7 +135,7 @@ class NetworkNameCommand(Command):
def get_help_string(self) -> str:
return 'View and set NetworkName. Arguments: [nn (string, maxlen=16)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.NETWORKNAME, args, context)
@@ -144,7 +144,7 @@ class ExtPanIDCommand(Command):
def get_help_string(self) -> str:
return 'View and set ExtPanID. Arguments: [extpanid (hexstring, len=16)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.EXTPANID, args, context)
@@ -153,7 +153,7 @@ class MeshLocalPrefixCommand(Command):
def get_help_string(self) -> str:
return 'View and set MeshLocalPrefix. Arguments: [mlp (hexstring, len=16)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.MESHLOCALPREFIX, args, context)
@@ -162,7 +162,7 @@ class DelayTimerCommand(Command):
def get_help_string(self) -> str:
return 'View and set DelayTimer delay. Arguments: [delay (int)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.DELAYTIMER, args, context)
@@ -171,7 +171,7 @@ class PanIDCommand(Command):
def get_help_string(self) -> str:
return 'View and set PanID. Arguments: [panid (hexstring, len=4)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.PANID, args, context)
@@ -180,7 +180,7 @@ class ChannelCommand(Command):
def get_help_string(self) -> str:
return 'View and set Channel. Arguments: [channel (int)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.CHANNEL, args, context)
@@ -189,7 +189,7 @@ class ChannelMaskCommand(Command):
def get_help_string(self) -> str:
return 'View and set ChannelMask. Arguments: [mask (hexstring)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.CHANNELMASK, args, context)
@@ -198,7 +198,7 @@ class PskcCommand(Command):
def get_help_string(self) -> str:
return 'View and set Pskc. Arguments: [pskc (hexstring, maxlen=32)]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.PSKC, args, context)
@@ -208,13 +208,14 @@ class SecurityPolicyCommand(Command):
return 'View and set SecurityPolicy. Arguments: '\
'[<rotation_time (int)> [flags (string)] [version_threshold (int)]]'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
return handle_dataset_entry_command(MeshcopTlvType.SECURITYPOLICY, args, context)
class DatasetCommand(Command):
def __init__(self):
super().__init__()
self._subcommands = {
'clear': DatasetClearCommand(),
'help': DatasetHelpCommand(),
@@ -238,7 +239,7 @@ class DatasetCommand(Command):
return 'View and manipulate current dataset. ' \
'Call without parameters to show current dataset.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
ds: ThreadDataset = context['dataset']
ds.print_content()
return CommandResultNone()
+6 -5
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -26,7 +26,7 @@
POSSIBILITY OF SUCH DAMAGE.
"""
from .base_commands import BleCommand, CommandResultNone, Command
from .base_commands import BleCommand, CommandResult, CommandResultNone, Command
from tlv.tlv import TLV
from tlv.tcat_tlv import TcatTLVType
@@ -36,7 +36,7 @@ class TlvCommandList(Command):
def get_help_string(self) -> str:
return 'List available TLV types to use in \'tlv send\'.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
list_tlv = "\n".join([f"{tlv.value:#x}\t{tlv.name}" for tlv in TcatTLVType])
print(f"\n{list_tlv}")
return CommandResultNone()
@@ -50,7 +50,7 @@ class TlvCommandSend(BleCommand):
def get_help_string(self) -> str:
return 'Send TLV with arbitrary payload: \'tlv send <TLV_TYPE> <TLV_PAYLOAD>\'.'
def prepare_data(self, args, context):
def prepare_data(self, args, context) -> bytes:
tlv_type = TcatTLVType(int(args[0], 16))
tlv_value = bytes()
try:
@@ -64,11 +64,12 @@ class TlvCommandSend(BleCommand):
class TlvCommand(Command):
def __init__(self):
super().__init__()
self._subcommands = {'list': TlvCommandList(), 'send': TlvCommandSend()}
def get_help_string(self) -> str:
return 'Send TLV with arbitrary payload.'
async def execute_default(self, args, context):
async def execute_default(self, args, context) -> CommandResult:
self.print_help()
return CommandResultNone()
+23 -11
View File
@@ -1,17 +1,29 @@
"""
Copyright (c) 2023 Nordic Semiconductor ASA
Copyright (c) 2025, The OpenThread Authors.
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
from typing import Dict, List
@@ -1,23 +1,35 @@
"""
Copyright (c) 2023 Nordic Semiconductor ASA
Copyright (c) 2025, The OpenThread Authors.
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
import struct
import inspect
from typing import List
from abc import ABC, abstractmethod
import inspect
import struct
from typing import List
from tlv.dataset_tlv import MeshcopTlvType
from tlv.tlv import TLV
@@ -461,7 +473,9 @@ class ChannelMask(DatasetEntry):
class ChannelMaskEntry(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.CHANNELMASK) # Note: type not used in set_from_tlv / to_tlv
self.channel_page = 0
self.mask_length = 0
self.channel_mask: bytes = None
def set(self, args: List[str]):
+3 -2
View File
@@ -4,9 +4,10 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "tcat-ble-client"
version = "0.2.0"
description = "TCAT Commissioner (BBTC) client to configure TCAT devices"
version = "0.3.0"
description = "TCAT Commissioner (BBTC) test client to configure TCAT devices"
authors = ["Piotr Jasinski <piotr.jasinski@nordicsemi.no>", "Esko Dijk <esko.dijk@iotconsultancy.nl>"]
package-mode = false
[tool.poetry.dependencies]
python = "^3.10"
+2 -1
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -31,6 +31,7 @@ from enum import Enum
class TcatTLVType(Enum):
RESPONSE_W_STATUS = 0x01
RESPONSE_W_PAYLOAD = 0x02
RESPONSE_EVENT = 0x03
GET_NETWORK_NAME = 0x08
DISCONNECT = 0x09
PING = 0x0A
+18 -1
View File
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024, The OpenThread Authors.
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -27,9 +27,12 @@
"""
import base64
import string
from tlv import advertised_tlv
PRINTABLE_ASCII_BYTES = set(string.printable.encode('ascii'))
def get_int_in_range(min_value, max_value):
while True:
@@ -143,3 +146,17 @@ def superimpose_centered_string(background: str, foreground: str) -> str:
end_index = start_index + len_fg
return background[:start_index] + foreground + background[end_index:]
def is_printable_ascii(data: bytes) -> bool:
"""
Checks if a byte array contains only printable and human-readable ASCII characters.
Args:
data: The byte array to check.
Returns:
True if all bytes are printable ASCII, False otherwise.
"""
# The all() function stops as soon as it finds a failing case.
return all(byte_val in PRINTABLE_ASCII_BYTES for byte_val in data)