[tcat] feat: add 'adapter' argument and use 'BLEDevice' for scanning (#10657)

Added '-a', '--adapter' arguments to the bbtc.py arguments
parser. This allows the selection of the HCI adapter for the scanning
procedure.

According to 'Bleak' documentation of the 'BleakClient'
class(https://bleak.readthedocs.io/en/latest/api/client.html#bleakclient-class),
it's better to use the 'BLEDevice' object in 'BleakClient' during the
object instantiation, therefore it has been changed for the scanning
This commit is contained in:
Jakub Uliarczyk
2024-10-07 17:55:20 +02:00
committed by GitHub
parent b8d2e5d63a
commit 52d3f49ffe
3 changed files with 15 additions and 9 deletions
+3 -2
View File
@@ -49,6 +49,7 @@ async def main():
logging.basicConfig(level=logging.WARNING)
parser = argparse.ArgumentParser(description='Device parameters')
parser.add_argument('-a', '--adapter', help='Select HCI adapter')
parser.add_argument('--debug', help='Enable debug logs', action='store_true')
parser.add_argument('--info', help='Enable info logs', action='store_true')
parser.add_argument('--cert_path', help='Path to certificate chain and key', action='store', default='auth')
@@ -122,10 +123,10 @@ async def get_device_by_args(args):
device = await ble_scanner.find_first_by_name(args.name)
device = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
elif args.scan:
tcat_devices = await ble_scanner.scan_tcat_devices()
tcat_devices = await ble_scanner.scan_tcat_devices(adapter=args.adapter)
device = select_device_by_user_input(tcat_devices)
if device:
device = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
device = await BleStream.create(device, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
elif args.simulation:
device = UdpStream("127.0.0.1", int(args.simulation))
+8 -4
View File
@@ -27,7 +27,9 @@
"""
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bbtc import BBTC_SERVICE_UUID
from typing import Optional
async def find_first_by_name(name):
@@ -42,11 +44,13 @@ async def find_first_by_mac(mac):
return device
async def scan_tcat_devices():
async def scan_tcat_devices(adapter: Optional[str] = None):
scanner = BleakScanner()
tcat_devices = []
devices_dict = await scanner.discover(return_adv=True, service_uuids=[BBTC_SERVICE_UUID.lower()])
for _, (device, _) in devices_dict.items():
tcat_devices: list[BLEDevice] = []
discovered_devices = await scanner.discover(return_adv=True,
service_uuids=[BBTC_SERVICE_UUID.lower()],
adapter=adapter)
for _, (device, _) in discovered_devices.items():
tcat_devices.append(device)
return tcat_devices
+4 -3
View File
@@ -27,12 +27,13 @@
"""
from itertools import count, takewhile
from typing import Iterator
from typing import Iterator, Union
import logging
import time
from asyncio import sleep
from bleak import BleakClient
from bleak.backends.device import BLEDevice
from bleak.backends.characteristic import BleakGATTCharacteristic
logger = logging.getLogger(__name__)
@@ -65,8 +66,8 @@ class BleStream:
return takewhile(len, (data[i:i + n] for i in count(0, n)))
@classmethod
async def create(cls, address, service_uuid, tx_char_uuid, rx_char_uuid):
client = BleakClient(address)
async def create(cls, address_or_ble_device: Union[BLEDevice, str], service_uuid, tx_char_uuid, rx_char_uuid):
client = BleakClient(address_or_ble_device)
await client.connect()
self = cls(client, service_uuid, tx_char_uuid, rx_char_uuid)
await client.start_notify(self.tx_char_uuid, self.__handle_rx)