[tcat] implement extraction of active dataset and commissioner cert (#10991)

Commit adds implementation of:
	- 0x40 Tcat tlv extraction of active dataset,
	- 0x25 Tcat tlv extraction of commissioner certificate.

Includes also refactoring of `BleCommand` adds new method `process_response`.
This simplifies:
- `GetPskdHash`
- `GetRandomNumberChallenge`
This commit is contained in:
Przemysław Bida
2025-01-21 17:43:19 +01:00
committed by GitHub
parent 8c0363c79b
commit 4d6def38a5
16 changed files with 399 additions and 161 deletions
+164 -141
View File
@@ -44,6 +44,8 @@ from hashlib import sha256
import hmac
import binascii
CHALLENGE_SIZE = 8
class HelpCommand(Command):
@@ -85,11 +87,15 @@ class BleCommand(Command):
if not response:
return
tlv_response = TLV.from_bytes(response)
self.process_response(tlv_response, context)
return CommandResultTLV(tlv_response)
except DataNotPrepared as err:
print('Command failed', err)
return CommandResultNone()
def process_response(self, tlv_response, context):
pass
class HelloCommand(BleCommand):
@@ -129,6 +135,51 @@ class DecommissionCommand(BleCommand):
return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes()
class DisconnectCommand(Command):
def get_help_string(self) -> str:
return 'Disconnect client from TCAT device'
async def execute_default(self, args, context):
if 'ble_sstream' not in context or context['ble_sstream'] is None:
print("TCAT Device not connected.")
return CommandResultNone()
await context['ble_sstream'].close()
return CommandResultNone()
class ExtractDatasetCommand(BleCommand):
def get_log_string(self) -> str:
return 'Getting active dataset.'
def get_help_string(self) -> str:
return 'Get active dataset from device.'
def prepare_data(self, args, context):
return TLV(TcatTLVType.GET_ACTIVE_DATASET.value, bytes()).to_bytes()
def process_response(self, tlv_response, context):
if tlv_response.type == TcatTLVType.RESPONSE_W_PAYLOAD.value:
dataset = ThreadDataset()
dataset.set_from_bytes(tlv_response.value)
dataset.print_content()
else:
print('Dataset extraction error.')
class GetCommissionerCertificate(BleCommand):
def get_log_string(self) -> str:
return 'Getting commissioner certificate.'
def get_help_string(self) -> str:
return 'Get commissioner certificate from device.'
def prepare_data(self, args, context):
return TLV(TcatTLVType.GET_COMMISSIONER_CERTIFICATE.value, bytes()).to_bytes()
class GetDeviceIdCommand(BleCommand):
def get_log_string(self) -> str:
@@ -177,6 +228,89 @@ class GetNetworkNameCommand(BleCommand):
return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes()
class GetPskdHash(BleCommand):
def get_log_string(self) -> str:
return 'Retrieving peer PSKd hash.'
def get_help_string(self) -> str:
return 'Get calculated PSKd hash.'
def prepare_data(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
if bless.peer_public_key is None:
raise DataNotPrepared("Peer certificate not present.")
challenge = token_bytes(CHALLENGE_SIZE)
pskd = bytes(args[0], 'utf-8')
data = TLV(TcatTLVType.GET_PSKD_HASH.value, challenge).to_bytes()
hash = hmac.new(pskd, digestmod=sha256)
hash.update(challenge)
hash.update(bless.peer_public_key)
self.digest = hash.digest()
return data
def process_response(self, tlv_response, context):
if tlv_response.value == self.digest:
print('Requested hash is valid.')
else:
print('Requested hash is NOT valid.')
class GetRandomNumberChallenge(BleCommand):
def get_log_string(self) -> str:
return 'Retrieving random challenge.'
def get_help_string(self) -> str:
return 'Get the device random number challenge.'
def prepare_data(self, args, context):
return TLV(TcatTLVType.GET_RANDOM_NUMBER_CHALLENGE.value, bytes()).to_bytes()
def process_response(self, tlv_response, context):
bless: BleStreamSecure = context['ble_sstream']
if tlv_response.value != None:
if len(tlv_response.value) == CHALLENGE_SIZE:
bless.peer_challenge = tlv_response.value
else:
print('Challenge format invalid.')
return CommandResultNone()
class PingCommand(Command):
def get_help_string(self) -> str:
return 'Send echo request to TCAT device.'
async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
payload_size = 10
max_payload = 512
if len(args) > 0:
payload_size = int(args[0])
if payload_size > max_payload:
print(f'Payload size too large. Maximum supported value is {max_payload}')
return CommandResultNone()
to_send = token_bytes(payload_size)
data = TLV(TcatTLVType.PING.value, to_send).to_bytes()
elapsed_time = time()
response = await bless.send_with_resp(data)
elapsed_time = 1e3 * (time() - elapsed_time)
if not response:
return CommandResultNone()
tlv_response = TLV.from_bytes(response)
if tlv_response.value != to_send:
print("Received malformed response.")
print(f"Roundtrip time: {elapsed_time} ms")
return CommandResultTLV(tlv_response)
class PresentHash(BleCommand):
def get_log_string(self) -> str:
@@ -215,141 +349,6 @@ class PresentHash(BleCommand):
return data
class GetPskdHash(Command):
def get_log_string(self) -> str:
return 'Retrieving peer PSKd hash.'
def get_help_string(self) -> str:
return 'Get calculated PSKd hash.'
async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
print(self.get_log_string())
try:
if bless.peer_public_key is None:
print("Peer certificate not present.")
return
challenge_size = 8
challenge = token_bytes(challenge_size)
pskd = bytes(args[0], 'utf-8')
data = TLV(TcatTLVType.GET_PSKD_HASH.value, challenge).to_bytes()
response = await bless.send_with_resp(data)
if not response:
return
tlv_response = TLV.from_bytes(response)
if tlv_response.value != None:
hash = hmac.new(pskd, digestmod=sha256)
hash.update(challenge)
hash.update(bless.peer_public_key)
digest = hash.digest()
if digest == tlv_response.value:
print('Requested hash is valid.')
else:
print('Requested hash is NOT valid.')
return CommandResultTLV(tlv_response)
except DataNotPrepared as err:
print('Command failed', err)
class GetRandomNumberChallenge(Command):
def get_log_string(self) -> str:
return 'Retrieving random challenge.'
def get_help_string(self) -> str:
return 'Get the device random number challenge.'
async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
print(self.get_log_string())
try:
data = TLV(TcatTLVType.GET_RANDOM_NUMBER_CHALLENGE.value, bytes()).to_bytes()
response = await bless.send_with_resp(data)
if not response:
return
tlv_response = TLV.from_bytes(response)
if tlv_response.value != None:
if len(tlv_response.value) == 8:
bless.peer_challenge = tlv_response.value
else:
print('Challenge format invalid.')
return CommandResultNone()
return CommandResultTLV(tlv_response)
except DataNotPrepared as err:
print('Command failed', err)
class PingCommand(Command):
def get_help_string(self) -> str:
return 'Send echo request to TCAT device.'
async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']
payload_size = 10
max_payload = 512
if len(args) > 0:
payload_size = int(args[0])
if payload_size > max_payload:
print(f'Payload size too large. Maximum supported value is {max_payload}')
return
to_send = token_bytes(payload_size)
data = TLV(TcatTLVType.PING.value, to_send).to_bytes()
elapsed_time = time()
response = await bless.send_with_resp(data)
elapsed_time = 1e3 * (time() - elapsed_time)
if not response:
return CommandResultNone()
tlv_response = TLV.from_bytes(response)
if tlv_response.value != to_send:
print("Received malformed response.")
print(f"Roundtrip time: {elapsed_time} ms")
return CommandResultTLV(tlv_response)
class ThreadStartCommand(BleCommand):
def get_log_string(self) -> str:
return 'Enabling Thread...'
def get_help_string(self) -> str:
return 'Enable thread interface.'
def prepare_data(self, args, context):
return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()
class ThreadStopCommand(BleCommand):
def get_log_string(self) -> str:
return 'Disabling Thread...'
def get_help_string(self) -> str:
return 'Disable thread interface.'
def prepare_data(self, args, context):
return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()
class ThreadStateCommand(Command):
def __init__(self):
self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}
def get_help_string(self) -> str:
return 'Manipulate state of the Thread interface of the connected device.'
async def execute_default(self, args, context):
print('Invalid usage. Provide a subcommand.')
return CommandResultNone()
class ScanCommand(Command):
def get_help_string(self) -> str:
@@ -387,14 +386,38 @@ class ScanCommand(Command):
return CommandResultNone()
class DisconnectCommand(Command):
class ThreadStartCommand(BleCommand):
def get_log_string(self) -> str:
return 'Enabling Thread...'
def get_help_string(self) -> str:
return 'Disconnect client from TCAT device'
return 'Enable thread interface.'
def prepare_data(self, args, context):
return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()
class ThreadStopCommand(BleCommand):
def get_log_string(self) -> str:
return 'Disabling Thread...'
def get_help_string(self) -> str:
return 'Disable thread interface.'
def prepare_data(self, args, context):
return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()
class ThreadStateCommand(Command):
def __init__(self):
self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}
def get_help_string(self) -> str:
return 'Manipulate state of the Thread interface of the connected device.'
async def execute_default(self, args, context):
if 'ble_sstream' not in context or context['ble_sstream'] is None:
print("TCAT Device not connected.")
return CommandResultNone()
await context['ble_sstream'].close()
print('Invalid usage. Provide a subcommand.')
return CommandResultNone()
+6 -4
View File
@@ -30,9 +30,9 @@ import shlex
from argparse import ArgumentParser
from ble.ble_stream_secure import BleStreamSecure
from cli.base_commands import (DisconnectCommand, HelpCommand, HelloCommand, CommissionCommand, DecommissionCommand,
GetDeviceIdCommand, GetPskdHash, GetExtPanIDCommand, GetNetworkNameCommand,
GetProvisioningUrlCommand, PingCommand, GetRandomNumberChallenge, ThreadStateCommand,
ScanCommand, PresentHash)
ExtractDatasetCommand, GetCommissionerCertificate, GetDeviceIdCommand, GetPskdHash,
GetExtPanIDCommand, GetNetworkNameCommand, GetProvisioningUrlCommand, PingCommand,
GetRandomNumberChallenge, ThreadStateCommand, ScanCommand, PresentHash)
from .tlv_commands import TlvCommand
from cli.dataset_commands import (DatasetCommand)
from dataset.dataset import ThreadDataset
@@ -57,12 +57,14 @@ class CLI:
'network_name': GetNetworkNameCommand(),
'ping': PingCommand(),
'dataset': DatasetCommand(),
'get_dataset': ExtractDatasetCommand(),
'thread': ThreadStateCommand(),
'scan': ScanCommand(),
'random_challenge': GetRandomNumberChallenge(),
'present_hash': PresentHash(),
'peer_pskd_hash': GetPskdHash(),
'tlv': TlvCommand()
'tlv': TlvCommand(),
'get_comm_cert': GetCommissionerCertificate(),
}
self._context = {
'ble_sstream': ble_sstream,
+2
View File
@@ -43,6 +43,8 @@ class TcatTLVType(Enum):
GET_RANDOM_NUMBER_CHALLENGE = 0x13
GET_PSKD_HASH = 0x14
ACTIVE_DATASET = 0x20
GET_COMMISSIONER_CERTIFICATE = 0x25
GET_ACTIVE_DATASET = 0x40
DECOMMISSION = 0x60
APPLICATION = 0x82
THREAD_START = 0x27