diff --git a/tools/tcat_ble_client/README.md b/tools/tcat_ble_client/README.md index 6bf5d81c2..a9c1548a3 100644 --- a/tools/tcat_ble_client/README.md +++ b/tools/tcat_ble_client/README.md @@ -19,7 +19,7 @@ If pipx is not available, it can be installed for Linux/Windows/MacOS following Then, install this project using Poetry: ``` -poetry install --no-root +poetry install ``` This will install all the required modules to a virtual environment, which can be used by calling `poetry run ` from the project directory. diff --git a/tools/tcat_ble_client/auth-cert/CommCert3/ca_cert.pem b/tools/tcat_ble_client/auth-cert/CommCert3/ca_cert.pem index 1f854531a..ecaaa3a3b 100644 --- a/tools/tcat_ble_client/auth-cert/CommCert3/ca_cert.pem +++ b/tools/tcat_ble_client/auth-cert/CommCert3/ca_cert.pem @@ -12,3 +12,17 @@ A1UdDgQWBBTgTKehLD14MpkRU+S6azYhYsAkKjAKBggqhkjOPQQDAgNIADBFAiA2 Wp9JGbwiqbW0l0fTS+AKdp6xFXkmuePftuUTsnMKcgIhAPdC1zdx8fHPoTnRLpiH Pt2/QkcSashR9zOp9MrBnRPb -----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIICOzCCAeGgAwIBAgIJAKOc2hehOGoBMAoGCCqGSM49BAMCMHExJjAkBgNVBAMM +HVRocmVhZCBDZXJ0aWZpY2F0aW9uIERldmljZUNBMRkwFwYDVQQKDBBUaHJlYWQg +R3JvdXAgSW5jMRIwEAYDVQQHDAlTYW4gUmFtb24xCzAJBgNVBAgMAkNBMQswCQYD +VQQGEwJVUzAeFw0yNDA1MDMyMDAyMThaFw00NDA0MjgyMDAyMThaMHExJjAkBgNV +BAMMHVRocmVhZCBDZXJ0aWZpY2F0aW9uIERldmljZUNBMRkwFwYDVQQKDBBUaHJl +YWQgR3JvdXAgSW5jMRIwEAYDVQQHDAlTYW4gUmFtb24xCzAJBgNVBAgMAkNBMQsw +CQYDVQQGEwJVUzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABGy850VBIPTkN3oL +x++zIUsZk2k26w4fuieFz9oNvjdb5W14+Yf3mvGWsl4NHyLxqhmamVAR4h7zWRlZ +0XyMVpKjYjBgMB4GA1UdEQQXMBWBE3RvbUB0aHJlYWRncm91cC5vcmcwDgYDVR0P +AQH/BAQDAgGGMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFF+rGyloiKHUtDGo +hmHn52ZZ7fgZMAoGCCqGSM49BAMCA0gAMEUCIQCTq1qjPZs9fAJB6ppTXs588Pnu +eVFOwC8bd//D99KiHAIgU84kwFHIyDvFqu6y+u1hFqBGsiuTmKwZ2PHhVe/xK1k= +-----END CERTIFICATE----- diff --git a/tools/tcat_ble_client/bbtc.py b/tools/tcat_ble_client/bbtc.py index b9603b13b..54e07c5c2 100755 --- a/tools/tcat_ble_client/bbtc.py +++ b/tools/tcat_ble_client/bbtc.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -29,24 +29,26 @@ import asyncio import argparse import logging -import os -from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \ - BBTC_RX_CHAR_UUID -from ble.ble_stream import BleStream +from bleak import BLEDevice + from ble.ble_stream_secure import BleStreamSecure from ble.udp_stream import UdpStream from ble import ble_scanner from cli.cli import CLI from dataset.dataset import ThreadDataset from cli.command import CommandResult -from utils import select_device_by_user_input, quit_with_reason +from tlv.tcat_tlv import TcatTLVType +from tlv.tlv import TLV +from utils import hexdump_ot, select_device_by_user_input, quit_with_reason logger = logging.getLogger(__name__) +logged_modules = ['ble', 'cli', 'dataset', 'tlv', 'utils'] async def main(): - logging.basicConfig(level=logging.WARNING) + log_level = logging.WARNING + logging.basicConfig(level=log_level) parser = argparse.ArgumentParser(description='Device parameters') parser.add_argument('-a', '--adapter', help='Select HCI adapter') @@ -61,80 +63,86 @@ async def main(): args = parser.parse_args() if args.debug: - logger.setLevel(logging.DEBUG) - logging.getLogger('ble.ble_stream').setLevel(logging.DEBUG) - logging.getLogger('ble.ble_stream_secure').setLevel(logging.DEBUG) - logging.getLogger('ble.udp_stream').setLevel(logging.DEBUG) + log_level = logging.DEBUG elif args.info: - logger.setLevel(logging.INFO) - logging.getLogger('ble.ble_stream').setLevel(logging.INFO) - logging.getLogger('ble.ble_stream_secure').setLevel(logging.INFO) - logging.getLogger('ble.udp_stream').setLevel(logging.INFO) + log_level = logging.INFO + logger.setLevel(log_level) + for module in logged_modules: + logging.getLogger(module).setLevel(log_level) - is_debug = logger.getEffectiveLevel() <= logging.DEBUG device = await get_device_by_args(args) - ble_sstream = None - - if device is not None: - print(f'Connecting to {device}') - ble_sstream = BleStreamSecure(device) - ble_sstream.load_cert( - certfile=os.path.join(args.cert_path, 'commissioner_cert.pem'), - keyfile=os.path.join(args.cert_path, 'commissioner_key.pem'), - cafile=os.path.join(args.cert_path, 'ca_cert.pem'), - ) - logger.info(f"Certificates and key loaded from '{args.cert_path}'") - - print('Setting up secure TLS channel..', end='') - ok = False - try: - cb = None - if not is_debug: - cb = handshake_progress_bar - ok = await ble_sstream.do_handshake(progress_callback=cb) - except Exception as e: - logger.error(e) - - if ok: - print('Done') - else: - print('Failed') - quit_with_reason('TLS handshake failure') - + # create CLI and (if selected) connect to TCAT device ds = ThreadDataset() - cli = CLI(ds, args, ble_sstream) - loop = asyncio.get_running_loop() + cli = CLI(ds, args) + if device is not None: + if not await cli.connect(device): + quit_with_reason('Failed to connect to TCAT device: TLS handshake failed.') + + # Task 1: run a receiver that gets unsolicited event data or TLS Alerts from TLS server. + receiver_task = asyncio.create_task(receive_loop(cli.context)) + + # Task 2: run the CLI print('Enter \'help\' to see available commands or \'exit\' to exit the application.') + loop = asyncio.get_running_loop() while True: user_input = await loop.run_in_executor(None, lambda: input('> ')) if user_input.lower() == 'exit': break try: result: CommandResult = await cli.evaluate_input(user_input) - if result: - result.pretty_print() + result.pretty_print() except Exception as e: logger.error(e) + logger.debug(e, exc_info=True) - print('Disconnecting...') - if ble_sstream is not None: - await ble_sstream.close() + # Stop Task 1 + receiver_task.cancel() + try: + await receiver_task + except asyncio.CancelledError: + # CancelledError is expected when awaiting the canceled task - not an error. + pass + + # Disconnect from TCAT device (if still needed) + await cli.disconnect() -async def get_device_by_args(args): +async def receive_loop(cli_context: dict): + while True: + bless: BleStreamSecure = cli_context['ble_sstream'] + if bless is not None: + data = await bless.recv_unsolicited_event() + if data: + logger.info('Received event data from TCAT Device:\n' + hexdump_ot("Event", data)) + tlv = TLV.from_bytes(data) + validate_unsolicited_tlv(tlv) + continue + await asyncio.sleep(0.100) + + +def validate_unsolicited_tlv(tlv: TLV): + if tlv.type in [ + TcatTLVType.APPLICATION_DATA_1.value, TcatTLVType.APPLICATION_DATA_2.value, + TcatTLVType.APPLICATION_DATA_3.value, TcatTLVType.APPLICATION_DATA_4.value + ]: + num = tlv.type - TcatTLVType.APPLICATION_DATA_1.value + 1 + logger.info(f" - Send Application Data {num} {hex(tlv.type)}") + elif tlv.type in [TcatTLVType.RESPONSE_EVENT.value]: + logger.info(f" - Response Event {hex(tlv.type)}") + else: + logger.error(f"Error: Illegal unsolicited TLV type sent by TCAT Device: {hex(tlv.type)}") + + +async def get_device_by_args(args) -> BLEDevice | UdpStream | None: device = None if args.mac: device = await ble_scanner.find_first_by_mac(args.mac) - device = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID) elif args.name: device = await ble_scanner.find_first_by_name(args.name) - device = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID) elif args.scan: tcat_devices = await ble_scanner.scan_tcat_devices(adapter=args.adapter) device = select_device_by_user_input(tcat_devices) - if device: - device = await BleStream.create(device, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID) elif args.simulation: device = UdpStream("127.0.0.1", int(args.simulation)) diff --git a/tools/tcat_ble_client/ble/ble_scanner.py b/tools/tcat_ble_client/ble/ble_scanner.py index 408e4d2e2..0381898d0 100644 --- a/tools/tcat_ble_client/ble/ble_scanner.py +++ b/tools/tcat_ble_client/ble/ble_scanner.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,29 +26,31 @@ POSSIBILITY OF SUCH DAMAGE. """ +from typing import Optional + from bleak import BleakScanner from bleak.backends.device import BLEDevice from bleak.uuids import normalize_uuid_str -from bbtc import BBTC_SERVICE_UUID -from typing import Optional + +from ble.ble_connection_constants import BBTC_SERVICE_UUID from ble.ble_advertisement_data import AdvertisedData -async def find_first_by_name(name): +async def find_first_by_name(name) -> BLEDevice: match_name = lambda dev, adv_data: name == dev.name device = await BleakScanner.find_device_by_filter(match_name) return device -async def find_first_by_mac(mac): +async def find_first_by_mac(mac) -> BLEDevice: match_mac = lambda dev, adv_data: mac.upper() == dev.address device = await BleakScanner.find_device_by_filter(match_mac) return device -async def scan_tcat_devices(adapter: Optional[str] = None): +async def scan_tcat_devices(adapter: Optional[str] = None) -> list[tuple[BLEDevice, Optional[AdvertisedData]]]: scanner = BleakScanner() - tcat_devices: list[BLEDevice] = [] + tcat_devices: list[tuple[BLEDevice, Optional[AdvertisedData]]] = [] service_uuids = [normalize_uuid_str(BBTC_SERVICE_UUID)] discovered_devices = await scanner.discover(return_adv=True, service_uuids=service_uuids, adapter=adapter) for _, (device, adv) in discovered_devices.items(): diff --git a/tools/tcat_ble_client/ble/ble_stream.py b/tools/tcat_ble_client/ble/ble_stream.py index 22ab02a2a..3c6e5c786 100644 --- a/tools/tcat_ble_client/ble/ble_stream.py +++ b/tools/tcat_ble_client/ble/ble_stream.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2026, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,11 +26,10 @@ POSSIBILITY OF SUCH DAMAGE. """ +import asyncio from itertools import count, takewhile -from typing import Iterator, Union import logging -import time -from asyncio import sleep +from typing import Iterator, Union from bleak import BleakClient from bleak.backends.device import BLEDevice @@ -62,7 +61,7 @@ class BleStream: def __handle_rx(self, _: BleakGATTCharacteristic, data: bytearray): logger.debug(f'rx {len(data)} bytes') self.__receive_buffer += data - self.__last_recv_time = time.time() + self.__last_recv_time = asyncio.get_running_loop().time() @staticmethod def __sliced(data: bytes, n: int) -> Iterator[bytes]: @@ -84,12 +83,12 @@ class BleStream: await self.client.write_gatt_char(rx_char, s) return len(data) - async def recv(self, bufsize, recv_timeout=0.2): + async def recv(self, bufsize, recv_timeout=0.200): if not self.__receive_buffer: return b'' - while time.time() - self.__last_recv_time <= recv_timeout: - await sleep(0.1) + while asyncio.get_running_loop().time() - self.__last_recv_time <= recv_timeout: + await asyncio.sleep(0.020) data = self.__receive_buffer[:bufsize] self.__receive_buffer = self.__receive_buffer[bufsize:] diff --git a/tools/tcat_ble_client/ble/ble_stream_secure.py b/tools/tcat_ble_client/ble/ble_stream_secure.py index 36f3751b6..0bdf04bb9 100644 --- a/tools/tcat_ble_client/ble/ble_stream_secure.py +++ b/tools/tcat_ble_client/ble/ble_stream_secure.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2026, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -30,11 +30,11 @@ import _ssl import asyncio import logging import ssl -from time import time from typing import Optional, Callable from cryptography.x509 import load_der_x509_certificate from cryptography.hazmat.primitives.serialization import (Encoding, PublicFormat) + from tlv.tlv import TLV from tlv.tcat_tlv import TcatTLVType import utils @@ -53,6 +53,10 @@ class BleStreamSecure: self.cert = '' self.peer_challenge = None self._peer_public_key = None + self._recv_lock = asyncio.Lock() + self._close_notify_sent = False + self._close_notify_received = False + self._async_events_queue = asyncio.Queue() def load_cert(self, certfile='', keyfile='', cafile=''): if certfile and keyfile: @@ -65,7 +69,10 @@ class BleStreamSecure: if cafile: self.ssl_context.load_verify_locations(cafile=cafile) - async def do_handshake(self, timeout=30.0, progress_callback: Optional[Callable[[bool], None]] = None) -> bool: + async def do_handshake(self, + buffersize: int = 4096, + timeout: float = 30.0, + progress_callback: Optional[Callable[[bool], None]] = None) -> bool: """ Performs a TLS handshake with a TCAT Device, reporting progress via an optional callback. @@ -79,6 +86,9 @@ class BleStreamSecure: Returns: True if the TLS handshake was successful, False otherwise. """ + self._close_notify_sent = False + self._close_notify_received = False + self._peer_public_key = None self.ssl_object = self.ssl_context.wrap_bio( incoming=self.incoming, outgoing=self.outgoing, @@ -87,8 +97,8 @@ class BleStreamSecure: ) try: - start = time() - while (time() - start) < timeout: + start = asyncio.get_running_loop().time() + while (asyncio.get_running_loop().time() - start) < timeout: try: if progress_callback: progress_callback(False) @@ -96,26 +106,26 @@ class BleStreamSecure: break # SSLWantWrite means ssl wants to send data over the link, - # but might need a receive first + # but might need to receive first except ssl.SSLWantWriteError: - output = await self.stream.recv(4096) - if output: - self.incoming.write(output) - data = self.outgoing.read() - if data: - await self.stream.send(data) - await asyncio.sleep(0.02) + recv_data = await self.stream.recv(buffersize) + if recv_data: + self.incoming.write(recv_data) + send_data = self.outgoing.read() + if send_data: + await self.stream.send(send_data) + await asyncio.sleep(0.020) # SSLWantRead means ssl wants to receive data from the link, # but might need to send first except ssl.SSLWantReadError: - data = self.outgoing.read() - if data: - await self.stream.send(data) - output = await self.stream.recv(4096) - if output: - self.incoming.write(output) - await asyncio.sleep(0.02) + send_data = self.outgoing.read() + if send_data: + await self.stream.send(send_data) + recv_data = await self.stream.recv(buffersize) + if recv_data: + self.incoming.write(recv_data) + await asyncio.sleep(0.020) except ssl.SSLCertVerificationError as err: if progress_callback: @@ -151,50 +161,184 @@ class BleStreamSecure: self.log_cert_identities() return True - async def send(self, bytes): - logger.debug(f"tx {len(bytes)} bytes\n" + utils.hexdump_ot("Tx", bytes)) - self.ssl_object.write(bytes) - encode = self.outgoing.read(4096) - await self.stream.send(encode) + # Precondition: caller must handle all exceptions raised + async def _send(self, data: bytes, buffersize: int = 4096) -> None: + hexdump_str = utils.hexdump_ot("Tx", data) if len(data) > 0 else '' + logger.debug(f"tx {len(data)} bytes\n{hexdump_str}") + self.ssl_object.write(data) + while self.outgoing.pending > 0: + encrypted_chunk = self.outgoing.read(buffersize) + await self.stream.send(encrypted_chunk) - async def recv(self, buffersize, timeout=1): - end_time = asyncio.get_event_loop().time() + timeout + # Precondition: must only be called by _recv() + async def _on_alert_from_peer(self): + self._close_notify_received = True + if not self._close_notify_sent: + logger.warning('TLS connection closed by peer.') + else: + logger.debug('TLS connection closed by local, close-notify alert received from peer.') + + await self._close_tls_gracefully() + + # Precondition: caller must acquire _recv_lock before calling this method + # Precondition: caller must handle all exceptions raised + async def _recv(self, buffersize: int = 4096, timeout: float = 0.0) -> bytes: + if self._close_notify_received and self._close_notify_sent: + return b'' + + slp_time = 0.020 + end_time = asyncio.get_running_loop().time() + timeout data = await self.stream.recv(buffersize) - while not data and asyncio.get_event_loop().time() < end_time: - await asyncio.sleep(0.1) + while not data and asyncio.get_running_loop().time() < end_time: + await asyncio.sleep(slp_time) data = await self.stream.recv(buffersize) if not data: - logger.warning('No response when response expected.') return b'' self.incoming.write(data) while True: try: - decode = self.ssl_object.read(4096) + decode = self.ssl_object.read(buffersize) break - # if recv called before entire message was received from the link + # if _recv called before entire message was received from the link except ssl.SSLWantReadError: more = await self.stream.recv(buffersize) while not more: - await asyncio.sleep(0.1) + await asyncio.sleep(slp_time) more = await self.stream.recv(buffersize) self.incoming.write(more) - logger.debug(f"rx {len(decode)} bytes\n" + utils.hexdump_ot("Rx", decode)) + hexdump_str = utils.hexdump_ot("Rx", decode) if len(decode) > 0 else '' + logger.debug(f"rx {len(decode)} bytes\n{hexdump_str}") + + # ssl_object.read returns 0 bytes when peer side sent TLS Alert close/notify. + if len(decode) == 0: + await self._on_alert_from_peer() + return decode - async def send_with_resp(self, bytes): - await self.send(bytes) - res = await self.recv(buffersize=4096, timeout=5) - return res + async def send_with_resp(self, data: bytes, timeout: float = 5.0) -> bytes: + """ + Send data to the server over the secure TLS connection and wait for response data. - async def close(self): - if self.ssl_object.session is not None: - logger.debug('sending Disconnect command TLV') - data = TLV(TcatTLVType.DISCONNECT.value, bytes()).to_bytes() - self.peer_challenge = None + Args: + data: The data to send to the server. + timeout: The maximum time in seconds to wait for the response data. Defaults to 5.0 seconds. + + Returns: + bytes: The received response data as bytes, or empty b'' if no data TLV/line was received + within the timeout. + """ + async with self._recv_lock: + # first receive any pending unsolicited events and store in FIFO queue + while True: + pend_data = await self._recv(timeout=0.0) + if len(pend_data) == 0: + break + await self._async_events_queue.put(pend_data) + + # then send the data (command) and wait for the response (1 TLV or line) + await self._send(data) + res = await self._recv(timeout=timeout) + if len(res) == 0: + logger.error(f'No response when response TLV/line expected (timeout={timeout}s).') + return res + + async def recv_unsolicited_event(self) -> bytes: + """ + Receive unsolicited event data, if any, from the server over the secure TLS connection. + + This method is non-blocking and returns immediately if no unsolicited event data is available. + Events are returned in FIFO order of reception. To receive multiple events, call this method + repeatedly until no more data is available. + + Returns: + bytes: The received event data as bytes, or empty b'' if no data is available. + """ + + # dequeue the next event, if any + try: + data = self._async_events_queue.get_nowait() + return data + except asyncio.QueueEmpty: + pass + + # when connected, receive any pending unsolicited events incoming from the peer (and queue these) + while self.is_connected: + async with self._recv_lock: + data = await self._recv(timeout=0.0) + if len(data) == 0: + break + await self._async_events_queue.put(data) + + # dequeue the next event, if any + try: + data = self._async_events_queue.get_nowait() + return data + except asyncio.QueueEmpty: + return b'' + + async def close(self, timeout: float = 5.0): + try: + if self.is_connected: + logger.debug('sending Disconnect command TLV') + data = TLV(TcatTLVType.DISCONNECT.value, bytes()).to_bytes() + try: + await self._send(data) + except asyncio.CancelledError: + raise + except Exception as err: + logger.warning(f'Failed to send Disconnect command TLV: {err}') + logger.debug(err, exc_info=True) + + async with self._recv_lock: + await self._close_tls_gracefully(timeout=timeout) # send out Alert after the command. + + finally: self._peer_public_key = None - await self.send(data) + self.peer_challenge = None + self.ssl_object = None + + # Closes the TLS connection by repeated calls to unwrap() until it succeeds or times out. + # Precondition: must only be called by close() with _recv_lock acquired, or by _closed_by_peer() + async def _close_tls_gracefully(self, timeout: float = 5.0, buffersize: int = 4096): + try: + async with asyncio.timeout(timeout): + while True: + try: + self.ssl_object.unwrap() + send_data = self.outgoing.read() # send final Alert (if any) + if send_data: + await self.stream.send(send_data) + self._close_notify_sent = True + self._close_notify_received = True + break + + except ssl.SSLWantReadError: + recv_data = await self.stream.recv(buffersize) + if recv_data: + self.incoming.write(recv_data) + else: + await asyncio.sleep(0.020) # small pause to allow asyncio.timeout to occur + except ssl.SSLWantWriteError: + send_data = self.outgoing.read() + if send_data: + await self.stream.send(send_data) + + except asyncio.TimeoutError: + logger.warning(f'TLS closing procedure timed out (timeout={timeout} s).') + + except Exception as err: + logger.warning(f'TLS closing procedure incomplete: {err}') + logger.debug(err, exc_info=True) + + finally: + self.ssl_object = None + + @property + def is_connected(self): + return not self._close_notify_sent and not self._close_notify_received and \ + self._peer_public_key is not None and self.ssl_object is not None @property def peer_public_key(self): @@ -215,7 +359,7 @@ class BleStreamSecure: try: cc = self.ssl_object._sslobj.get_unverified_chain() if cc is None: - logger.info('No TCAT Device cert chain was received (yet).') + logger.warning('No TCAT Device cert chain was received (yet).') return logger.info(f'TCAT Device cert chain: {len(cc)} certificates received.') for cert in cc: @@ -224,5 +368,7 @@ class BleStreamSecure: logger.info(f' base64: (paste in https://lapo.it/asn1js/ to decode)\n{peer_cert_der_hex}') logger.info(f'TCAT Commissioner cert, PEM:\n{self.cert}') + except asyncio.CancelledError: + raise except Exception as e: logger.warning('Could not display TCAT client cert info (check Python version is >= 3.10?)') diff --git a/tools/tcat_ble_client/ble/udp_stream.py b/tools/tcat_ble_client/ble/udp_stream.py index 4d4723d89..2d3b980a0 100644 --- a/tools/tcat_ble_client/ble/udp_stream.py +++ b/tools/tcat_ble_client/ble/udp_stream.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -35,7 +35,7 @@ logger = logging.getLogger(__name__) class UdpStream: BASE_PORT = 10000 - MAX_SERVER_TIMEOUT_SEC = 10 + MAX_SERVER_TIMEOUT_SEC = 0.010 def __init__(self, address, node_id): self.__receive_buffer = b'' @@ -58,4 +58,8 @@ class UdpStream: logger.debug(f'rx {len(data)} bytes') return data else: - raise socket.timeout('simulation UdpStream recv timeout - likely, TCAT is stopped on TCAT Device') + return b'' + + async def disconnect(self): + if self.socket is not None: + self.socket.close() diff --git a/tools/tcat_ble_client/cli/base_commands.py b/tools/tcat_ble_client/cli/base_commands.py index bf500d158..1954e2b7a 100644 --- a/tools/tcat_ble_client/cli/base_commands.py +++ b/tools/tcat_ble_client/cli/base_commands.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2026, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -27,33 +27,38 @@ """ from abc import abstractmethod -from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \ - BBTC_RX_CHAR_UUID +import asyncio +from hashlib import sha256 +import hmac +import logging +from os import path +from secrets import token_bytes + +from bleak import BLEDevice + +from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID from ble.ble_stream import BleStream from ble.ble_stream_secure import BleStreamSecure from ble import ble_scanner +from ble.udp_stream import UdpStream +from cli.command import Command, CommandResultNone, CommandResultTLV, CommandResult, CommandResultError +from dataset.dataset import ThreadDataset from tlv.tlv import TLV from tlv.diagnostic_tlv import DiagnosticTLVType from tlv.tcat_tlv import TcatTLVType -from cli.command import Command, CommandResultNone, CommandResultTLV -from dataset.dataset import ThreadDataset from utils import select_device_by_user_input -from os import path -from time import time -from secrets import token_bytes -from hashlib import sha256 -import hmac -import binascii CHALLENGE_SIZE = 8 +logger = logging.getLogger(__name__) + class HelpCommand(Command): def get_help_string(self) -> str: return 'Display help and return.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: commands = context['commands'] for name, command in commands.items(): print(f'{name}') @@ -72,13 +77,12 @@ class BleCommand(Command): pass @abstractmethod - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: pass - async def execute_default(self, args, context): - if 'ble_sstream' not in context or context['ble_sstream'] is None: - print("TCAT Device not connected.") - return CommandResultNone() + async def execute_default(self, args, context) -> CommandResult: + if context['ble_sstream'] is None or not context['ble_sstream'].is_connected: + return CommandResultError("TCAT Device not connected.") bless: BleStreamSecure = context['ble_sstream'] print(self.get_log_string()) @@ -86,13 +90,12 @@ class BleCommand(Command): data = self.prepare_data(args, context) response = await bless.send_with_resp(data) if not response: - return + return CommandResultNone() tlv_response = TLV.from_bytes(response) self.process_response(tlv_response, context) return CommandResultTLV(tlv_response) except DataNotPrepared as err: - print('Command failed', err) - return CommandResultNone() + return CommandResultError(f'Command failed: {err}') def process_response(self, tlv_response, context): pass @@ -106,7 +109,7 @@ class HelloCommand(BleCommand): def get_help_string(self) -> str: return 'Send round trip "Hello world!" message.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.VENDOR_APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes() @@ -118,7 +121,7 @@ class GetApplicationLayersCommand(BleCommand): def get_help_string(self) -> str: return 'Get supported application layer service names from device.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_APPLICATION_LAYERS.value, bytes()).to_bytes() def process_response(self, tlv_response, context): @@ -148,7 +151,7 @@ class SendApplicationData1(BleCommand): def get_help_string(self) -> str: return 'Send hex encoded data to application layer 1.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: payload = bytes.fromhex(args[0]) return TLV(TcatTLVType.APPLICATION_DATA_1.value, payload).to_bytes() @@ -161,7 +164,7 @@ class SendApplicationData2(BleCommand): def get_help_string(self) -> str: return 'Send hex encoded data to application layer 2.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: payload = bytes.fromhex(args[0]) return TLV(TcatTLVType.APPLICATION_DATA_2.value, payload).to_bytes() @@ -174,7 +177,7 @@ class SendApplicationData3(BleCommand): def get_help_string(self) -> str: return 'Send hex encoded data to application layer 3.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: payload = bytes.fromhex(args[0]) return TLV(TcatTLVType.APPLICATION_DATA_3.value, payload).to_bytes() @@ -187,7 +190,7 @@ class SendApplicationData4(BleCommand): def get_help_string(self) -> str: return 'Send hex encoded data to application layer 4.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: payload = bytes.fromhex(args[0]) return TLV(TcatTLVType.APPLICATION_DATA_4.value, payload).to_bytes() @@ -200,7 +203,7 @@ class SendVendorData(BleCommand): def get_help_string(self) -> str: return 'Send hex encoded data to vendor specific application layer.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: payload = bytes.fromhex(args[0]) return TLV(TcatTLVType.VENDOR_APPLICATION.value, payload).to_bytes() @@ -213,7 +216,7 @@ class CommissionCommand(BleCommand): def get_help_string(self) -> str: return 'Update the connected device with current dataset.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: dataset: ThreadDataset = context['dataset'] dataset_bytes = dataset.to_bytes() return TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes() @@ -227,7 +230,7 @@ class DecommissionCommand(BleCommand): def get_help_string(self) -> str: return 'Stop Thread interface and decommission device from current network.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes() @@ -236,11 +239,8 @@ class DisconnectCommand(Command): def get_help_string(self) -> str: return 'Disconnect client from TCAT device' - async def execute_default(self, args, context): - if 'ble_sstream' not in context or context['ble_sstream'] is None: - print("TCAT Device not connected.") - return CommandResultNone() - await context['ble_sstream'].close() + async def execute_default(self, args, context) -> CommandResult: + await disconnect_helper(context) return CommandResultNone() @@ -252,10 +252,10 @@ class ExtractDatasetCommand(BleCommand): def get_help_string(self) -> str: return 'Get active dataset from device.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_ACTIVE_DATASET.value, bytes()).to_bytes() - def process_response(self, tlv_response, context): + def process_response(self, tlv_response, context) -> None: if tlv_response.type == TcatTLVType.RESPONSE_W_PAYLOAD.value: dataset = ThreadDataset() dataset.set_from_bytes(tlv_response.value) @@ -270,9 +270,9 @@ class GetCommissionerCertificate(BleCommand): return 'Getting commissioner certificate.' def get_help_string(self) -> str: - return 'Get commissioner certificate from device.' + return 'Get last-store TCAT commissioner certificate from TCAT device.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_COMMISSIONER_CERTIFICATE.value, bytes()).to_bytes() @@ -284,7 +284,7 @@ class GetDeviceIdCommand(BleCommand): def get_help_string(self) -> str: return 'Get unique identifier for the TCAT device.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_DEVICE_ID.value, bytes()).to_bytes() @@ -296,7 +296,7 @@ class GetExtPanIDCommand(BleCommand): def get_help_string(self) -> str: return 'Get extended PAN ID that is commissioned in the active dataset.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_EXT_PAN_ID.value, bytes()).to_bytes() @@ -308,7 +308,7 @@ class GetProvisioningUrlCommand(BleCommand): def get_help_string(self) -> str: return 'Get a URL for an application suited to commission the TCAT device.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_PROVISIONING_URL.value, bytes()).to_bytes() @@ -320,19 +320,23 @@ class GetNetworkNameCommand(BleCommand): def get_help_string(self) -> str: return 'Get the Thread network name that is commissioned in the active dataset.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes() class GetPskdHash(BleCommand): + def __init__(self): + super().__init__() + self.digest = None + def get_log_string(self) -> str: return 'Retrieving peer PSKd hash.' def get_help_string(self) -> str: return 'Get calculated PSKd hash.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: bless: BleStreamSecure = context['ble_sstream'] if bless.peer_public_key is None: raise DataNotPrepared("Peer certificate not present.") @@ -348,7 +352,7 @@ class GetPskdHash(BleCommand): self.digest = hash.digest() return data - def process_response(self, tlv_response, context): + def process_response(self, tlv_response, context) -> None: if tlv_response.value == self.digest: print('Requested hash is valid.') else: @@ -363,17 +367,16 @@ class GetRandomNumberChallenge(BleCommand): def get_help_string(self) -> str: return 'Get the device random number challenge.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.GET_RANDOM_NUMBER_CHALLENGE.value, bytes()).to_bytes() - def process_response(self, tlv_response, context): + def process_response(self, tlv_response, context) -> None: bless: BleStreamSecure = context['ble_sstream'] - if tlv_response.value != None: + if tlv_response.value is not None: if len(tlv_response.value) == CHALLENGE_SIZE: bless.peer_challenge = tlv_response.value else: print('Challenge format invalid.') - return CommandResultNone() class PingCommand(Command): @@ -381,26 +384,27 @@ class PingCommand(Command): def get_help_string(self) -> str: return 'Send echo request to TCAT device.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: bless: BleStreamSecure = context['ble_sstream'] + if bless is None or not bless.is_connected: + return CommandResultError("TCAT Device not connected.") payload_size = 10 max_payload = 512 if len(args) > 0: payload_size = int(args[0]) if payload_size > max_payload: - print(f'Payload size too large. Maximum supported value is {max_payload}') - return CommandResultNone() + return CommandResultError(f'Payload size too large. Maximum supported value is {max_payload}') to_send = token_bytes(payload_size) data = TLV(TcatTLVType.PING.value, to_send).to_bytes() - elapsed_time = time() + start_time = asyncio.get_running_loop().time() response = await bless.send_with_resp(data) - elapsed_time = 1e3 * (time() - elapsed_time) + elapsed_time = 1e3 * (asyncio.get_running_loop().time() - start_time) if not response: return CommandResultNone() tlv_response = TLV.from_bytes(response) if tlv_response.value != to_send: - print("Received malformed response.") + print("Error: Ping response payload mismatch.") print(f"Roundtrip time: {elapsed_time} ms") @@ -415,7 +419,7 @@ class PresentHash(BleCommand): def get_help_string(self) -> str: return 'Present calculated hash.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: type = args[0] code = None tlv_type = None @@ -445,40 +449,127 @@ class PresentHash(BleCommand): return data +def _handshake_progress_bar(is_concluded: bool): + if is_concluded: + print('') + else: + print('.', end='', flush=True) + + +async def connect_helper(device: BLEDevice | UdpStream, + context: dict, + timeout_ble=30.0, + timeout_simulation=5.0) -> bool: + """Helper function for CLI and commands to establish a new secure connection with a TCAT device. + + Handles both BLE and simulated UDP connections. Loads certificates and performs handshake + to establish a secure channel. Connection objects are stored in the context dictionary. + + Args: + device: BLEDevice object, or UdpStream (for simulation) + context: Dictionary containing application context including command line arguments + timeout_ble: Timeout in seconds for handshake with real TCAT device (default: 30.0) + timeout_simulation: Timeout in seconds for handshake for simulated TCAT device (default: 5.0) + + Returns: + True if connection was successful, False otherwise. + + Raises: + Exception: If certificates cannot be loaded + """ + is_simulation = isinstance(device, UdpStream) + is_debug = logger.getEffectiveLevel() <= logging.DEBUG + + print(f'Connecting to {device}') + if not is_simulation: + ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID) + else: + ble_stream = device + ble_sstream = BleStreamSecure(ble_stream) + context['ble_sstream'] = ble_sstream + context['ble_stream'] = ble_stream + + cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth' + ble_sstream.load_cert( + certfile=path.join(cert_path, 'commissioner_cert.pem'), + keyfile=path.join(cert_path, 'commissioner_key.pem'), + cafile=path.join(cert_path, 'ca_cert.pem'), + ) + logger.info(f"Certificates and key loaded from '{cert_path}'") + + print('Setting up secure channel...') + ok = False + try: + cb = None + if not is_debug: + cb = _handshake_progress_bar + ok = await ble_sstream.do_handshake(progress_callback=cb, + timeout=timeout_simulation if is_simulation else timeout_ble) + except Exception as e: + logger.error(e) + + if ok: + print('Done') + return True + else: + if ble_stream is not None: + await ble_stream.disconnect() + context['ble_stream'] = None + context['ble_sstream'] = None + return False + + +async def disconnect_helper(context: dict) -> None: + """Helper function for CLI and commands to disconnect from a TCAT device.""" + bless: BleStreamSecure = context['ble_sstream'] + doing_disconn = False + if bless is not None and bless.is_connected: + print('Disconnecting...') + doing_disconn = True + logger.debug('Closing TLS connection.') + await bless.close(timeout=5.0) + context['ble_sstream'] = None + + bles = context['ble_stream'] + if bles is not None: + logger.debug('Closing BLE connection.') + await bles.disconnect() + context['ble_stream'] = None + if doing_disconn: + print('Done') + + class ScanCommand(Command): def get_help_string(self) -> str: return 'Perform scan for TCAT devices.' - async def execute_default(self, args, context): - if 'ble_sstream' in context and context['ble_sstream'] is not None: - context['ble_sstream'].close() - del context['ble_sstream'] + async def execute_default(self, args, context) -> CommandResult: + if context['ble_sstream'] is not None and context['ble_sstream'].is_connected: + return CommandResultError('already connected to a TCAT device. Use \'disconnect\' first.') + print('Scanning for BLE TCAT devices...') tcat_devices = await ble_scanner.scan_tcat_devices() device = select_device_by_user_input(tcat_devices) + if device is not None: + await connect_helper(device, context) - if device is None: - return CommandResultNone() + return CommandResultNone() - ble_sstream = None - print(f'Connecting to {device}') - ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID) - ble_sstream = BleStreamSecure(ble_stream) - cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth' - ble_sstream.load_cert( - certfile=path.join(cert_path, 'commissioner_cert.pem'), - keyfile=path.join(cert_path, 'commissioner_key.pem'), - cafile=path.join(cert_path, 'ca_cert.pem'), - ) - print('Setting up secure channel...') - if await ble_sstream.do_handshake(): - print('Done') - context['ble_sstream'] = ble_sstream - else: - print('Secure channel not established.') - await ble_stream.disconnect() +class SimulationCommand(Command): + + def get_help_string(self) -> str: + return 'Connect to a simulated TCAT device over UDP.' + + async def execute_default(self, args, context) -> CommandResult: + if len(args) != 1: + return CommandResultError('need index number of simulated TCAT device as first argument.') + if context['ble_sstream'] is not None and context['ble_sstream'].is_connected: + return CommandResultError('already connected to a TCAT device. Use \'disconnect\' first.') + + device = UdpStream("127.0.0.1", int(args[0])) + await connect_helper(device, context) return CommandResultNone() @@ -490,7 +581,7 @@ class DiagnosticTlvsCommand(BleCommand): def get_help_string(self) -> str: return 'Get diagnostic TLVs from the TCAT device.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: num_args = DiagnosticTLVType.names_to_numbers(args) try: if not num_args: @@ -515,7 +606,7 @@ class ThreadStartCommand(BleCommand): def get_help_string(self) -> str: return 'Enable thread interface.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes() @@ -527,18 +618,19 @@ class ThreadStopCommand(BleCommand): def get_help_string(self) -> str: return 'Disable thread interface.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes() class ThreadStateCommand(Command): def __init__(self): + super().__init__() self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()} def get_help_string(self) -> str: return 'Manipulate state of the Thread interface of the connected device.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: print('Invalid usage. Provide a subcommand.') return CommandResultNone() diff --git a/tools/tcat_ble_client/cli/cli.py b/tools/tcat_ble_client/cli/cli.py index 1b7317051..9de33dec8 100644 --- a/tools/tcat_ble_client/cli/cli.py +++ b/tools/tcat_ble_client/cli/cli.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -25,28 +25,31 @@ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ + +from argparse import Namespace +import logging import readline import shlex -from argparse import Namespace -from ble.ble_stream_secure import BleStreamSecure +from typing import Optional + from cli.base_commands import (DisconnectCommand, HelpCommand, HelloCommand, CommissionCommand, DecommissionCommand, ExtractDatasetCommand, GetCommissionerCertificate, GetDeviceIdCommand, GetPskdHash, GetExtPanIDCommand, GetNetworkNameCommand, GetProvisioningUrlCommand, PingCommand, GetRandomNumberChallenge, ThreadStateCommand, ScanCommand, PresentHash, DiagnosticTlvsCommand, GetApplicationLayersCommand, SendVendorData, - SendApplicationData1, SendApplicationData2, SendApplicationData3, SendApplicationData4) + SendApplicationData1, SendApplicationData2, SendApplicationData3, SendApplicationData4, + SimulationCommand, connect_helper, disconnect_helper) +from .command import CommandResultNone, CommandResult from .tlv_commands import TlvCommand from cli.dataset_commands import (DatasetCommand) from dataset.dataset import ThreadDataset -from typing import Optional + +logger = logging.getLogger(__name__) class CLI: - def __init__(self, - dataset: ThreadDataset, - cmd_args: Optional[Namespace] = None, - ble_sstream: Optional[BleStreamSecure] = None): + def __init__(self, dataset: ThreadDataset, cmd_args: Optional[Namespace] = None): self._commands = { 'help': HelpCommand(), 'hello': HelloCommand(), @@ -68,6 +71,7 @@ class CLI: 'get_dataset': ExtractDatasetCommand(), 'thread': ThreadStateCommand(), 'scan': ScanCommand(), + 'simulation': SimulationCommand(), 'random_challenge': GetRandomNumberChallenge(), 'present_hash': PresentHash(), 'peer_pskd_hash': GetPskdHash(), @@ -75,8 +79,9 @@ class CLI: 'get_comm_cert': GetCommissionerCertificate(), 'diagnostic_tlvs': DiagnosticTlvsCommand() } - self._context = { - 'ble_sstream': ble_sstream, + self.context = { + 'ble_sstream': None, # BleStreamSecure | None + 'ble_stream': None, # BleStream | None 'dataset': dataset, 'commands': self._commands, 'cmd_args': cmd_args @@ -116,10 +121,10 @@ class CLI: else: return None - async def evaluate_input(self, user_input): + async def evaluate_input(self, user_input) -> CommandResult: # do not parse empty commands if not user_input.strip(): - return + return CommandResultNone() command_parts = shlex.split(user_input) command = command_parts[0] @@ -128,4 +133,16 @@ class CLI: if command not in self._commands.keys(): raise Exception('Invalid command: {}'.format(command)) - return await self._commands[command].execute(args, self._context) + return await self._commands[command].execute(args, self.context) + + async def connect(self, device) -> bool: + """ + Connect with TLS to the BLE/simulation device. + :param device: the BLE device object or simulation UdpStream object + :return: True if connection was successful, False otherwise + """ + return await connect_helper(device, self.context) + + async def disconnect(self): + """ Disconnect from the BLE/simulation device. """ + await disconnect_helper(self.context) diff --git a/tools/tcat_ble_client/cli/command.py b/tools/tcat_ble_client/cli/command.py index da858f358..720c2aa82 100644 --- a/tools/tcat_ble_client/cli/command.py +++ b/tools/tcat_ble_client/cli/command.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,11 +26,12 @@ POSSIBILITY OF SUCH DAMAGE. """ +from abc import ABC, abstractmethod + from tlv.tlv import TLV from tlv.tcat_tlv import TcatTLVType from ble.ble_stream_secure import BleStreamSecure - -from abc import ABC, abstractmethod +from utils import is_printable_ascii class CommandResult(ABC): @@ -58,7 +59,7 @@ class Command(ABC): return await self._subcommands[args[0]].execute(args[1:], context) @abstractmethod - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: pass @abstractmethod @@ -91,9 +92,27 @@ class CommandResultTLV(CommandResult): print(f'\tTYPE:\tunknown: {hex(tlv.type)} ({tlv.type})') print(f'\tLEN:\t{len(tlv.value)}') print(f'\tVALUE:\t0x{tlv.value.hex()}') + if is_printable_ascii(tlv.value): + print(f'\tSTRING:\t{tlv.value.decode("ascii")}') class CommandResultNone(CommandResult): def pretty_print(self): pass + + +class CommandResultDone(CommandResult): + + def pretty_print(self): + print("Done") + + +class CommandResultError(CommandResult): + + def __init__(self, message): + super().__init__() + self.message = message + + def pretty_print(self): + print(f"Error: {self.message}") diff --git a/tools/tcat_ble_client/cli/dataset_commands.py b/tools/tcat_ble_client/cli/dataset_commands.py index 8fd268ea8..9b36521bb 100644 --- a/tools/tcat_ble_client/cli/dataset_commands.py +++ b/tools/tcat_ble_client/cli/dataset_commands.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,21 +26,21 @@ POSSIBILITY OF SUCH DAMAGE. """ -from cli.command import Command, CommandResultNone -from dataset.dataset import ThreadDataset, initial_dataset -from tlv.dataset_tlv import MeshcopTlvType from copy import deepcopy +from cli.command import Command, CommandResult, CommandResultNone, CommandResultDone +from dataset.dataset import ThreadDataset, initial_dataset +from tlv.dataset_tlv import MeshcopTlvType -def handle_dataset_entry_command(type: MeshcopTlvType, args, context): + +def handle_dataset_entry_command(type: MeshcopTlvType, args, context) -> CommandResult: ds: ThreadDataset = context['dataset'] if len(args) == 0: ds.get_entry(type).print_content() return CommandResultNone() ds.set_entry(type, args) - print('Done.') - return CommandResultNone() + return CommandResultDone() class DatasetClearCommand(Command): @@ -48,7 +48,7 @@ class DatasetClearCommand(Command): def get_help_string(self) -> str: return 'Clear dataset.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: ds: ThreadDataset = context['dataset'] ds.clear() return CommandResultNone() @@ -59,7 +59,7 @@ class DatasetHelpCommand(Command): def get_help_string(self) -> str: return 'Display help message and return.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: indent_width = 4 indentation = ' ' * indent_width commands: ThreadDataset = context['commands'] @@ -77,7 +77,7 @@ class DatasetHexCommand(Command): def get_help_string(self) -> str: return 'Get or set dataset as hex-encoded TLVs.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: ds: ThreadDataset = context['dataset'] if args: try: @@ -98,7 +98,7 @@ class ReloadDatasetCommand(Command): def get_help_string(self) -> str: return 'Reset dataset to the initial value.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: context['dataset'].set_from_bytes(initial_dataset) return CommandResultNone() @@ -108,7 +108,7 @@ class ActiveTimestampCommand(Command): def get_help_string(self) -> str: return 'View and set ActiveTimestamp seconds. Arguments: [seconds (int)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.ACTIVETIMESTAMP, args, context) @@ -117,7 +117,7 @@ class PendingTimestampCommand(Command): def get_help_string(self) -> str: return 'View and set PendingTimestamp seconds. Arguments: [seconds (int)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.PENDINGTIMESTAMP, args, context) @@ -126,7 +126,7 @@ class NetworkKeyCommand(Command): def get_help_string(self) -> str: return 'View and set NetworkKey. Arguments: [nk (hexstring, len=32)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.NETWORKKEY, args, context) @@ -135,7 +135,7 @@ class NetworkNameCommand(Command): def get_help_string(self) -> str: return 'View and set NetworkName. Arguments: [nn (string, maxlen=16)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.NETWORKNAME, args, context) @@ -144,7 +144,7 @@ class ExtPanIDCommand(Command): def get_help_string(self) -> str: return 'View and set ExtPanID. Arguments: [extpanid (hexstring, len=16)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.EXTPANID, args, context) @@ -153,7 +153,7 @@ class MeshLocalPrefixCommand(Command): def get_help_string(self) -> str: return 'View and set MeshLocalPrefix. Arguments: [mlp (hexstring, len=16)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.MESHLOCALPREFIX, args, context) @@ -162,7 +162,7 @@ class DelayTimerCommand(Command): def get_help_string(self) -> str: return 'View and set DelayTimer delay. Arguments: [delay (int)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.DELAYTIMER, args, context) @@ -171,7 +171,7 @@ class PanIDCommand(Command): def get_help_string(self) -> str: return 'View and set PanID. Arguments: [panid (hexstring, len=4)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.PANID, args, context) @@ -180,7 +180,7 @@ class ChannelCommand(Command): def get_help_string(self) -> str: return 'View and set Channel. Arguments: [channel (int)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.CHANNEL, args, context) @@ -189,7 +189,7 @@ class ChannelMaskCommand(Command): def get_help_string(self) -> str: return 'View and set ChannelMask. Arguments: [mask (hexstring)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.CHANNELMASK, args, context) @@ -198,7 +198,7 @@ class PskcCommand(Command): def get_help_string(self) -> str: return 'View and set Pskc. Arguments: [pskc (hexstring, maxlen=32)]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.PSKC, args, context) @@ -208,13 +208,14 @@ class SecurityPolicyCommand(Command): return 'View and set SecurityPolicy. Arguments: '\ '[ [flags (string)] [version_threshold (int)]]' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: return handle_dataset_entry_command(MeshcopTlvType.SECURITYPOLICY, args, context) class DatasetCommand(Command): def __init__(self): + super().__init__() self._subcommands = { 'clear': DatasetClearCommand(), 'help': DatasetHelpCommand(), @@ -238,7 +239,7 @@ class DatasetCommand(Command): return 'View and manipulate current dataset. ' \ 'Call without parameters to show current dataset.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: ds: ThreadDataset = context['dataset'] ds.print_content() return CommandResultNone() diff --git a/tools/tcat_ble_client/cli/tlv_commands.py b/tools/tcat_ble_client/cli/tlv_commands.py index 3beb985e0..3ec786caf 100644 --- a/tools/tcat_ble_client/cli/tlv_commands.py +++ b/tools/tcat_ble_client/cli/tlv_commands.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,7 +26,7 @@ POSSIBILITY OF SUCH DAMAGE. """ -from .base_commands import BleCommand, CommandResultNone, Command +from .base_commands import BleCommand, CommandResult, CommandResultNone, Command from tlv.tlv import TLV from tlv.tcat_tlv import TcatTLVType @@ -36,7 +36,7 @@ class TlvCommandList(Command): def get_help_string(self) -> str: return 'List available TLV types to use in \'tlv send\'.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: list_tlv = "\n".join([f"{tlv.value:#x}\t{tlv.name}" for tlv in TcatTLVType]) print(f"\n{list_tlv}") return CommandResultNone() @@ -50,7 +50,7 @@ class TlvCommandSend(BleCommand): def get_help_string(self) -> str: return 'Send TLV with arbitrary payload: \'tlv send \'.' - def prepare_data(self, args, context): + def prepare_data(self, args, context) -> bytes: tlv_type = TcatTLVType(int(args[0], 16)) tlv_value = bytes() try: @@ -64,11 +64,12 @@ class TlvCommandSend(BleCommand): class TlvCommand(Command): def __init__(self): + super().__init__() self._subcommands = {'list': TlvCommandList(), 'send': TlvCommandSend()} def get_help_string(self) -> str: return 'Send TLV with arbitrary payload.' - async def execute_default(self, args, context): + async def execute_default(self, args, context) -> CommandResult: self.print_help() return CommandResultNone() diff --git a/tools/tcat_ble_client/dataset/dataset.py b/tools/tcat_ble_client/dataset/dataset.py index 6c3d6be7d..d210c890e 100644 --- a/tools/tcat_ble_client/dataset/dataset.py +++ b/tools/tcat_ble_client/dataset/dataset.py @@ -1,17 +1,29 @@ """ - Copyright (c) 2023 Nordic Semiconductor ASA + Copyright (c) 2025, The OpenThread Authors. + All rights reserved. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holder nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. """ from typing import Dict, List diff --git a/tools/tcat_ble_client/dataset/dataset_entries.py b/tools/tcat_ble_client/dataset/dataset_entries.py index c1671f3db..60b2756d8 100644 --- a/tools/tcat_ble_client/dataset/dataset_entries.py +++ b/tools/tcat_ble_client/dataset/dataset_entries.py @@ -1,23 +1,35 @@ """ - Copyright (c) 2023 Nordic Semiconductor ASA + Copyright (c) 2025, The OpenThread Authors. + All rights reserved. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holder nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. """ -import struct -import inspect -from typing import List from abc import ABC, abstractmethod +import inspect +import struct +from typing import List from tlv.dataset_tlv import MeshcopTlvType from tlv.tlv import TLV @@ -461,7 +473,9 @@ class ChannelMask(DatasetEntry): class ChannelMaskEntry(DatasetEntry): def __init__(self): + super().__init__(MeshcopTlvType.CHANNELMASK) # Note: type not used in set_from_tlv / to_tlv self.channel_page = 0 + self.mask_length = 0 self.channel_mask: bytes = None def set(self, args: List[str]): diff --git a/tools/tcat_ble_client/pyproject.toml b/tools/tcat_ble_client/pyproject.toml index c14c5423e..66643165a 100644 --- a/tools/tcat_ble_client/pyproject.toml +++ b/tools/tcat_ble_client/pyproject.toml @@ -4,9 +4,10 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "tcat-ble-client" -version = "0.2.0" -description = "TCAT Commissioner (BBTC) client to configure TCAT devices" +version = "0.3.0" +description = "TCAT Commissioner (BBTC) test client to configure TCAT devices" authors = ["Piotr Jasinski ", "Esko Dijk "] +package-mode = false [tool.poetry.dependencies] python = "^3.10" diff --git a/tools/tcat_ble_client/tlv/tcat_tlv.py b/tools/tcat_ble_client/tlv/tcat_tlv.py index ce0604a1f..8a3e2ae7b 100644 --- a/tools/tcat_ble_client/tlv/tcat_tlv.py +++ b/tools/tcat_ble_client/tlv/tcat_tlv.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -31,6 +31,7 @@ from enum import Enum class TcatTLVType(Enum): RESPONSE_W_STATUS = 0x01 RESPONSE_W_PAYLOAD = 0x02 + RESPONSE_EVENT = 0x03 GET_NETWORK_NAME = 0x08 DISCONNECT = 0x09 PING = 0x0A diff --git a/tools/tcat_ble_client/utils/__init__.py b/tools/tcat_ble_client/utils/__init__.py index 2473fa8ea..7ffd5a3cc 100644 --- a/tools/tcat_ble_client/utils/__init__.py +++ b/tools/tcat_ble_client/utils/__init__.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2024, The OpenThread Authors. + Copyright (c) 2024-2025, The OpenThread Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -27,9 +27,12 @@ """ import base64 +import string from tlv import advertised_tlv +PRINTABLE_ASCII_BYTES = set(string.printable.encode('ascii')) + def get_int_in_range(min_value, max_value): while True: @@ -143,3 +146,17 @@ def superimpose_centered_string(background: str, foreground: str) -> str: end_index = start_index + len_fg return background[:start_index] + foreground + background[end_index:] + + +def is_printable_ascii(data: bytes) -> bool: + """ + Checks if a byte array contains only printable and human-readable ASCII characters. + + Args: + data: The byte array to check. + + Returns: + True if all bytes are printable ASCII, False otherwise. + """ + # The all() function stops as soon as it finds a failing case. + return all(byte_val in PRINTABLE_ASCII_BYTES for byte_val in data)