Files
openthread/tools/tcat_ble_client/cli/cli.py
T
arnulfrupp 928c78a01b [tcat] implement rate limitation for TCAT TLVs 0x10, 0x11 and 0x12 and remove TLV 0x14 (#12211)
This commit implements rate limitation for the TCAT commands Present
PSKd Hash TLV (0x10), Present PSKc Hash TLV (0x11) and Present
Install-code Hash TLV (0x12) to prevent password guessing attacks.

It also removes the TCAT command Request PSKd Hash TLV (0x14), to
prevent offline password guessing attacks with a single Hash value
retrieved from the device.

Note: The commit does not remove the Request PSKd Hash TLV
implementation in the Python commissioner such that the non-existence
of the command TLV can still be tested.
2026-05-04 07:10:19 -07:00

148 lines
6.3 KiB
Python

"""
Copyright (c) 2024-2025, The OpenThread Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
from argparse import Namespace
import logging
import readline
import shlex
from typing import Optional
from cli.base_commands import (DisconnectCommand, HelpCommand, HelloCommand, CommissionCommand, DecommissionCommand,
ExtractDatasetCommand, GetCommissionerCertificate, GetDeviceIdCommand,
GetExtPanIDCommand, GetNetworkNameCommand, GetProvisioningUrlCommand, PingCommand,
GetRandomNumberChallenge, ThreadStateCommand, ScanCommand, PresentHash,
DiagnosticTlvsCommand, GetApplicationLayersCommand, SendVendorData,
SendApplicationData1, SendApplicationData2, SendApplicationData3, SendApplicationData4,
SimulationCommand, connect_helper, disconnect_helper)
from .command import CommandResultNone, CommandResult
from .tlv_commands import TlvCommand
from cli.dataset_commands import (DatasetCommand)
from dataset.dataset import ThreadDataset
logger = logging.getLogger(__name__)
class CLI:
def __init__(self, dataset: ThreadDataset, cmd_args: Optional[Namespace] = None):
self._commands = {
'help': HelpCommand(),
'hello': HelloCommand(),
'get_apps': GetApplicationLayersCommand(),
'appdata1': SendApplicationData1(),
'appdata2': SendApplicationData2(),
'appdata3': SendApplicationData3(),
'appdata4': SendApplicationData4(),
'vendor_data': SendVendorData(),
'commission': CommissionCommand(),
'decommission': DecommissionCommand(),
'disconnect': DisconnectCommand(),
'device_id': GetDeviceIdCommand(),
'ext_panid': GetExtPanIDCommand(),
'provisioning_url': GetProvisioningUrlCommand(),
'network_name': GetNetworkNameCommand(),
'ping': PingCommand(),
'dataset': DatasetCommand(),
'get_dataset': ExtractDatasetCommand(),
'thread': ThreadStateCommand(),
'scan': ScanCommand(),
'simulation': SimulationCommand(),
'random_challenge': GetRandomNumberChallenge(),
'present_hash': PresentHash(),
'tlv': TlvCommand(),
'get_comm_cert': GetCommissionerCertificate(),
'diagnostic_tlvs': DiagnosticTlvsCommand()
}
self.context = {
'ble_sstream': None, # BleStreamSecure | None
'ble_stream': None, # BleStream | None
'dataset': dataset,
'commands': self._commands,
'cmd_args': cmd_args
}
readline.set_completer(self.completer)
readline.parse_and_bind('tab: complete')
def completer(self, text, state):
command_pool = self._commands.keys()
full_line = readline.get_line_buffer().lstrip()
words = full_line.split()
should_suggest_subcommands = len(words) > 1 or (len(words) == 1 and full_line[-1].isspace())
if should_suggest_subcommands:
if words[0] not in self._commands.keys():
return None
current_command = self._commands[words[0]]
if full_line[-1].isspace():
subcommands = words[1:]
else:
subcommands = words[1:-1]
for nextarg in subcommands:
if nextarg in current_command._subcommands.keys():
current_command = current_command._subcommands[nextarg]
else:
return None
if len(current_command._subcommands) == 0:
return None
command_pool = current_command._subcommands.keys()
options = [c for c in command_pool if c.startswith(text)]
if state < len(options):
return options[state]
else:
return None
async def evaluate_input(self, user_input) -> CommandResult:
# do not parse empty commands
if not user_input.strip():
return CommandResultNone()
command_parts = shlex.split(user_input)
command = command_parts[0]
args = command_parts[1:]
if command not in self._commands.keys():
raise Exception('Invalid command: {}'.format(command))
return await self._commands[command].execute(args, self.context)
async def connect(self, device) -> bool:
"""
Connect with TLS to the BLE/simulation device.
:param device: the BLE device object or simulation UdpStream object
:return: True if connection was successful, False otherwise
"""
return await connect_helper(device, self.context)
async def disconnect(self):
""" Disconnect from the BLE/simulation device. """
await disconnect_helper(self.context)