[tcat] implementation of TCAT general class commands (#10700)

Commit introduces implementation of missing general class commands:
- PresentPskdHash
- PresentPskcHash
- PresentInstallCodeHash
- RequestRandomNumChallenge
- RequestPskdHash

Also include minor fixes in Tcat python client and refactoring of expect
tests for tcat.
This commit is contained in:
Przemysław Bida
2024-10-02 20:10:16 +02:00
committed by GitHub
parent e120051e21
commit 213665cce0
21 changed files with 543 additions and 70 deletions
+2 -4
View File
@@ -87,12 +87,10 @@ async def main():
print('Setting up secure TLS channel..', end='')
try:
await ble_sstream.do_handshake()
print('\nDone')
ble_sstream.log_cert_identities()
print('Done')
except Exception as e:
print('\nFailed')
print('Failed')
logger.error(e)
ble_sstream.log_cert_identities()
quit_with_reason('TLS handshake failure')
ds = ThreadDataset()
@@ -32,6 +32,8 @@ import ssl
import sys
import logging
from cryptography.x509 import load_der_x509_certificate
from cryptography.hazmat.primitives.serialization import (Encoding, PublicFormat)
from tlv.tlv import TLV
from tlv.tcat_tlv import TcatTLVType
from time import time
@@ -49,6 +51,8 @@ class BleStreamSecure:
self.outgoing = ssl.MemoryBIO()
self.ssl_object = None
self.cert = ''
self.peer_challenge = None
self._peer_public_key = None
def load_cert(self, certfile='', keyfile='', cafile=''):
if certfile and keyfile:
@@ -102,6 +106,11 @@ class BleStreamSecure:
else:
print('TLS Connection timed out.')
return False
print('')
cert = self.ssl_object.getpeercert(True)
cert_obj = load_der_x509_certificate(cert)
self._peer_public_key = cert_obj.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
self.log_cert_identities()
return True
async def send(self, bytes):
@@ -142,8 +151,22 @@ class BleStreamSecure:
if self.ssl_object.session is not None:
logger.debug('sending Disconnect command TLV')
data = TLV(TcatTLVType.DISCONNECT.value, bytes()).to_bytes()
self.peer_challenge = None
self._peer_public_key = None
await self.send(data)
@property
def peer_public_key(self):
return self._peer_public_key
@property
def peer_challenge(self):
return self._peer_challenge
@peer_challenge.setter
def peer_challenge(self, value):
self._peer_challenge = value
def log_cert_identities(self):
# using the internal object of the ssl library is necessary to see the cert data in
# case of handshake failure - see https://sethmlarson.dev/experimental-python-3.10-apis-and-trust-stores
+131 -16
View File
@@ -40,6 +40,9 @@ from utils import select_device_by_user_input
from os import path
from time import time
from secrets import token_bytes
from hashlib import sha256
import hmac
import binascii
class HelpCommand(Command):
@@ -55,6 +58,10 @@ class HelpCommand(Command):
return CommandResultNone()
class DataNotPrepared(Exception):
pass
class BleCommand(Command):
@abstractmethod
@@ -62,19 +69,22 @@ class BleCommand(Command):
pass
@abstractmethod
def prepare_data(self, context):
def prepare_data(self, args, context):
pass
async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
print(self.get_log_string())
data = self.prepare_data(context)
response = await bless.send_with_resp(data)
if not response:
return
tlv_response = TLV.from_bytes(response)
return CommandResultTLV(tlv_response)
try:
data = self.prepare_data(args, context)
response = await bless.send_with_resp(data)
if not response:
return
tlv_response = TLV.from_bytes(response)
return CommandResultTLV(tlv_response)
except DataNotPrepared as err:
print('Command failed', err)
class HelloCommand(BleCommand):
@@ -85,7 +95,7 @@ class HelloCommand(BleCommand):
def get_help_string(self) -> str:
return 'Send round trip "Hello world!" message.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes()
@@ -97,7 +107,7 @@ class CommissionCommand(BleCommand):
def get_help_string(self) -> str:
return 'Update the connected device with current dataset.'
def prepare_data(self, context):
def prepare_data(self, args, context):
dataset: ThreadDataset = context['dataset']
dataset_bytes = dataset.to_bytes()
return TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes()
@@ -111,7 +121,7 @@ class DecommissionCommand(BleCommand):
def get_help_string(self) -> str:
return 'Stop Thread interface and decommission device from current network.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes()
@@ -123,7 +133,7 @@ class GetDeviceIdCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get unique identifier for the TCAT device.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.GET_DEVICE_ID.value, bytes()).to_bytes()
@@ -135,7 +145,7 @@ class GetExtPanIDCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get extended PAN ID that is commissioned in the active dataset.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.GET_EXT_PAN_ID.value, bytes()).to_bytes()
@@ -147,7 +157,7 @@ class GetProvisioningUrlCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get a URL for an application suited to commission the TCAT device.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.GET_PROVISIONING_URL.value, bytes()).to_bytes()
@@ -159,10 +169,115 @@ class GetNetworkNameCommand(BleCommand):
def get_help_string(self) -> str:
return 'Get the Thread network name that is commissioned in the active dataset.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes()
class PresentHash(BleCommand):
def get_log_string(self) -> str:
return 'Presenting hash.'
def get_help_string(self) -> str:
return 'Present calculated hash.'
def prepare_data(self, args, context):
type = args[0]
code = None
tlv_type = None
if type == "pskd":
code = bytes(args[1], 'utf-8')
tlv_type = TcatTLVType.PRESENT_PSKD_HASH.value
elif type == "pskc":
code = bytes.fromhex(args[1])
tlv_type = TcatTLVType.PRESENT_PSKC_HASH.value
elif type == "install":
code = bytes(args[1], 'utf-8')
tlv_type = TcatTLVType.PRESENT_INSTALL_CODE_HASH.value
else:
raise DataNotPrepared("Hash code name incorrect.")
bless: BleStreamSecure = context['ble_sstream']
if bless.peer_public_key is None:
raise DataNotPrepared("Peer certificate not present.")
if bless.peer_challenge is None:
raise DataNotPrepared("Peer challenge not present.")
hash = hmac.new(code, digestmod=sha256)
hash.update(bless.peer_challenge)
hash.update(bless.peer_public_key)
data = TLV(tlv_type, hash.digest()).to_bytes()
return data
class GetPskdHash(Command):
def get_log_string(self) -> str:
return 'Retrieving peer PSKd hash.'
def get_help_string(self) -> str:
return 'Get calculated PSKd hash.'
async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
print(self.get_log_string())
try:
if bless.peer_public_key is None:
print("Peer certificate not present.")
return
challenge_size = 8
challenge = token_bytes(challenge_size)
pskd = bytes(args[0], 'utf-8')
data = TLV(TcatTLVType.GET_PSKD_HASH.value, challenge).to_bytes()
response = await bless.send_with_resp(data)
if not response:
return
tlv_response = TLV.from_bytes(response)
if tlv_response.value != None:
hash = hmac.new(pskd, digestmod=sha256)
hash.update(challenge)
hash.update(bless.peer_public_key)
digest = hash.digest()
if digest == tlv_response.value:
print('Requested hash is valid.')
else:
print('Requested hash is NOT valid.')
return CommandResultTLV(tlv_response)
except DataNotPrepared as err:
print('Command failed', err)
class GetRandomNumberChallenge(Command):
def get_log_string(self) -> str:
return 'Retrieving random challenge.'
def get_help_string(self) -> str:
return 'Get the device random number challenge.'
async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
print(self.get_log_string())
try:
data = TLV(TcatTLVType.GET_RANDOM_NUMBER_CHALLENGE.value, bytes()).to_bytes()
response = await bless.send_with_resp(data)
if not response:
return
tlv_response = TLV.from_bytes(response)
if tlv_response.value != None:
if len(tlv_response.value) == 8:
bless.peer_challenge = tlv_response.value
else:
print('Challenge format invalid.')
return CommandResultNone()
return CommandResultTLV(tlv_response)
except DataNotPrepared as err:
print('Command failed', err)
class PingCommand(Command):
def get_help_string(self) -> str:
@@ -202,7 +317,7 @@ class ThreadStartCommand(BleCommand):
def get_help_string(self) -> str:
return 'Enable thread interface.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()
@@ -214,7 +329,7 @@ class ThreadStopCommand(BleCommand):
def get_help_string(self) -> str:
return 'Disable thread interface.'
def prepare_data(self, context):
def prepare_data(self, args, context):
return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()
+5 -2
View File
@@ -30,8 +30,8 @@ import shlex
from argparse import ArgumentParser
from ble.ble_stream_secure import BleStreamSecure
from cli.base_commands import (HelpCommand, HelloCommand, CommissionCommand, DecommissionCommand, GetDeviceIdCommand,
GetExtPanIDCommand, GetNetworkNameCommand, GetProvisioningUrlCommand, PingCommand,
ThreadStateCommand, ScanCommand)
GetPskdHash, GetExtPanIDCommand, GetNetworkNameCommand, GetProvisioningUrlCommand,
PingCommand, GetRandomNumberChallenge, ThreadStateCommand, ScanCommand, PresentHash)
from cli.dataset_commands import (DatasetCommand)
from dataset.dataset import ThreadDataset
from typing import Optional
@@ -56,6 +56,9 @@ class CLI:
'dataset': DatasetCommand(),
'thread': ThreadStateCommand(),
'scan': ScanCommand(),
'random_challenge': GetRandomNumberChallenge(),
'present_hash': PresentHash(),
'peer_pskd_hash': GetPskdHash(),
}
self._context = {
'ble_sstream': ble_sstream,
+5
View File
@@ -37,6 +37,11 @@ class TcatTLVType(Enum):
GET_DEVICE_ID = 0x0B
GET_EXT_PAN_ID = 0x0C
GET_PROVISIONING_URL = 0x0D
PRESENT_PSKD_HASH = 0x10
PRESENT_PSKC_HASH = 0x11
PRESENT_INSTALL_CODE_HASH = 0x12
GET_RANDOM_NUMBER_CHALLENGE = 0x13
GET_PSKD_HASH = 0x14
ACTIVE_DATASET = 0x20
DECOMMISSION = 0x60
APPLICATION = 0x82