[tcat] improve TCAT Commissioner output logging for SSL errors (#11906)

This commit provides more structured logging for ssl.py errors, and
displays the OpenSSL verify error code. This is used for certification
to validate reasons of handshake failure.
This commit is contained in:
Esko Dijk
2025-10-06 19:02:40 +02:00
committed by GitHub
parent e77d798bcb
commit 911820471e
5 changed files with 102 additions and 43 deletions
+18 -3
View File
@@ -71,6 +71,7 @@ async def main():
logging.getLogger('ble.ble_stream_secure').setLevel(logging.INFO)
logging.getLogger('ble.udp_stream').setLevel(logging.INFO)
is_debug = logger.getEffectiveLevel() <= logging.DEBUG
device = await get_device_by_args(args)
ble_sstream = None
@@ -86,12 +87,19 @@ async def main():
logger.info(f"Certificates and key loaded from '{args.cert_path}'")
print('Setting up secure TLS channel..', end='')
ok = False
try:
await ble_sstream.do_handshake()
print('Done')
cb = None
if not is_debug:
cb = handshake_progress_bar
ok = await ble_sstream.do_handshake(progress_callback=cb)
except Exception as e:
print('Failed')
logger.error(e)
if ok:
print('Done')
else:
print('Failed')
quit_with_reason('TLS handshake failure')
ds = ThreadDataset()
@@ -133,6 +141,13 @@ async def get_device_by_args(args):
return device
def handshake_progress_bar(is_concluded: bool):
if is_concluded:
print('')
else:
print('.', end='', flush=True)
if __name__ == '__main__':
try:
asyncio.run(main())
+3
View File
@@ -49,6 +49,9 @@ class BleStream:
self.tx_char_uuid = tx_char_uuid
self.rx_char_uuid = rx_char_uuid
def __str__(self):
return f"BleStream[{self.client}]"
async def __aenter__(self):
return self
+75 -37
View File
@@ -28,15 +28,15 @@
import _ssl
import asyncio
import ssl
import sys
import logging
import ssl
from time import time
from typing import Optional, Callable
from cryptography.x509 import load_der_x509_certificate
from cryptography.hazmat.primitives.serialization import (Encoding, PublicFormat)
from tlv.tlv import TLV
from tlv.tcat_tlv import TcatTLVType
from time import time
import utils
logger = logging.getLogger(__name__)
@@ -65,48 +65,86 @@ class BleStreamSecure:
if cafile:
self.ssl_context.load_verify_locations(cafile=cafile)
async def do_handshake(self, timeout=30.0):
is_debug = logger.getEffectiveLevel() <= logging.DEBUG
async def do_handshake(self, timeout=30.0, progress_callback: Optional[Callable[[bool], None]] = None) -> bool:
"""
Performs a TLS handshake with a TCAT Device, reporting progress via an optional callback.
Args:
timeout: The maximum time in seconds to wait for the handshake to complete.
progress_callback: A function that accepts one boolean argument:
- False (is_concluded=False): The handshake attempt is ongoing.
- True (is_concluded=True): The handshake attempt has concluded. This call is made
before any results/errors in do_handshake are logged.
Returns:
True if the TLS handshake was successful, False otherwise.
"""
self.ssl_object = self.ssl_context.wrap_bio(
incoming=self.incoming,
outgoing=self.outgoing,
server_side=False,
server_hostname=None,
)
start = time()
while (time() - start) < timeout:
try:
if not is_debug:
print('.', end='')
sys.stdout.flush()
self.ssl_object.do_handshake()
break
# SSLWantWrite means ssl wants to send data over the link,
# but might need a receive first
except ssl.SSLWantWriteError:
output = await self.stream.recv(4096)
if output:
self.incoming.write(output)
data = self.outgoing.read()
if data:
await self.stream.send(data)
await asyncio.sleep(0.02)
try:
start = time()
while (time() - start) < timeout:
try:
if progress_callback:
progress_callback(False)
self.ssl_object.do_handshake()
break
# SSLWantRead means ssl wants to receive data from the link,
# but might need to send first
except ssl.SSLWantReadError:
data = self.outgoing.read()
if data:
await self.stream.send(data)
output = await self.stream.recv(4096)
if output:
self.incoming.write(output)
await asyncio.sleep(0.02)
else:
print('TLS Connection timed out.')
return False
print('')
# SSLWantWrite means ssl wants to send data over the link,
# but might need a receive first
except ssl.SSLWantWriteError:
output = await self.stream.recv(4096)
if output:
self.incoming.write(output)
data = self.outgoing.read()
if data:
await self.stream.send(data)
await asyncio.sleep(0.02)
# SSLWantRead means ssl wants to receive data from the link,
# but might need to send first
except ssl.SSLWantReadError:
data = self.outgoing.read()
if data:
await self.stream.send(data)
output = await self.stream.recv(4096)
if output:
self.incoming.write(output)
await asyncio.sleep(0.02)
except ssl.SSLCertVerificationError as err:
if progress_callback:
progress_callback(True)
logger.error(
f'SSLCertVerificationError reason={err.reason} verify_code={err.verify_code} verify_msg="{err.verify_message}"'
)
return False
except ssl.SSLError as err:
if progress_callback:
progress_callback(True)
logger.error(f"SSLError reason={err.reason}")
return False
else:
if progress_callback:
progress_callback(True)
logger.error(f'TLS Connection timed out (timeout={timeout}s).')
return False
# also catch Exceptions here which may be raised in SSL-specific Exception handlers
except Exception as err:
if progress_callback:
progress_callback(True)
raise err
if progress_callback:
progress_callback(True)
cert = self.ssl_object.getpeercert(True)
cert_obj = load_der_x509_certificate(cert)
self._peer_public_key = cert_obj.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
+4 -1
View File
@@ -44,6 +44,9 @@ class UdpStream:
self.socket.setblocking(False)
self.address = (address, self.BASE_PORT + node_id)
def __str__(self):
return f"UdpStream[{self.address[0]}:{self.address[1]}]"
async def send(self, data):
logger.debug(f'tx {len(data)} bytes')
return self.socket.sendto(data, self.address)
@@ -55,4 +58,4 @@ class UdpStream:
logger.debug(f'rx {len(data)} bytes')
return data
else:
raise Exception('simulation UdpStream recv timeout - likely, TCAT is stopped on TCAT Device')
raise socket.timeout('simulation UdpStream recv timeout - likely, TCAT is stopped on TCAT Device')
+2 -2
View File
@@ -27,7 +27,7 @@
"""
import readline
import shlex
from argparse import ArgumentParser
from argparse import Namespace
from ble.ble_stream_secure import BleStreamSecure
from cli.base_commands import (DisconnectCommand, HelpCommand, HelloCommand, CommissionCommand, DecommissionCommand,
ExtractDatasetCommand, GetCommissionerCertificate, GetDeviceIdCommand, GetPskdHash,
@@ -45,7 +45,7 @@ class CLI:
def __init__(self,
dataset: ThreadDataset,
cmd_args: Optional[ArgumentParser] = None,
cmd_args: Optional[Namespace] = None,
ble_sstream: Optional[BleStreamSecure] = None):
self._commands = {
'help': HelpCommand(),