Init version of python tool for measuring throughput

User provides HCI indexes of connected controllers, the tool configures them and starts throughput test. More in README file.
This commit is contained in:
Jakub
2022-05-23 13:24:47 +02:00
committed by Szymon Janc
parent edb6a287ea
commit 57e29af5d2
37 changed files with 3015 additions and 0 deletions
View File
+100
View File
@@ -0,0 +1,100 @@
# HCI Throughput
Tool for measuring BLE throughput.
## Packages versions
Python 3.8.10 \
Matplotlib 3.5.1
## Usage
### Prepare devices
This tool may be used with existing controller or with any board with ```blehci``` app.
- If you want to use builtin PC controller, provide HCI index of the controller. Turn the Bluetooth ON on your device, run ```hciconfig``` in the terminal and get the HCI index. In the case below HCI index is equal to 0:
```
user@user:~$ hciconfig
hci0: Type: Primary Bus: USB
BD Address: 64:BC:58:E2:9C:52 ACL MTU: 1021:4 SCO MTU: 96:6
UP RUNNING
RX bytes:20003 acl:0 sco:0 events:3176 errors:0
TX bytes:771246 acl:0 sco:0 commands:3174 errors:0
```
- If you want to use the nimble controller, create the image and load the provided target (can be found under ```/targets``` for NRF52840 and NRF52832).
- NRF52840 may use USB or UART transport. The target is configured for USB by default.
- NRF52832 uses UART as transport. This requires some additional configuration. Get the tty path and run in the terminal:
```
sudo btattach -B /dev/ttyACM0 -S 1000000
```
Then proceed with ```hciconfig``` as shown above.
### Run tests
This tool opens a raw socket which requires running all scripts as ```sudo```. Copy the ```config.yaml.sample``` file, change the name to ```config.yaml``` and fill the parameters. Run ```main.py``` as shown below:
```
sudo python main.py -i <hci_idx_1> <hci_idx_2> -m rx tx -cf config.yaml
```
Switch ```<hci_idx_1>``` and ```<hci_idx_2>``` to corresponding hci indexes present in your computer. ```-m``` and ```-cf``` may be omitted if the defaults are correct. \
The output provides the plots of measured throughput in ```kb``` or ```kB``` as predefined in ```config.yaml```. In addition to the throughput plots, when the ```flag_plot_packets``` is turned on, the number of packets transmitted/received in time is visualized.
#### Set ```config.yaml``` file
To run **once** the throughput measurement with given parameters, set the ```flag_testing``` to false.
```
flag_testing: false
```
To run the throughput measurements **more than once** with the same parameters and to generate the plot of average throughputs, set ```config.yaml``` as shown below:
```
show_tp_plots: false
flag_testing: true
test:
change_param_group: null
change_param_variable: null
start_value: 0
stop_value: 5
step: 1
```
This configuration provides 5 measurements. The ```show_tp_plots``` flag is optionally set as ```false``` for speed, changing it to ```true``` will trigger rx and tx throughput plots at the end of every iteration.
To run the throughput measurement with some parameters changing within tests, fill config as below:
```
flag_testing: true
test:
change_param_group:
- conn
- conn
change_param_variable:
- connection_interval_min
- connection_interval_max
start_value: 0x000A
stop_value: 0x0320
step: 20
```
This will run each test incrementing ```connection_interval_min``` and ```connection_interval_max``` by 20. the final plot will show the influence of the parameters change on the average throughput.
## Tools
The ```main.py``` script usees all tools mentioned below and it is advised to use it above all. Nevertheless, the sub-tools may be used separately as shown below.
### HCI device sub-tool
```hci_device.py``` is a tool that manages one hci device. User can provide parameters and run it as receiver or transmitter as shown below:
```
sudo python hci_device.py -m rx -oa 00:00:00:00:00:00 -oat 0 -di 0 -pa 00:00:00:00:00:00 -pat 0 -pdi 0 -cf config.yaml
```
Run ```python hci_device.py --help``` for parameters description. \
If properly configured ```init.yaml``` is present (it is created automatically while running ```main.py```), the script can be run like this:
```
sudo python hci_device.py -m tx -if init.yaml
```
### Check addr sub-tool
When given hci indexes, ```check_addr.py``` returns devices' address types and addresses.
```
sudo python check_addr.py -i <hci_idx_1> <hci_idx_2> ... <hci_idx_N>
```
### Throughput sub-tool
The timestamps of the received packets are stored in csv files (```tp_receiver.csv``` and ```tp_transmitter.csv``` by default). If the program stopped in the middle of the measurements, you can still plot the values and get the average througput. Provide the filename, sample time and run the tool as shown below:
```
python throughput.py -f tp_receiver -s 0.1
```
+104
View File
@@ -0,0 +1,104 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import argparse
import asyncio
import hci_commands
import sys
import logging
import hci
import traceback
import util
import transport_factory
def parse_arguments():
parser = argparse.ArgumentParser(
description='Check HCI device address type and address',
epilog='How to run script: \
sudo python check_addr.py -i 0 1 2')
parser.add_argument('-i', '--indexes', type=str, nargs='*',
help='specify hci adapters indexes', default=0)
try:
args = parser.parse_args()
except Exception as e:
print(traceback.format_exc())
return args
async def main(dev: hci_commands.HCI_Commands):
result = tuple()
task = asyncio.create_task(dev.rx_buffer_q_wait())
await dev.cmd_reset()
await dev.cmd_read_bd_addr()
if hci.bdaddr != '00:00:00:00:00:00':
logging.info("Type public: %s, address: %s", hci.PUBLIC_ADDRESS_TYPE, hci.bdaddr)
result = (0, hci.bdaddr)
print("Public address: ", result)
else:
await dev.cmd_vs_read_static_addr()
if hci.static_addr != '00:00:00:00:00:00':
logging.info("Type static random: %s, address: %s", hci.STATIC_RANDOM_ADDRESS_TYPE, hci.static_addr)
result = (1, hci.static_addr)
print("Static random address: ", result)
else:
addr = hci.gen_static_rand_addr()
logging.info("Type static random: %s, generated address: %s", hci.STATIC_RANDOM_ADDRESS_TYPE, addr)
result = (1, addr)
print("Generated static random address: ", result)
task.cancel()
return result
def check_addr(device_indexes: list, addresses: list) -> list:
util.configure_logging(f"log/check_addr.log", clear_log_file=True)
logging.info(f"Devices indexes: {device_indexes}")
for index in device_indexes:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
transport = transport_factory.TransportFactory(device_index=index,
asyncio_loop=loop)
bt_dev = hci_commands.HCI_Commands(send=transport.send,
rx_buffer_q=transport.rx_buffer_q,
asyncio_loop=loop)
transport.start()
addresses.append(loop.run_until_complete(main(bt_dev)))
transport.stop()
loop.close()
logging.info(f"Finished: {addresses}")
return addresses
if __name__ == '__main__':
try:
args = parse_arguments()
print(args)
addresses = []
addresses = check_addr(args.indexes, addresses)
print(addresses)
except Exception as e:
print(traceback.format_exc())
finally:
sys.exit()
+55
View File
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
num_of_bytes_to_send: 247
num_of_packets_to_send: 2000
show_tp_plots: true
flag_testing: false
test:
change_param_group:
- conn
- conn
change_param_variable:
- connection_interval_min
- connection_interval_max
start_value: 16
stop_value: 160
step: 8
tp:
data_type: kb
sample_time: 0.1
flag_plot_packets: true
phy: 2M
adv:
advertising_interval_min: 2048
advertising_interval_max: 2048
advertising_type: 0
peer_address: 00:00:00:00:00:00
advertising_channel_map: 7
advertising_filter_policy: 0
conn:
le_scan_interval: 2400
le_scan_window: 2400
initiator_filter_policy: 0
connection_interval_min: 0x0080
connection_interval_max: 0x0080
max_latency: 0
supervision_timeout: 3200
min_ce_length: 0
max_ce_length: 0
+605
View File
@@ -0,0 +1,605 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from dataclasses import dataclass
import struct
from binascii import unhexlify
import random
############
# DEFINES
############
AF_BLUETOOTH = 31
HCI_CHANNEL_USER = 1
HCI_COMMAND_PACKET = 0x01
HCI_ACL_DATA_PACKET = 0x02
HCI_EVENT_PACKET = 0x04
HCI_EV_CODE_DISCONN_CMP = 0x05
HCI_EV_CODE_CMD_CMP = 0x0e
HCI_EV_CODE_CMD_STATUS = 0x0f
HCI_EV_CODE_LE_META_EVENT = 0x3e
HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP = 0x0a
HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE = 0x07
HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP = 0x0c
HCI_SUBEV_CODE_LE_CHAN_SEL_ALG = 0x14
HCI_EV_NUM_COMP_PKTS = 0x13
CONN_FAILED_TO_BE_ESTABLISHED = 0x3e
CONN_TIMEOUT = 0x08
OGF_HOST_CTL = 0x03
OCF_SET_EVENT_MASK = 0x0001
OCF_RESET = 0X0003
OGF_INFO_PARAM = 0x04
OCF_READ_LOCAL_COMMANDS = 0x0002
OCF_READ_BD_ADDR = 0x0009
OGF_LE_CTL = 0x08
OCF_LE_SET_EVENT_MASK = 0x0001
OCF_LE_READ_BUFFER_SIZE_V1 = 0x0002
OCF_LE_READ_BUFFER_SIZE_V2 = 0x0060
OCF_LE_SET_RANDOM_ADDRESS = 0x0005
OCF_LE_SET_ADVERTISING_PARAMETERS = 0x0006
OCF_LE_SET_ADVERTISE_ENABLE = 0x000a
OCF_LE_SET_SCAN_PARAMETERS = 0x000b
OCF_LE_SET_SCAN_ENABLE = 0x000c
OCF_LE_CREATE_CONN = 0x000d
OCF_LE_SET_DATA_LEN = 0x0022
OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN = 0x0023
OCF_LE_READ_MAX_DATA_LEN = 0x002f
OCF_LE_READ_PHY = 0x0030
OCF_LE_SET_DFLT_PHY = 0x0031
OCF_LE_SET_PHY = 0x0032
OGF_VENDOR_SPECIFIC = 0x003f
BLE_HCI_OCF_VS_RD_STATIC_ADDR = 0x0001
PUBLIC_ADDRESS_TYPE = 0
STATIC_RANDOM_ADDRESS_TYPE = 1
WAIT_FOR_EVENT_TIMEOUT = 5
WAIT_FOR_EVENT_CONN_TIMEOUT = 25
############
# GLOBAL VAR
############
num_of_bytes_to_send = None # based on supported_max_tx_octets
num_of_packets_to_send = None
events_list = []
bdaddr = '00:00:00:00:00:00'
static_addr = '00:00:00:00:00:00'
le_read_buffer_size = None
conn_handle = 0
requested_tx_octets = 1
requested_tx_time = 1
suggested_dflt_data_len = None
max_data_len = None
phy = None
ev_num_comp_pkts = None
num_of_completed_packets_cnt = 0
num_of_completed_packets_time = 0
############
# FUNCTIONS
############
def get_opcode(ogf: int, ocf: int):
return ((ocf & 0x03ff)|(ogf << 10))
def get_ogf_ocf(opcode: int):
ogf = opcode >> 10
ocf = opcode & 0x03ff
return ogf, ocf
def cmd_addr_to_ba(addr_str: str):
return unhexlify("".join(addr_str.split(':')))[::-1]
def ba_addr_to_str(addr_ba: bytearray):
addr_str = addr_ba.hex().upper()
return ':'.join(addr_str[i:i+2] for i in range(len(addr_str), -2, -2))[1:]
def gen_static_rand_addr():
while True:
x = [random.randint(0,1) for _ in range(0,48)]
if 0 in x[:-2] and 1 in x[:-2]:
x[0] = 1
x[1] = 1
break
addr_int = int("".join([str(x[i]) for i in range(0,len(x))]), 2)
addr_hex = "{0:0{1}x}".format(addr_int, 12)
addr = ":".join(addr_hex[i:i+2] for i in range(0, len(addr_hex), 2))
return addr.upper()
############
# GLOBAL VAR CLASSES
############
@dataclass
class Suggested_Dflt_Data_Length():
status: int
suggested_max_tx_octets: int
suggested_max_tx_time: int
def __init__(self):
self.set()
def set(self, status=0, suggested_max_tx_octets=0, suggested_max_tx_time=0):
self.status = status
self.suggested_max_tx_octets = suggested_max_tx_octets
self.suggested_max_tx_time = suggested_max_tx_time
@dataclass
class Max_Data_Length():
status: int
supported_max_tx_octets: int
supported_max_tx_time: int
supported_max_rx_octets: int
supported_max_rx_time: int
def __init__(self):
self.set()
def set(self, status=0, supported_max_tx_octets=0, supported_max_tx_time=0,
supported_max_rx_octets=0, supported_max_rx_time=0):
self.status = status
self.supported_max_tx_octets = supported_max_tx_octets
self.supported_max_tx_time = supported_max_tx_time
self.supported_max_rx_octets = supported_max_rx_octets
self.supported_max_rx_time = supported_max_rx_time
@dataclass
class LE_Read_Buffer_Size:
status: int
le_acl_data_packet_length: int
total_num_le_acl_data_packets: int
iso_data_packet_len: int
total_num_iso_data_packets: int
def __init__(self):
self.set()
def set(self, status=0, le_acl_data_packet_length=0,
total_num_le_acl_data_packets=0, iso_data_packet_len=0,
total_num_iso_data_packets=0):
self.status = status
self.le_acl_data_packet_length = le_acl_data_packet_length
self.total_num_le_acl_data_packets = total_num_le_acl_data_packets
self.iso_data_packet_len = iso_data_packet_len
self.total_num_iso_data_packets = total_num_iso_data_packets
@dataclass
class LE_Read_PHY:
status: int
connection_handle: int
tx_phy: int
rx_phy: int
def __init__(self):
self.set()
def set(self, status=0, connection_handle=0, tx_phy=0, rx_phy=0):
self.status = status
self.connection_handle = connection_handle
self.tx_phy = tx_phy
self.rx_phy = rx_phy
############
# EVENTS
############
@dataclass
class HCI_Ev_Disconn_Complete:
status: int
connection_handle: int
reason: int
def __init__(self):
self.set()
def set(self, status=0, connection_handle=0, reason=0):
self.status = status
self.connection_handle = connection_handle
self.reason = reason
@dataclass
class HCI_Ev_Cmd_Complete:
num_hci_command_packets: int
opcode: int
return_parameters: int
def __init__(self):
self.set()
def set(self, num_hci_cmd_packets=0, opcode=0, return_parameters=b''):
self.num_hci_command_packets = num_hci_cmd_packets
self.opcode = opcode
self.return_parameters = return_parameters
@dataclass
class HCI_Ev_Cmd_Status:
status: int
num_hci_command_packets: int
opcode: int
def __init__(self):
self.set()
def set(self, status = 0, num_hci_cmd_packets=0, opcode=0):
self.status = status
self.num_hci_command_packets = num_hci_cmd_packets
self.opcode = opcode
@dataclass
class HCI_Ev_LE_Meta:
subevent_code: int
def __init__(self):
self.set()
def set(self, subevent_code=0):
self.subevent_code = subevent_code
@dataclass
class HCI_Ev_LE_Enhanced_Connection_Complete(HCI_Ev_LE_Meta):
status: int
connection_handle: int
role: int
peer_address_type: int
peer_address: str
local_resolvable_private_address: int
peer_resolvable_private_address: int
connection_interval: int
peripheral_latency: int
supervision_timeout: int
central_clock_accuracy: int
def __init__(self):
self.set()
def set(self, subevent_code=0, status=0, connection_handle=0, role=0,
peer_address_type=0, peer_address='00:00:00:00:00:00',
local_resolvable_private_address='00:00:00:00:00:00',
peer_resolvable_private_address='00:00:00:00:00:00',
connection_interval=0, peripheral_latency=0, supervision_timeout=0,
central_clock_accuracy=0):
super().set(subevent_code)
self.status = status
self.connection_handle = connection_handle
self.role = role
self.peer_address_type = peer_address_type
self.peer_address = peer_address
self.local_resolvable_private_address = local_resolvable_private_address
self.peer_resolvable_private_address = peer_resolvable_private_address
self.connection_interval = connection_interval
self.peripheral_latency = peripheral_latency
self.supervision_timeout = supervision_timeout
self.central_clock_accuracy = central_clock_accuracy
@dataclass
class HCI_Ev_LE_Data_Length_Change(HCI_Ev_LE_Meta):
conn_handle: int
max_tx_octets: int
max_tx_time: int
max_rx_octets: int
max_rx_time: int
triggered: int
def __init__(self):
self.set()
def set(self, subevent_code=0, conn_handle=0, max_tx_octets=0,
max_tx_time=0, max_rx_octets=0, max_rx_time=0, triggered=0):
super().set(subevent_code)
self.conn_handle = conn_handle
self.max_tx_octets = max_tx_octets
self.max_tx_time = max_tx_time
self.max_rx_octets = max_rx_octets
self.max_rx_time = max_rx_time
self.triggered = triggered
@dataclass
class HCI_Ev_LE_PHY_Update_Complete(HCI_Ev_LE_Meta):
status: int
connection_handle: int
tx_phy: int
rx_phy: int
def __init__(self):
self.set()
def set(self, subevent_code=0, status=0, connection_handle=0,
tx_phy=0, rx_phy=0):
super().set(subevent_code)
self.status = status
self.connection_handle = connection_handle
self.tx_phy = tx_phy
self.rx_phy = rx_phy
@dataclass
class HCI_Number_Of_Completed_Packets:
num_handles: int
connection_handle: int
num_completed_packets: int
def __init__(self):
self.set()
def set(self, num_handles=0, connection_handle=0, num_completed_packets=0):
self.num_handles = num_handles
self.connection_handle = connection_handle
self.num_completed_packets = num_completed_packets
class HCI_Ev_LE_Chan_Sel_Alg(HCI_Ev_LE_Meta):
connection_handle: int
algorithm: int
def __init__(self):
self.set()
def set(self, subevent_code=0, connection_handle=0, algorithm=0):
super().set(subevent_code)
self.connection_handle = connection_handle
self.algorithm = algorithm
############
# PARAMETERS
############
@dataclass
class HCI_Advertising:
advertising_interval_min: int
advertising_interval_max: int
advertising_type: int
own_address_type: int
peer_address_type: int
peer_address: str
advertising_channel_map: int
advertising_filter_policy: int
ba_full_message: bytearray
def __init__(self):
self.set()
def set(self, advertising_interval_min=0, advertising_interval_max=0, \
advertising_type=0, own_address_type=0, peer_address_type=0, \
peer_address='00:00:00:00:00:00', advertising_channel_map=0, \
advertising_filter_policy=0):
self.advertising_interval_min = advertising_interval_min
self.advertising_interval_max = advertising_interval_max
self.advertising_type = advertising_type
self.own_address_type = own_address_type
self.peer_address_type = peer_address_type
self.peer_address = peer_address
self.advertising_channel_map = advertising_channel_map
self.advertising_filter_policy = advertising_filter_policy
self.ba_full_message = bytearray(struct.pack('<HHBBBBB',
advertising_interval_min, advertising_interval_max,
advertising_type, own_address_type, peer_address_type,
advertising_channel_map, advertising_filter_policy))
peer_addr_ba = cmd_addr_to_ba(peer_address)
self.ba_full_message[7:7] = peer_addr_ba
@dataclass
class HCI_Scan:
le_scan_type: int
le_scan_interval: int
le_scan_window: int
own_address_type: int
scanning_filter_policy: int
ba_full_message: bytearray
def __init__(self):
self.set()
def set(self, le_scan_type=0, le_scan_interval=0, le_scan_window=0,
own_address_type=0, scanning_filter_policy=0):
self.le_scan_type = le_scan_type
self.le_scan_interval = le_scan_interval
self.le_scan_window = le_scan_window
self.own_address_type = own_address_type
self.scanning_filter_policy = scanning_filter_policy
self.ba_full_message = bytearray(struct.pack('<BHHBB',le_scan_type,
le_scan_interval, le_scan_window, own_address_type,
scanning_filter_policy))
@dataclass
class HCI_Connect:
le_scan_interval: int
le_scan_window: int
initiator_filter_policy: int
peer_address_type: int
peer_address: str
own_address_type: int
connection_interval_min: int
connection_interval_max: int
max_latency: int
supervision_timeout: int
min_ce_length: int
max_ce_length: int
ba_full_message: bytearray
def __init__(self):
self.set()
def set(self, le_scan_interval=0, le_scan_window=0, \
initiator_filter_policy=0, peer_address_type=0, \
peer_address='00:00:00:00:00:00', own_address_type=0, \
connection_interval_min=0, connection_interval_max=0, \
max_latency=0, supervision_timeout=0, min_ce_length=0, \
max_ce_length=0):
self.le_scan_interval = le_scan_interval
self.le_scan_window = le_scan_window
self.initiator_filter_policy = initiator_filter_policy
self.peer_address_type = peer_address_type
self.peer_address = peer_address
self.own_address_type = own_address_type
self.connection_interval_min = connection_interval_min
self.connection_interval_max = connection_interval_max
self.max_latency = max_latency
self.supervision_timeout = supervision_timeout
self.min_ce_length = min_ce_length
self.max_ce_length = max_ce_length
self.ba_full_message = bytearray(struct.pack('<HHBBBHHHHHH',
le_scan_interval, le_scan_window, initiator_filter_policy,
peer_address_type, own_address_type, connection_interval_min,
connection_interval_max, max_latency,supervision_timeout,
min_ce_length, max_ce_length))
peer_addr_ba = cmd_addr_to_ba(peer_address)
self.ba_full_message[6:6] = peer_addr_ba
############
# RX / TX
############
@dataclass
class HCI_Receive:
packet_type: int
def __init__(self):
self.set()
def set(self,packet_type=0):
self.packet_type = packet_type
@dataclass
class HCI_Recv_Event_Packet(HCI_Receive):
ev_code: int
packet_len: int
recv_data: bytearray
current_event: None
def __init__(self):
self.set()
def set(self,packet_type=0, ev_code=0, packet_len=0,
recv_data=bytearray(256)):
super().set(packet_type)
self.ev_code = ev_code
self.packet_len = packet_len
self.recv_data = recv_data
self.recv_data = recv_data[:packet_len]
@dataclass
class HCI_Recv_ACL_Data_Packet(HCI_Receive):
connection_handle: int
pb_flag: int
bc_flag: int
data_total_len: int
data: bytearray
def __init__(self):
self.set()
def set(self, packet_type=0, connection_handle=0,
pb_flag=0, bc_flag=0, total_data_len=0, data=b''):
super().set(packet_type)
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.bc_flag = bc_flag
self.data_total_len = total_data_len
self.data = data
@dataclass
class HCI_Recv_L2CAP_Data:
pdu_length: int
channel_id: int
data: bytearray
def __init__(self):
self.set()
def set(self, pdu_length=0, channel_id=0, data=b''):
self.pdu_length = pdu_length
self.channel_id = channel_id
self.data = data
@dataclass
class HCI_Cmd_Send:
packet_type: int
ogf: int
ocf: int
packet_len: int
data: bytearray
ba_full_message: bytearray
def __init__(self):
self.set()
def set(self, ogf=0, ocf=0, data=b''):
self.packet_type = HCI_COMMAND_PACKET
self.ogf = ogf
self.ocf = ocf
self.opcode = get_opcode(ogf, ocf)
self.packet_len = len(data)
self.data = data
self.ba_full_message = bytearray(struct.pack('<BHB',
self.packet_type, self.opcode, self.packet_len))
self.ba_full_message.extend(self.data)
@dataclass
class HCI_ACL_Data_Send:
packet_type: int
connection_handle: int
pb_flag: int
bc_flag: int
data_total_length: int
data: bytearray
ba_full_message: bytearray
def __init__(self):
self.set()
def set(self, connection_handle=0, pb_flag=0b00, bc_flag=0b00, data=b''):
self.packet_type = HCI_ACL_DATA_PACKET
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.bc_flag = bc_flag
self.data_total_length = len(data)
self.data = data
self.ba_full_message = bytearray(struct.pack('<BHH',
self.packet_type,
((self.connection_handle & 0x0eff) |
(self.pb_flag << 12) |
(self.bc_flag << 14)),
self.data_total_length))
self.ba_full_message.extend(self.data)
@dataclass
class L2CAP_Data_Send:
pdu_length: int
channel_id: int
data: bytearray
ba_full_message: bytearray
def __init__(self):
self.set()
def set(self, pdu_length=0, channel_id=0, data=b''):
if not pdu_length:
self.pdu_length = len(data)
else:
self.pdu_length = pdu_length
self.channel_id = channel_id
self.data = data
fmt_conf = "<HH"
self.ba_full_message = bytearray(struct.pack(fmt_conf,
self.pdu_length, self.channel_id))
self.ba_full_message.extend(data)
+668
View File
@@ -0,0 +1,668 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import asyncio
import struct
import hci
import sys
import time
async def wait_ev(ev):
while ev.is_set() == False:
await asyncio.sleep(0.000001)
async def wait_for_event(ev, timeout):
try:
await asyncio.wait_for(wait_ev(ev), timeout)
except TimeoutError as e:
logging.error(f"Timeout waiting for event: {e}")
sys.exit()
class HCI_Commands():
def __init__(self, send=None, rx_buffer_q=None,
asyncio_loop=None, tp=None, device_mode="rx"):
self.hci_send_cmd = hci.HCI_Cmd_Send()
self.hci_send_acl_data = hci.HCI_ACL_Data_Send()
self.hci_recv_ev_packet = hci.HCI_Recv_Event_Packet()
self.async_sem_cmd = asyncio.Semaphore()
self.async_ev_cmd_end = asyncio.Event()
self.async_ev_connected = asyncio.Event()
self.async_ev_set_data_len = asyncio.Event()
self.async_ev_update_phy = asyncio.Event()
self.async_ev_num_cmp_pckts = asyncio.Event()
self.async_ev_recv_data_finish = asyncio.Event()
self.async_ev_rx_wait_finish = asyncio.Event()
self.async_lock_packets_cnt = asyncio.Lock()
self.valid_recv_data = 0
self.expected_recv_data = 0
self.last_timestamp = 0
self.sent_packets_counter = 0
self.send = send
self.rx_buffer_q = rx_buffer_q
self.tp = tp
self.loop = asyncio_loop
self.device_mode = device_mode
async def rx_buffer_q_wait(self):
try:
logging.debug("%s", self.rx_buffer_q_wait.__name__)
while not self.async_ev_rx_wait_finish.is_set():
if self.rx_buffer_q.empty():
await asyncio.sleep(0.000000001)
continue
await self.loop.create_task(self.recv_handler())
logging.info("rx_buffer_q_wait finished")
self.async_ev_rx_wait_finish.clear()
except asyncio.CancelledError:
logging.critical("rx_buffer_q_wait task canceled")
""" 7.3 Controller & Baseband commands """
async def cmd_set_event_mask(self, mask: int = 0x00001fffffffffff):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_HOST_CTL, hci.OCF_SET_EVENT_MASK,
struct.pack('<Q', mask))
logging.debug("%s %s", self.cmd_set_event_mask.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_reset(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_HOST_CTL, hci.OCF_RESET)
logging.debug("%s %s", self.cmd_reset.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
""" 7.4 Informational parameters """
async def cmd_read_local_supported_cmds(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_INFO_PARAM, hci.OCF_READ_LOCAL_COMMANDS)
logging.debug("%s %s", self.cmd_read_local_supported_cmds.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_read_bd_addr(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_INFO_PARAM, hci.OCF_READ_BD_ADDR)
logging.debug("%s %s", self.cmd_read_bd_addr.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
""" 7.8 LE Controller Commands """
async def cmd_le_set_event_mask(self, mask: int = 0x000000000000001f):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_EVENT_MASK,
struct.pack('<Q', mask))
logging.debug("%s %s", self.cmd_le_set_event_mask.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_read_buffer_size(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_BUFFER_SIZE_V1)
logging.debug("%s %s", self.cmd_le_read_buffer_size.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_random_addr(self, addr: str):
async with self.async_sem_cmd:
addr_ba = hci.cmd_addr_to_ba(addr)
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_SET_RANDOM_ADDRESS,
addr_ba)
logging.debug("%s %s", self.cmd_le_set_random_addr.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_advertising_params(self, adv_params: hci.HCI_Advertising):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_SET_ADVERTISING_PARAMETERS,
adv_params.ba_full_message)
logging.debug("%s %s", self.cmd_le_set_advertising_params.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_advertising_enable(self, adv_en: int = 0):
""" Default: Disabled """
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_SET_ADVERTISE_ENABLE,
struct.pack('<B',adv_en))
logging.debug("%s %s", self.cmd_le_set_advertising_enable.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_scan_params(self, scan_params: hci.HCI_Scan):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_SCAN_PARAMETERS,
scan_params.ba_full_message)
logging.debug("%s %s", self.cmd_le_set_scan_params.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_scan_enable(self, scan_en: int = 0, filter_dup: int = 0):
""" Default:
scan_en: disabled
filter_dup: disabled
"""
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_SCAN_ENABLE,
struct.pack('<BB', scan_en, filter_dup))
logging.debug("%s %s", self.cmd_le_set_scan_enable.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_create_connection(self, con_params: hci.HCI_Connect):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_CREATE_CONN,
con_params.ba_full_message)
logging.debug("%s %s", self.cmd_le_create_connection.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_data_len(self, conn_handle: int, tx_octets: int, tx_time: int):
""" conn_handle: Range 0x0000 to 0x0EFF
tx_octets: Range 0x001B to 0x00FB
tx_time: Range 0x0148 to 0x4290
"""
logging.debug("%s", self.cmd_le_set_data_len.__name__)
async with self.async_sem_cmd:
if tx_octets == 0 or tx_time == 0:
tx_octets = hci.max_data_len.supported_max_tx_octets
tx_time = hci.max_data_len.supported_max_tx_time
hci.requested_tx_octets = tx_octets
hci.requested_tx_time = tx_time
while conn_handle != hci.conn_handle:
await asyncio.sleep(0.001)
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_DATA_LEN,
struct.pack('<HHH', conn_handle,
tx_octets, tx_time))
logging.debug("%s %s", self.cmd_le_set_data_len.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_read_suggested_dflt_data_len(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN)
logging.debug("%s %s",self.cmd_le_read_suggested_dflt_data_len.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_read_max_data_len(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_READ_MAX_DATA_LEN)
logging.debug("%s %s", self.cmd_le_read_max_data_len.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_read_phy(self, conn_handle: int):
""" conn_handle: Range 0x0000 to 0x0EFF
"""
async with self.async_sem_cmd:
while conn_handle != hci.conn_handle:
await asyncio.sleep(0.001)
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_PHY,
struct.pack('<H', conn_handle))
logging.debug("%s %s", self.cmd_le_read_phy.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_dflt_phy(self, all_phys: int = 0, tx_phys: int = 0, rx_phys: int = 0):
""" Default:
all_phys: 0 - The Host has no preference among the transmitter PHYs
supported by the Controller
tx_phys: 0 - The Host prefers to use the LE 1M transmitter PHY
(possibly among others)
rx_phys: 0 - The Host prefers to use the LE 1M receiver PHY
(possibly among others)
"""
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_DFLT_PHY,
struct.pack('<BBB', all_phys,
tx_phys, rx_phys))
logging.debug("%s %s", self.cmd_le_set_dflt_phy.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_phy(self, conn_handle: int, all_phys: int = 0,
tx_phys: int = 0, rx_phys: int = 0,
phy_options: int = 0):
""" Default:
conn_handle: Range 0x0000 to 0x0EFF
all_phys: The Host has no preference among the transmitter PHYs
supported by the Controller
tx_phys: 0 - The Host prefers to use the LE 1M transmitter PHY
(possibly among others)
rx_phys: 0 - The Host prefers to use the LE 1M receiver PHY
(possibly among others)
phy_options: 0 - the Host has no preferred coding when
transmitting on the LE Coded PHY
"""
async with self.async_sem_cmd:
while conn_handle != hci.conn_handle:
await asyncio.sleep(0.001)
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_PHY,
struct.pack('<HBBBH', conn_handle, all_phys,
tx_phys, rx_phys, phy_options))
logging.debug("%s %s", self.cmd_le_set_phy.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_vs_read_static_addr(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_VENDOR_SPECIFIC,
hci.BLE_HCI_OCF_VS_RD_STATIC_ADDR)
logging.debug("%s %s", self.cmd_vs_read_static_addr.__name__, self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
""" Send data """
async def acl_data_send(self, acl_data: hci.HCI_ACL_Data_Send):
async with self.async_sem_cmd:
acl_data.connection_handle = hci.conn_handle
self.hci_send_acl_data = acl_data
await self.send(self.hci_send_acl_data.ba_full_message)
self.sent_packets_counter += 1
""" Parse and process received data"""
def parse_ev_disconn_cmp(self, data: bytes):
ev_disconn_cmp = hci.HCI_Ev_Disconn_Complete()
ev_disconn_cmp.set(*struct.unpack('<BHB', bytes(data[:4])))
return ev_disconn_cmp
def parse_ev_cmd_cmp(self, data: bytes):
ev_cmd_cmp = hci.HCI_Ev_Cmd_Complete()
ev_cmd_cmp.set(*struct.unpack('<BH', bytes(data[:3])), data[3:])
return ev_cmd_cmp
def parse_ev_cmd_stat(self, data: bytes):
ev_cmd_stat = hci.HCI_Ev_Cmd_Status()
ev_cmd_stat.set(*struct.unpack('<BBH', bytes(data[:4])))
return ev_cmd_stat
def parse_ev_le_meta(self, data: bytes):
ev_le_meta = hci.HCI_Ev_LE_Meta()
ev_le_meta.set(data[0])
return ev_le_meta
def parse_subev_le_enhcd_conn_cmp(self, data: bytes):
ev_le_enhcd_conn_cmp = hci.HCI_Ev_LE_Enhanced_Connection_Complete()
ev_le_enhcd_conn_cmp.set(*struct.unpack('<BBHBB', bytes(data[:6])),
hci.ba_addr_to_str(bytes(data[6:12])),
hci.ba_addr_to_str(bytes(data[12:18])),
hci.ba_addr_to_str(bytes(data[18:24])),
*struct.unpack('<HHHB', bytes(data[24:])))
return ev_le_enhcd_conn_cmp
def parse_subev_le_data_len_change(self, data: bytes):
ev_le_data_len_change = hci.HCI_Ev_LE_Data_Length_Change()
ev_le_data_len_change.set(*struct.unpack('<BHHHHH', bytes(data[:11])))
return ev_le_data_len_change
def parse_subev_le_phy_update_cmp(self, data: bytes):
le_phy_update_cmp = hci.HCI_Ev_LE_PHY_Update_Complete()
le_phy_update_cmp.set(*struct.unpack('<BBHBB', data))
return le_phy_update_cmp
def parse_subev_le_chan_sel_alg(self, data: bytes):
le_chan_sel_alg = hci.HCI_Ev_LE_Chan_Sel_Alg()
le_chan_sel_alg.set(*struct.unpack('<BHB', data))
return le_chan_sel_alg
def parse_num_comp_pkts(self, data: bytes):
hci.ev_num_comp_pkts = hci.HCI_Number_Of_Completed_Packets()
hci.ev_num_comp_pkts.set(*struct.unpack('<BHH', bytes(data[:5])))
return hci.ev_num_comp_pkts
def process_returned_parameters(self):
def status() -> int:
current_ev_name = type(self.hci_recv_ev_packet.current_event).__name__
if current_ev_name == type(hci.HCI_Ev_Cmd_Complete()).__name__:
return struct.unpack_from("<B",
self.hci_recv_ev_packet.current_event.return_parameters,
offset=0)[0]
elif current_ev_name == type(hci.HCI_Ev_Cmd_Status()).__name__:
return self.hci_recv_ev_packet.current_event.status
else:
return -100
current_ev = self.hci_recv_ev_packet.current_event
ogf, ocf = hci.get_ogf_ocf(current_ev.opcode)
if ogf == hci.OGF_HOST_CTL:
if ocf == hci.OCF_SET_EVENT_MASK:
return status()
elif ocf == hci.OCF_RESET:
return status()
elif ogf == hci.OGF_INFO_PARAM:
if ocf == hci.OCF_READ_LOCAL_COMMANDS:
return status()
elif ocf == hci.OCF_READ_BD_ADDR:
hci.bdaddr = hci.ba_addr_to_str(
bytes(current_ev.return_parameters[1:7]))
return status()
elif ogf == hci.OGF_LE_CTL:
if ocf == hci.OCF_LE_SET_EVENT_MASK:
return status()
elif ocf == hci.OCF_LE_READ_BUFFER_SIZE_V1:
hci.le_read_buffer_size = hci.LE_Read_Buffer_Size()
hci.le_read_buffer_size.set(*struct.unpack("<BHB",
current_ev.return_parameters))
logging.info(f"LE Buffer size: {hci.le_read_buffer_size}")
return hci.le_read_buffer_size.status
elif ocf == hci.OCF_LE_SET_RANDOM_ADDRESS:
return status()
elif ocf == hci.OCF_LE_SET_ADVERTISING_PARAMETERS:
return status()
elif ocf == hci.OCF_LE_SET_ADVERTISE_ENABLE:
return status()
elif ocf == hci.OCF_LE_SET_SCAN_PARAMETERS:
return status()
elif ocf == hci.OCF_LE_SET_SCAN_ENABLE:
return status()
elif ocf == hci.OCF_LE_CREATE_CONN:
return status()
elif ocf == hci.OCF_LE_SET_DATA_LEN:
return status()
elif ocf == hci.OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN:
hci.suggested_dflt_data_len = hci.Suggested_Dflt_Data_Length()
hci.suggested_dflt_data_len.set(*struct.unpack("<BHH",
current_ev.return_parameters))
logging.info(f"Suggested Deafult Data Len: {hci.suggested_dflt_data_len}")
return status()
elif ocf == hci.OCF_LE_READ_MAX_DATA_LEN:
hci.max_data_len = hci.Max_Data_Length()
hci.max_data_len.set(*struct.unpack("<BHHHH",
current_ev.return_parameters))
logging.info(f"Suggested Max Data Len: {hci.max_data_len}")
if (hci.num_of_bytes_to_send > hci.max_data_len.supported_max_tx_octets - 4):
logging.critical(f"Number of bytes to send: {hci.num_of_bytes_to_send}\
not supported. Closing.")
raise SystemExit("Number of bytes to send not supported. Closing.")
return status()
elif ocf == hci.OCF_LE_READ_PHY:
hci.phy = hci.LE_Read_PHY()
hci.phy.set(*struct.unpack('<BHBB',
current_ev.return_parameters))
logging.info(f"Current LE PHY: {hci.phy}")
return status()
elif ocf == hci.OCF_LE_SET_DFLT_PHY:
return status()
elif ocf == hci.OCF_LE_SET_PHY:
return status()
elif ogf == hci.OGF_VENDOR_SPECIFIC:
if ocf == hci.BLE_HCI_OCF_VS_RD_STATIC_ADDR:
if type(current_ev).__name__ == type(hci.HCI_Ev_Cmd_Complete()).__name__:
hci.static_addr = hci.ba_addr_to_str(
bytes(current_ev.return_parameters[1:7]))
logging.info(f"Received rd static addr: {hci.static_addr}")
elif type(current_ev).__name__ == type(hci.HCI_Ev_Cmd_Status()).__name__:
logging.info(f"Rd static addr status: {current_ev.status}")
return status()
else:
return -100
def parse_acl_data(self, buffer: bytes):
packet_type, handle_pb_bc_flags, data_len = struct.unpack('<BHH',
buffer[:5])
handle = handle_pb_bc_flags & 0x0EFF
pb_flag = (handle_pb_bc_flags & 0x3000) >> 12
bc_flag = (handle_pb_bc_flags & 0xC000) >> 14
hci_recv_acl_data_packet = hci.HCI_Recv_ACL_Data_Packet()
if pb_flag == 0b10:
l2cap_data = hci.HCI_Recv_L2CAP_Data()
data = buffer[5:]
l2cap_data.set(*struct.unpack("<HH", data[:4]), data[4:])
else:
l2cap_data = buffer[5:]
hci_recv_acl_data_packet.set(
packet_type=packet_type,
connection_handle=handle,
pb_flag=pb_flag,
bc_flag=bc_flag,
total_data_len=data_len,
data=l2cap_data)
return hci_recv_acl_data_packet
def parse_subevent(self, subev_code: int):
if subev_code == hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP:
self.hci_recv_ev_packet.current_event = \
self.parse_subev_le_enhcd_conn_cmp(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP
elif subev_code == hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE:
self.hci_recv_ev_packet.current_event = \
self.parse_subev_le_data_len_change(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE
elif subev_code == hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP:
self.hci_recv_ev_packet.current_event = \
self.parse_subev_le_phy_update_cmp(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP
elif subev_code == hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG:
self.hci_recv_ev_packet.current_event = \
self.parse_subev_le_chan_sel_alg(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG
else:
return -1
def parse_event(self, buffer: bytes):
self.hci_recv_ev_packet.set(*struct.unpack('<BBB', bytes(buffer[:3])),
buffer[3:])
if self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_DISCONN_CMP:
self.hci_recv_ev_packet.current_event = \
self.parse_ev_disconn_cmp(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_CODE_DISCONN_CMP,
self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_CODE_DISCONN_CMP
elif self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_CMD_CMP:
self.hci_recv_ev_packet.current_event = \
self.parse_ev_cmd_cmp(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_CODE_CMD_CMP,
self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_CODE_CMD_CMP
elif self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_CMD_STATUS:
self.hci_recv_ev_packet.current_event = \
self.parse_ev_cmd_stat(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_CODE_CMD_STATUS,
self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_CODE_CMD_STATUS
elif self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_LE_META_EVENT:
self.hci_recv_ev_packet.current_event = \
self.parse_ev_le_meta(self.hci_recv_ev_packet.recv_data)
return hci.HCI_EV_CODE_LE_META_EVENT
elif self.hci_recv_ev_packet.ev_code == hci.HCI_EV_NUM_COMP_PKTS:
self.hci_recv_ev_packet.current_event = \
self.parse_num_comp_pkts(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_NUM_COMP_PKTS,
self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_NUM_COMP_PKTS
else:
return -1
async def handle_event(self, buffer: bytes):
event_code = self.parse_event(buffer)
curr_ev = self.hci_recv_ev_packet.current_event
if event_code == hci.HCI_EV_CODE_DISCONN_CMP:
logging.debug("Received code: %s - HCI_EV_CODE_DISCONN_CMP", event_code)
logging.debug("Status: %s for event: %s - HCI_EV_CODE_DISCONN_CMP",
curr_ev.status, self.hci_recv_ev_packet.current_event)
if curr_ev.reason == hci.CONN_FAILED_TO_BE_ESTABLISHED:
logging.error(f"Connection failed to be established. Exiting...")
raise Exception("Connection failed to be established. Exiting...")
if curr_ev.reason == hci.CONN_TIMEOUT:
logging.error(f"Connection timeout. Exiting...")
raise Exception("Connection timeout. Exiting...")
elif event_code == hci.HCI_EV_CODE_CMD_CMP:
logging.debug("Received code: %s - HCI_EV_CODE_CMD_CMP", event_code)
sent_opcode = self.hci_send_cmd.opcode
recv_opcode = curr_ev.opcode
if sent_opcode == recv_opcode:
status = self.process_returned_parameters()
if status != 0:
logging.error("Status: %s for event: %s - HCI_EV_CODE_CMD_CMP", status, curr_ev)
self.async_ev_cmd_end.set()
elif event_code == hci.HCI_EV_CODE_CMD_STATUS:
logging.debug("Received code: %s - HCI_EV_CODE_CMD_STATUS", event_code)
sent_opcode = self.hci_send_cmd.opcode
recv_opcode = curr_ev.opcode
if sent_opcode == recv_opcode:
status = self.process_returned_parameters()
if status != 0:
logging.error("Status: %s for event: %s", status, curr_ev)
self.async_ev_cmd_end.set()
elif event_code == hci.HCI_EV_CODE_LE_META_EVENT:
logging.debug("Received code: %s - HCI_EV_CODE_LE_META_EVENT", event_code)
subev_code = self.parse_subevent(curr_ev.subevent_code)
if subev_code == hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP:
logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP", subev_code)
hci.conn_handle = self.hci_recv_ev_packet.current_event.connection_handle
if self.async_ev_connected.is_set() == False:
logging.info("Connection established. Event received.")
self.async_ev_connected.set()
elif subev_code == hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE:
logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE", subev_code)
self.async_ev_set_data_len.set()
elif subev_code == hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP:
logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP", subev_code)
self.async_ev_update_phy.set()
elif subev_code == hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG:
logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_CHAN_SEL_ALG", subev_code)
elif subev_code < 0:
logging.warning(f"Unknown received subevent: {buffer}\n")
elif event_code == hci.HCI_EV_NUM_COMP_PKTS:
logging.debug("Received code: %s - HCI_EV_NUM_COMP_PKTS", event_code)
async with self.async_lock_packets_cnt:
hci.num_of_completed_packets_cnt += curr_ev.num_completed_packets
hci.num_of_completed_packets_time = time.perf_counter()
self.async_ev_num_cmp_pckts.set()
if event_code < 0:
logging.warning(f"Unknown received event: {buffer}\n")
else:
logging.debug("%s \t%s ", self.handle_event.__name__, self.hci_recv_ev_packet)
def match_recv_l2cap_data(self, buffer: bytes, timestamp: int):
self.expected_recv_data += self.tp.predef_packet_key
packet_key = struct.unpack("<I", buffer[-4:])[0]
result = self.expected_recv_data == packet_key
if result:
self.valid_recv_data += 1
logging.info(f"L2CAP packet number - Received: {packet_key}, \
Expected: {self.expected_recv_data}, Result: {result}")
packet_number = (packet_key / self.tp.predef_packet_key) - 1
self.tp.append_to_csv_file(timestamp, packet_number)
# if self.tp and self.device_mode == "rx":
# if timestamp - self.last_timestamp > self.tp.sample_time \
# or packet_number == 0 \
# or packet_number == self.tp.total_packets_number-1:
# self.tp.record_throughput(packet_number, timestamp)
# self.last_timestamp = timestamp
if packet_number >= self.tp.total_packets_number - 1:
self.async_ev_recv_data_finish.set()
def handle_acl_data(self, buffer: bytes, timestamp: int):
hci_recv_acl_data_packet = self.parse_acl_data(buffer)
logging.debug("%s", hci_recv_acl_data_packet)
recv_data_type = type(hci_recv_acl_data_packet.data).__name__
if recv_data_type == 'HCI_Recv_L2CAP_Data':
self.match_recv_l2cap_data(buffer, timestamp)
async def recv_handler(self):
while not self.rx_buffer_q.empty():
q_buffer_item, q_timestamp = self.rx_buffer_q.get()
packet_type = struct.unpack('<B', bytes(q_buffer_item[:1]))[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.handle_acl_data(q_buffer_item, q_timestamp)
elif packet_type == hci.HCI_EVENT_PACKET:
await self.loop.create_task(self.handle_event(q_buffer_item))
+323
View File
@@ -0,0 +1,323 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import hci
import hci_commands
import logging
import argparse
import sys
import asyncio
import struct
import throughput as tp
import traceback
import yaml
import util
import transport_factory
import signal
show_tp_plots = False
test_dir = None
class ParentCalledException(KeyboardInterrupt):
""" This exception is raised when e.g. parent process sends signal.
This allows to terminate processes correctly. """
pass
def parse_arguments():
parser = argparse.ArgumentParser(
description='HCI device with User Channel Socket',
epilog='Start a device according to predefined mode (receiver/transmitter). \
The initialization of the device is based on received parameters \
or predefined init.yaml and config.yaml files.\
The tx device will try to connect to rx device and send data. \
After completion the throughput plots will pop up. \
How to run the python scripts, first specifying all params: \
sudo python hci_device.py -m rx -oa 00:00:00:00:00:00 -oat 0 -di 0 \
-pa 00:00:00:00:00:00 -pat 0 -pdi 0 -cf config.yaml\
or, if present, specifying init.yaml file \
sudo python main.py -m tx -if init.yaml')
parser.add_argument('-m', '--mode', type=str, nargs="?",
help='device mode - receiver, transmitter',
choices=['rx', 'tx'])
parser.add_argument('-if', '--init_file', type=str, nargs="?",
help='yaml init file, e.g.: -f init.yaml',
default="init.yaml")
parser.add_argument('-oa', '--own_addr', type=str, nargs="?",
help='device own address, e.g.: -oa 00:00:00:00:00:00')
parser.add_argument('-oat', '--own_addr_type', type=int, nargs="?",
help='device own address type, public e.g.: -oat 0')
parser.add_argument('-di', '--dev_idx', type=str, nargs="?",
help='device own hci index, hci0 e.g.: -ohi 0')
parser.add_argument('-pa', '--peer_addr', type=str, nargs="?",
help='peer device address, e.g.: -pa 00:00:00:00:00:00')
parser.add_argument('-pat', '--peer_addr_type', type=int, nargs="?",
help='peer device own address type, public e.g.: -pat 0')
parser.add_argument('-pdi', '--peer_dev_idx', type=str, nargs="?",
help='peer device index, e.g. hci0: -phi 0')
parser.add_argument('-cf', '--config_file', type=str, nargs="?",
help='yaml config file, e.g.: -f config.yaml',
default="config.yaml")
try:
args = parser.parse_args()
return args
except Exception as e:
logging.error(traceback.format_exc())
sys.exit()
async def init(bt_dev: hci_commands.HCI_Commands, ini: dict):
""" init: Assumed to be the same for all devices """
asyncio.create_task(bt_dev.rx_buffer_q_wait())
await bt_dev.cmd_reset()
if ini["own_address_type"]:
await bt_dev.cmd_le_set_random_addr(ini["own_address"])
await bt_dev.cmd_set_event_mask(mask=0x200080000204e090)
await bt_dev.cmd_le_set_event_mask(mask=0x00000007FFFFFFFF)
await bt_dev.cmd_le_set_dflt_phy(all_phys=0, tx_phys=2, rx_phys=2)
await bt_dev.cmd_le_read_buffer_size()
await bt_dev.cmd_le_read_max_data_len()
async def finish(bt_dev: hci_commands.HCI_Commands, cfg: dict):
logging.info("Received %s good packets", bt_dev.valid_recv_data)
if bt_dev.tp:
if show_tp_plots:
bt_dev.tp.plot_tp_from_file(sample_time = cfg["tp"]["sample_time"])
if bt_dev.device_mode == "rx":
bt_dev.tp.save_average()
util.copy_log_files_to_test_directory(test_dir)
logging.info(f"Correctly received: {bt_dev.valid_recv_data}")
logging.info(f"Sent packets: {bt_dev.sent_packets_counter}")
bt_dev.async_ev_rx_wait_finish.set()
# Wait for rx_buffer_q_wait task to finish and socket to close
await asyncio.sleep(1)
async def async_main_rx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict):
await init(bt_dev, ini)
bt_dev.tp = tp.Throughput(name="tp_receiver", mode=bt_dev.device_mode,
total_packets_number=hci.num_of_packets_to_send,
bytes_number_in_packet=hci.num_of_bytes_to_send,
throughput_data_type=cfg["tp"]["data_type"],
flag_plot_packets=cfg["tp"]["flag_plot_packets"],
sample_time=cfg["tp"]["sample_time"],
test_directory=test_dir)
############
# ADVERTISE
############
adv_params = hci.HCI_Advertising()
adv_params.set(
advertising_interval_min = cfg["adv"]["advertising_interval_min"],
advertising_interval_max = cfg["adv"]["advertising_interval_max"],
advertising_type = cfg["adv"]["advertising_type"],
own_address_type = ini["own_address_type"],
peer_address_type = ini["peer_address_type"],
peer_address = cfg["adv"]["peer_address"],
advertising_channel_map = cfg["adv"]["advertising_channel_map"],
advertising_filter_policy = cfg["adv"]["advertising_filter_policy"]
)
await bt_dev.cmd_le_set_advertising_params(adv_params)
await bt_dev.cmd_le_set_advertising_enable(1)
await hci_commands.wait_for_event(bt_dev.async_ev_connected, hci.WAIT_FOR_EVENT_CONN_TIMEOUT)
await bt_dev.cmd_le_set_data_len(hci.conn_handle, tx_octets=0, tx_time=0)
await hci_commands.wait_for_event(bt_dev.async_ev_set_data_len, hci.WAIT_FOR_EVENT_TIMEOUT)
logging.debug("Before finish event")
await asyncio.shield(bt_dev.async_ev_recv_data_finish.wait())
logging.debug("after finish event")
bt_dev.async_ev_recv_data_finish.clear()
await bt_dev.cmd_le_set_advertising_enable(0)
await finish(bt_dev, cfg)
async def async_main_tx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict):
await init(bt_dev, ini)
conn_params = hci.HCI_Connect()
conn_params.set(
le_scan_interval = cfg["conn"]["le_scan_interval"],
le_scan_window = cfg["conn"]["le_scan_window"],
initiator_filter_policy = cfg["conn"]["initiator_filter_policy"],
peer_address_type = ini['peer_address_type'],
peer_address = ini['peer_address'],
own_address_type = ini['own_address_type'],
connection_interval_min = cfg["conn"]["connection_interval_min"],
connection_interval_max = cfg["conn"]["connection_interval_max"],
max_latency = cfg["conn"]["max_latency"],
supervision_timeout = cfg["conn"]["supervision_timeout"],
min_ce_length = cfg["conn"]["min_ce_length"],
max_ce_length = cfg["conn"]["max_ce_length"]
)
await bt_dev.cmd_le_create_connection(conn_params)
await hci_commands.wait_for_event(bt_dev.async_ev_connected, hci.WAIT_FOR_EVENT_CONN_TIMEOUT)
await bt_dev.cmd_le_set_data_len(hci.conn_handle, tx_octets=0, tx_time=0)
await hci_commands.wait_for_event(bt_dev.async_ev_set_data_len, hci.WAIT_FOR_EVENT_TIMEOUT)
if cfg["phy"] == "1M":
await bt_dev.cmd_le_set_phy(hci.conn_handle, all_phys=0, tx_phys=1, rx_phys=1, phy_options=0)
elif cfg["phy"] == "2M":
await bt_dev.cmd_le_set_phy(hci.conn_handle, all_phys=0, tx_phys=2, rx_phys=2, phy_options=0)
else:
raise Exception("PHY parameter not valid.")
await hci_commands.wait_for_event(bt_dev.async_ev_update_phy, hci.WAIT_FOR_EVENT_TIMEOUT)
############
# L2CAP SEND
############
l2cap_data = hci.L2CAP_Data_Send()
acl_data = hci.HCI_ACL_Data_Send()
packets_to_send = hci.num_of_packets_to_send
packet_credits = hci.le_read_buffer_size.total_num_le_acl_data_packets
fmt = "<" + str(hci.num_of_bytes_to_send) + "B"
data = struct.pack(fmt, *([0] * hci.num_of_bytes_to_send))
last_value = 0
sent_packets = 0
tx_sent_timestamps = []
bt_dev.tp = tp.Throughput(name="tp_transmitter", mode=bt_dev.device_mode,
total_packets_number=hci.num_of_packets_to_send,
bytes_number_in_packet=hci.num_of_bytes_to_send,
throughput_data_type=cfg["tp"]["data_type"],
flag_plot_packets=cfg["tp"]["flag_plot_packets"],
sample_time=cfg["tp"]["sample_time"],
test_directory=test_dir)
async with bt_dev.async_lock_packets_cnt:
hci.num_of_completed_packets_cnt = 0
while sent_packets < hci.num_of_packets_to_send:
if packet_credits > 0 and packets_to_send > 0:
data, last_value = tp.gen_data(hci.num_of_bytes_to_send, last_value)
l2cap_data.set(channel_id=0x0044, data=data)
acl_data.set(connection_handle=hci.conn_handle, pb_flag=0b00, bc_flag=0b00,
data=l2cap_data.ba_full_message)
await bt_dev.acl_data_send(acl_data)
async with bt_dev.async_lock_packets_cnt:
packets_to_send -= 1
packet_credits -= 1
else:
logging.info(f"Waiting for num_of_cmp_packets event")
await bt_dev.async_ev_num_cmp_pckts.wait()
bt_dev.async_ev_num_cmp_pckts.clear()
if hci.num_of_completed_packets_cnt > 0:
async with bt_dev.async_lock_packets_cnt:
sent_packets += hci.num_of_completed_packets_cnt
tx_sent_timestamps.append((hci.num_of_completed_packets_time,
sent_packets))
logging.info(f"Sent : {sent_packets}")
packet_credits += hci.num_of_completed_packets_cnt
hci.num_of_completed_packets_cnt = 0
for timestamp in tx_sent_timestamps:
bt_dev.tp.append_to_csv_file(*timestamp)
await finish(bt_dev, cfg)
def parse_cfg_files(args) -> dict:
if args.init_file is None:
ini = {
"own_address": args.own_addr,
"own_address_type": args.own_addr_type,
"dev_index": str(args.dev_idx),
"peer_address": args.peer_addr,
"peer_address_type": args.peer_addr_type,
"peer_dev_index": args.peer_dev_idx
}
else:
with open(args.init_file, "r") as file:
init_file = yaml.safe_load(file)
ini = init_file[args.mode]
global test_dir
test_dir = init_file["test_dir"]
with open(args.config_file) as f:
cfg = yaml.safe_load(f)
global show_tp_plots
hci.num_of_bytes_to_send = cfg["num_of_bytes_to_send"]
hci.num_of_packets_to_send = cfg["num_of_packets_to_send"]
show_tp_plots = cfg["show_tp_plots"]
return ini, cfg
def signal_handler(signum, frame):
logging.critical(f"Received signal: {signal.Signals(signum).name}")
raise ParentCalledException(f"Received signal: {signal.Signals(signum).name}")
def main():
args = parse_arguments()
ini, cfg = parse_cfg_files(args)
log_path = f"log/log_{args.mode}.log"
transport = None
try:
util.configure_logging(log_path, clear_log_file=True)
loop = asyncio.get_event_loop()
loop.set_debug(True)
transport = transport_factory.TransportFactory(device_index=ini['dev_index'],
device_mode=args.mode,
asyncio_loop=loop)
signal.signal(signal.SIGTERM, signal_handler)
bt_dev = hci_commands.HCI_Commands(send=transport.send,
rx_buffer_q=transport.rx_buffer_q,
asyncio_loop=loop,
device_mode=args.mode)
transport.start()
if args.mode == 'rx':
loop.run_until_complete(async_main_rx(bt_dev, ini, cfg))
elif args.mode == 'tx':
loop.run_until_complete(async_main_tx(bt_dev, ini, cfg))
except Exception as e:
logging.error(traceback.format_exc())
except (KeyboardInterrupt or ParentCalledException):
logging.critical("Hard exit triggered.")
logging.error(traceback.format_exc())
finally:
if transport != None:
transport.stop()
sys.exit()
if __name__ == '__main__':
main()
+164
View File
@@ -0,0 +1,164 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import hci
import socket
import ctypes
import struct
import asyncio
import logging
import subprocess
import sys
import time
import multiprocessing
SOCKET_RECV_BUFFER_SIZE = 425984
SOCKET_RECV_TIMEOUT = 3
def btmgmt_dev_reset(index):
logging.info(f"Selecting index {index}")
proc = subprocess.Popen(['btmgmt', '-i', str(index), 'power', 'off'],
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
class BindingError(Exception):
pass
class HCI_User_Channel_Socket_Error(BaseException):
pass
class HCI_User_Channel_Socket():
def __init__(self, device_index=0, device_mode=None,
asyncio_loop=None):
logging.debug("Device index: %s, Device address: %s", device_index, device_mode)
self.loop = asyncio_loop
self.libc = ctypes.cdll.LoadLibrary('libc.so.6')
self.rx_buffer_q = multiprocessing.Manager().Queue()
self.counter = 0
self.device_index = device_index
self.device_mode = device_mode
self.hci_socket = self.socket_create()
self.socket_bind(self.device_index)
self.socket_clear()
self.listener_proc = None
self.listener_ev = multiprocessing.Manager().Event()
def socket_create(self):
logging.debug("%s", self.socket_create.__name__)
new_socket = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW | socket.SOCK_NONBLOCK,
socket.BTPROTO_HCI)
if new_socket == None:
raise HCI_User_Channel_Socket_Error("Socket error. \
Opening socket failed")
new_socket.setblocking(False)
socket_size = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
logging.info(f"Default socket recv buffer size: {socket_size}")
new_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 500000)
socket_size = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
logging.info(f"Set socket recv buffer size: {socket_size}")
return new_socket
def socket_bind(self, index):
logging.debug("%s index: %s", self.socket_bind.__name__, index)
# addr: struct sockaddr_hci from /usr/include/bluetooth/hci.h
addr = struct.pack('HHH', hci.AF_BLUETOOTH, index, hci.HCI_CHANNEL_USER)
retry_binding=2
for i in range(retry_binding):
try:
bind = self.libc.bind(self.hci_socket.fileno(),
ctypes.cast(addr,
ctypes.POINTER(ctypes.c_ubyte)),
len(addr))
if bind != 0:
raise BindingError
except BindingError:
logging.warning("Binding error. Trying to reset bluetooth.")
btmgmt_dev_reset(self.device_index)
if i < retry_binding - 1:
continue
else:
self.hci_socket.close()
logging.error("Binding error. Check HCI index present.")
sys.exit()
logging.info("Binding done!")
break
def socket_clear(self):
logging.debug("%s", self.socket_clear.__name__)
try:
logging.info("Clearing the buffer...")
time.sleep(1)
cnt = 0
while True:
buff = self.hci_socket.recv(SOCKET_RECV_BUFFER_SIZE)
cnt += len(buff)
logging.debug(f"Read from buffer {cnt} bytes")
except BlockingIOError:
logging.info("Buffer empty and ready!")
return
async def send(self, ba_message):
await self.loop.sock_sendall(self.hci_socket, ba_message)
def socket_listener(self):
recv_at_once = 0
while True:
try:
if self.listener_ev.is_set():
logging.info("listener_ev set")
break
buffer = self.hci_socket.recv(SOCKET_RECV_BUFFER_SIZE)
logging.info(f"Socket recv: {self.counter} th packet with len: {len(buffer)}")
self.rx_buffer_q.put((buffer, time.perf_counter()))
recv_at_once +=1
self.counter +=1
except BlockingIOError:
if recv_at_once > 1:
logging.info(f"Socket recv in one loop: {recv_at_once}")
recv_at_once = 0
pass
except BrokenPipeError:
logging.info("BrokenPipeError: Closing...")
print("BrokenPipeError. Press Ctrl-C to exit...")
def close(self):
logging.debug("%s ", self.close.__name__)
return self.hci_socket.close()
def start(self):
self.listener_proc = multiprocessing.Process(target=self.socket_listener,
daemon=True)
self.listener_proc.start()
logging.info(f"start listener_proc pid: {self.listener_proc.pid}")
def stop(self):
logging.info(f"stop listener_proc pid: {self.listener_proc.pid}")
self.listener_ev.set()
self.listener_proc.join()
self.close()
+34
View File
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
rx:
dev_index: '1'
own_address_type: 1
own_address: C0:0D:A5:1A:98:EF
peer_dev_index: '2'
peer_address_type: 1
peer_address: FE:69:8F:77:2F:49
tx:
dev_index: '2'
own_address_type: 1
own_address: FE:69:8F:77:2F:49
peer_dev_index: '1'
peer_address_type: 1
peer_address: C0:0D:A5:1A:98:EF
test_dir: /path/to/blehci_throughput/tests/Mon_May_23_12:29:10_2022
+2
View File
@@ -0,0 +1,2 @@
*
!.gitignore
+243
View File
@@ -0,0 +1,243 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import multiprocessing
import check_addr
import argparse
import yaml
import sys
import subprocess
import traceback
import matplotlib.pyplot as plt
import csv
import util
import os
import math
PROCESS_TIMEOUT = 500 # seconds, adjust if necessary
def parse_arguments():
parser = argparse.ArgumentParser(
description='Measure throughput',
epilog='How to run python scripts: \
sudo python main.py -i 0 1 -m rx tx -cf config.yaml\
then hci0 -> rx and hci1 -> tx')
parser.add_argument('-i', '--indexes', type=str, nargs='*',
help='specify adapters indexes', default=[0, 1])
parser.add_argument('-m', '--modes', type=str, nargs="*",
help='devices modes - receiver, transmitter',
choices=['rx', 'tx'], default=['rx', 'tx'])
parser.add_argument('-cf', '--config_file', type=str, nargs="*",
help='configuration file for devices',
default=["config.yaml"])
try:
args = parser.parse_args()
except Exception as e:
print(traceback.format_exc())
print(f"Indexes: {args.indexes}")
print(f"Modes: {args.modes}")
return args
def get_dev_addr_and_type(hci_indexes: list):
if (len(hci_indexes) != 2):
raise Exception("HCI index error.")
manager = multiprocessing.Manager()
addr_list = manager.list()
check_addrs_proc = multiprocessing.Process(target=check_addr.check_addr,
name="Check addresses",
args=(hci_indexes, addr_list))
check_addrs_proc.start()
print("check_addrs_proc pid: ", check_addrs_proc.pid)
check_addrs_proc.join()
dev_addr_type_list = []
for i in range(0, len(addr_list)):
dev_addr_type_list.append((hci_indexes[i],) + addr_list[i])
return dev_addr_type_list
def change_config_var(filename: str, group: str, variable: str,
new_value: int):
with open(filename, "r") as file:
cfg = yaml.safe_load(file)
if group:
cfg[group][variable] = new_value
else:
cfg[variable] = new_value
with open(filename, "w") as file:
yaml.safe_dump(cfg, file, indent=1, sort_keys=False,
default_style=None, default_flow_style=False)
def get_init_dict(filename: str, args_list: list, modes: list, dir: str):
ini = {
modes[0]:{
"dev_index": args_list[0][0],
"own_address_type": args_list[0][1],
"own_address": args_list[0][2],
"peer_dev_index": args_list[1][0],
"peer_address_type": args_list[1][1],
"peer_address": args_list[1][2]
},
modes[1]:{
"dev_index": args_list[1][0],
"own_address_type": args_list[1][1],
"own_address": args_list[1][2],
"peer_dev_index": args_list[0][0],
"peer_address_type": args_list[0][1],
"peer_address": args_list[0][2]
},
"test_dir": dir
}
with open(filename, 'w') as file:
yaml.safe_dump(ini, file, indent=1, sort_keys=False)
return ini
def run_once(modes: list, cfg_file: str, init_file: str):
list_proc = []
for mode in modes:
proc = subprocess.Popen(["python", "hci_device.py", "-m",
mode, "-if", init_file, "-cf", cfg_file])
print("start subprocess pid: ", proc.pid)
list_proc.append(proc)
try:
for proc in list_proc:
proc.wait(PROCESS_TIMEOUT)
except subprocess.TimeoutExpired:
for proc in list_proc:
print("TimeoutExpired subprocess pid: ", proc.pid)
proc.terminate()
for proc in list_proc:
proc.wait()
return -1
for proc in list_proc:
print("stop subprocess pid: ", proc.pid)
proc.terminate()
proc.wait()
return 0
def testing_variable_influence(cfg: dict, modes: list, cfg_file: str,
init_file: str, init_dict: dict, save_to_file: bool):
tp_test_counter = 1
changed_params_list = []
averages = []
cfg_group = cfg["test"]["change_param_group"]
cfg_variable = cfg["test"]["change_param_variable"]
cfg_start_val = cfg["test"]["start_value"]
cfg_stop_val = cfg["test"]["stop_value"]
cfg_step = cfg["test"]["step"]
data_type = cfg["tp"]["data_type"]
total_iterations = math.ceil((cfg_stop_val - cfg_start_val) / cfg_step)
average_tp_csv_path = init_dict["test_dir"] + "/average_rx_tp.csv"
with open(average_tp_csv_path, "w") as file:
file.write(f"Average throughput [{data_type}ps]\n")
for i in range(cfg_start_val, cfg_stop_val, cfg_step):
changed_params_list.append(i)
if cfg_group and cfg_variable:
print(f"Current param value: {i}")
num_of_params_to_change = len(cfg_variable)
for j in range(0, num_of_params_to_change):
change_config_var(filename=cfg_file, group=cfg_group[j],
variable=cfg_variable[j], new_value=i)
print(f"Running test: {tp_test_counter}/{total_iterations}...")
rc = run_once(modes, cfg_file, init_file)
if rc != 0:
print(f"Test {i} failed. Closing...")
return
tp_test_counter += 1
with open(average_tp_csv_path, "r") as file:
csv_reader = csv.reader(file)
next(csv_reader)
for row in csv_reader:
averages.append(float(*row))
fig, ax = plt.subplots()
ax.plot(changed_params_list[:len(averages)], averages, '-k')
ax.set_ylabel(f"Average throughput [{data_type}/s]")
ax.set_xlabel("Changed parameter/next iteration")
ax.set_title("Average througput")
if save_to_file:
name = init_dict["test_dir"] + "/average_tps"
plt.savefig(fname=name, format='png')
plt.show(block=True)
def main():
args = parse_arguments()
init_file = "init.yaml"
cfg_file = args.config_file[0]
with open(cfg_file, "r") as file:
cfg = yaml.safe_load(file)
addr_list = get_dev_addr_and_type(args.indexes)
if len(addr_list) != len(args.indexes):
raise Exception("No device address received. Check HCI indexes.")
print(f"Received: {addr_list}")
test_dir_path = util.create_test_directory()
init_dict = get_init_dict(filename=init_file, args_list=addr_list,
modes=args.modes, dir=test_dir_path)
util.copy_config_files_to_test_directory([init_file, cfg_file],
init_dict["test_dir"])
try:
if cfg["flag_testing"]:
testing_variable_influence(cfg, args.modes, *args.config_file,
init_file, init_dict, True)
else:
print(f"Running test...")
rc = run_once(args.modes, cfg_file, init_file)
if rc != 0:
print("Test failed.")
print("Finished. Closing...")
except KeyboardInterrupt:
pass
except Exception as e:
print(traceback.format_exc())
finally:
# Set default ownership for dirs and files
util.set_default_chmod_recurs(os.getcwd() + "/tests")
sys.exit()
if __name__ == "__main__":
main()
+3
View File
@@ -0,0 +1,3 @@
matplotlib==3.1.2
PyYAML==6.0
libusb1
@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
pkg.name: tools/hci_throughput/targets/nordic_pca10040_blehci
pkg.type: target
pkg.description:
pkg.author:
pkg.homepage:
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
syscfg.vals:
BLE_LL_CFG_FEAT_DATA_LEN_EXT: 1
BLE_LL_CFG_FEAT_LE_2M_PHY: 1
BLE_LL_HCI_VS_EVENT_ON_ASSERT: 1
MSYS_1_BLOCK_COUNT: 80
MSYS_1_BLOCK_SIZE: 308
BLE_TRANSPORT_ACL_COUNT: 80
BLE_TRANSPORT_ACL_SIZE: 255
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
target.app: "@apache-mynewt-nimble/apps/blehci"
target.bsp: "@apache-mynewt-core/hw/bsp/nordic_pca10040"
target.build_profile: optimized
@@ -0,0 +1,6 @@
pkg.name: targets/nordic_pca10040_boot
pkg.type: target
pkg.description:
pkg.author:
pkg.homepage:
@@ -0,0 +1,3 @@
target.app: "@mcuboot/boot/mynewt"
target.bsp: "@apache-mynewt-core/hw/bsp/nordic_pca10040"
target.build_profile: optimized
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
pkg.name: tools/hci_throughput/targets/nordic_pca10056_blehci
pkg.type: target
pkg.description:
pkg.author:
pkg.homepage:
pkg.deps:
- "@apache-mynewt-core/hw/usb/tinyusb/std_descriptors"
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
syscfg.vals:
BLE_LL_CFG_FEAT_DATA_LEN_EXT: 1
BLE_LL_CFG_FEAT_LE_2M_PHY: 1
BLE_LL_HCI_VS_EVENT_ON_ASSERT: 1
BLE_TRANSPORT_HS: usb
USBD_VID: 0xDCAB
USBD_PID: 0x1234
USBD_BTH: 1
USBD_PRODUCT_STRING: '"throughput"'
MSYS_1_BLOCK_COUNT: 80
MSYS_1_BLOCK_SIZE: 308
BLE_TRANSPORT_ACL_COUNT: 80
BLE_TRANSPORT_ACL_SIZE: 255
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
target.app: "@apache-mynewt-nimble/apps/blehci"
target.bsp: "@apache-mynewt-core/hw/bsp/nordic_pca10056"
target.build_profile: optimized
@@ -0,0 +1,6 @@
pkg.name: targets/nordic_pca10056_boot
pkg.type: target
pkg.description:
pkg.author:
pkg.homepage:
@@ -0,0 +1,3 @@
target.app: "@mcuboot/boot/mynewt"
target.bsp: "@apache-mynewt-core/hw/bsp/nordic_pca10056"
target.build_profile: optimized
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
pkg.name: tools/hci_throughput/targets/nordic_pca10059_blehci
pkg.type: target
pkg.description:
pkg.author:
pkg.homepage:
pkg.deps:
- "@apache-mynewt-core/hw/usb/tinyusb/std_descriptors"
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
syscfg.vals:
BLE_LL_CFG_FEAT_DATA_LEN_EXT: 1
BLE_LL_CFG_FEAT_LE_2M_PHY: 1
BLE_LL_HCI_VS_EVENT_ON_ASSERT: 1
BLE_TRANSPORT_HS: usb
USBD_VID: 0xDCAB
USBD_PID: 0x1234
USBD_BTH: 1
USBD_PRODUCT_STRING: '"throughput_dongle"'
MSYS_1_BLOCK_COUNT: 80
MSYS_1_BLOCK_SIZE: 308
BLE_TRANSPORT_ACL_COUNT: 80
BLE_TRANSPORT_ACL_SIZE: 255
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
target.app: "@apache-mynewt-nimble/apps/blehci"
target.bsp: "@apache-mynewt-core/hw/bsp/nordic_pca10059"
target.build_profile: optimized
@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
pkg.name: tools/hci_throughput/targets/nrf52832_blehci
pkg.type: target
pkg.description:
pkg.author:
pkg.homepage:
@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
syscfg.vals:
BLE_LL_CFG_FEAT_LE_2M_PHY: 1
MSYS_1_BLOCK_COUNT: 80
MSYS_1_BLOCK_SIZE: 308
BLE_TRANSPORT_ACL_COUNT: 80
BLE_TRANSPORT_ACL_SIZE: 255
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
target.app: "@apache-mynewt-nimble/apps/blehci"
target.bsp: "@apache-mynewt-core/hw/bsp/nordic_pca10040"
target.build_profile: optimized
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
pkg.name: tools/hci_throughput/targets/nrf52840_blehci
pkg.type: target
pkg.description:
pkg.author:
pkg.homepage:
pkg.deps:
- "@apache-mynewt-core/hw/usb/tinyusb/std_descriptors"
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
syscfg.vals:
BLE_LL_CFG_FEAT_DATA_LEN_EXT: 1
BLE_LL_CFG_FEAT_LE_2M_PHY: 1
BLE_LL_HCI_VS_EVENT_ON_ASSERT: 1
BLE_TRANSPORT_HS: usb
USBD_VID: 0xDCAB
USBD_PID: 0x1234
USBD_BTH: 1
USBD_PRODUCT_STRING: '"throughput"'
MSYS_1_BLOCK_COUNT: 80
MSYS_1_BLOCK_SIZE: 308
BLE_TRANSPORT_ACL_COUNT: 80
BLE_TRANSPORT_ACL_SIZE: 255
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
target.app: "@apache-mynewt-nimble/apps/blehci"
target.bsp: "@apache-mynewt-core/hw/bsp/nordic_pca10056"
target.build_profile: optimized
+2
View File
@@ -0,0 +1,2 @@
*
!.gitignore
+197
View File
@@ -0,0 +1,197 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import time
import matplotlib.pyplot as plt
import csv
import struct
import argparse
import traceback
def parse_arguments():
parser = argparse.ArgumentParser(
description='Plot throughput from the csv file.',
epilog='How to run script: \
python throughput.py -f tests/Wed_Apr_13_08:36:29_2022/tp_receiver.csv -s 0.1')
parser.add_argument('-f', '--file', type=str, nargs='*',
help='csv file path', default=["tp_receiver"])
parser.add_argument('-s', '--samp_t', type=float, nargs='*',
help='specify throughput sample time', default=1.0)
try:
args = parser.parse_args()
except Exception as e:
print(traceback.format_exc())
return args
data_types = ['kb', 'kB']
def gen_data(num_of_bytes_in_packet: int, last_number_from_previous_data_packet: int):
counter = last_number_from_previous_data_packet + 1
rem = num_of_bytes_in_packet % 4
valid_data_len = int((num_of_bytes_in_packet - rem) / 4)
total_data_len = valid_data_len + rem
data = [0] * total_data_len
for i in range(rem,total_data_len):
data[i] = counter
counter += 1
last_value = data[len(data)-1]
if rem:
fmt = "<" + str(rem) + "B" + str(valid_data_len) + "I"
else:
fmt = "<" + str(valid_data_len) + "I"
data_ba = struct.pack(fmt, *data)
return data_ba, last_value
class Throughput():
def __init__(self, name="tp_chart", mode="rx", total_packets_number=0, bytes_number_in_packet=0,
throughput_data_type='kb', flag_plot_packets=True, sample_time=1, test_directory=None):
self.name = name
self.mode = mode
self.total_packets_number = total_packets_number
self.bytes_number_in_packet = bytes_number_in_packet
self.predef_packet_key = int((bytes_number_in_packet - (bytes_number_in_packet % 4))/4)
self.total_bits_number = bytes_number_in_packet * 8
assert throughput_data_type in data_types
self.throughput_data_type = throughput_data_type
self.flag_plot_packets = flag_plot_packets
self.sample_time = sample_time
self.test_directory = test_directory
if self.test_directory is not None:
self.csv_file_name = self.test_directory + "/" + time.strftime("%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
else:
self.csv_file_name = time.strftime("%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
self.clean_csv_file()
def calc_throughput(self, current_num, last_num, current_time, last_time):
if self.throughput_data_type == 'kb':
return float((((current_num - last_num) * \
self.total_bits_number) / (current_time-last_time))/1000)
elif self.throughput_data_type == 'kB':
return float((((current_num - last_num) * \
self.bytes_number_in_packet) / (current_time-last_time))/1000)
def clean_csv_file(self):
file = open(self.csv_file_name, 'w')
file.write("Time,Packet\n")
def append_to_csv_file(self, timestamp: float = 0.0, packet_number: int = 0):
with open(self.csv_file_name, "a") as file:
csv_writer = csv.writer(file)
csv_writer.writerow([timestamp, packet_number])
def get_average(self, packet_numbers, timestamps):
if self.throughput_data_type == 'kb':
average_tp = ((packet_numbers * self.total_bits_number) \
/ (timestamps[-1] - timestamps[0]))/1000
elif self.throughput_data_type == 'kB':
average_tp = ((packet_numbers * self.bytes_number_in_packet) \
/ (timestamps[-1] - timestamps[0]))/1000
return average_tp
def save_average(self, tp_csv_filename = None):
if self.mode == "rx":
timestamps = []
packet_numbers = []
if tp_csv_filename is None:
tp_csv_filename = self.csv_file_name
else:
tp_csv_filename += ".csv"
with open(tp_csv_filename, "r") as file:
csv_reader = csv.reader(file)
next(csv_reader)
for row in csv_reader:
timestamps.append(float(row[0]))
packet_numbers.append(float(row[1]))
average_tp = self.get_average(packet_numbers[-1], timestamps)
print(f"Average rx throughput: {round(average_tp, 3)} {self.throughput_data_type}ps")
with open(self.test_directory + "/average_rx_tp.csv", "a") as file:
csv_writer = csv.writer(file)
csv_writer.writerow([average_tp])
def plot_tp_from_file(self, filename: str = None, sample_time: float = 1,
save_to_file: bool = True):
timestamps = []
packet_numbers = []
if filename is None:
filename = self.csv_file_name
print("Results:", filename)
with open(filename, "r") as file:
csv_reader = csv.reader(file)
next(csv_reader)
for row in csv_reader:
timestamps.append(float(row[0]))
packet_numbers.append(float(row[1]))
last_time = 0
last_number = packet_numbers[0]
throughput = []
offset = timestamps[0]
for i in range(0, len(timestamps)):
timestamps[i] -= offset
if timestamps[i] - last_time > sample_time:
throughput.append((timestamps[i],
self.calc_throughput(packet_numbers[i],
last_number,
timestamps[i],
last_time)))
last_time = timestamps[i]
last_number = packet_numbers[i]
average_tp = self.get_average(packet_numbers[-1], timestamps)
fig, ax = plt.subplots()
if self.flag_plot_packets:
ax2 = ax.twinx()
ax.plot(*zip(*throughput), 'k-')
if self.flag_plot_packets:
ax2.plot(timestamps, packet_numbers, 'b-')
ax.set_title(self.name)
ax.set_ylabel(f"Throughput [{self.throughput_data_type}/s]")
ax.set_xlabel("Time [s]")
ax.text(0.9, 1.02, f"Average: {round(average_tp, 3)}"
f"{self.throughput_data_type}ps", transform=ax.transAxes,
color='k')
if self.flag_plot_packets:
ax2 = ax2.set_ylabel(f"Packets [Max:{len(packet_numbers)}]",
color='b')
if save_to_file:
path = filename.replace(".csv", ".png")
plt.savefig(path)
plt.show(block=True)
if __name__ == "__main__":
args = parse_arguments()
tp = Throughput(bytes_number_in_packet=247)
tp.plot_tp_from_file(*args.file, args.samp_t[0], save_to_file=False)
+38
View File
@@ -0,0 +1,38 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import hci_socket
class TransportFactory:
def __init__(self, device_index=None, device_mode=None,
asyncio_loop=None) -> None:
if (type(device_index) == int or device_index.isnumeric()):
self.transport = hci_socket.HCI_User_Channel_Socket(int(device_index),
device_mode, asyncio_loop)
else:
raise Exception("No such transport found.")
self.rx_buffer_q = self.transport.rx_buffer_q
self.send = self.transport.send
def start(self):
self.transport.start()
def stop(self):
self.transport.stop()
+70
View File
@@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import shutil
import time
import os
def create_test_directory():
test_dir_name = "tests/" + time.strftime("%Y_%m_%d_%H_%M_%S")
path = os.path.join(os.getcwd(), test_dir_name)
os.mkdir(path, mode=0o777)
print("Test directory: ", path)
return path
def configure_logging(log_filename, clear_log_file=True):
format_template = ("%(asctime)s %(threadName)s %(name)s %(levelname)s "
"%(filename)-25s %(lineno)-5s "
"%(funcName)-25s : %(message)s")
logging.basicConfig(format=format_template,
filename=log_filename,
filemode='a',
level=logging.DEBUG)
if clear_log_file:
with open(log_filename, "w") as f:
f.write("asctime\t\t\t\t\tthreadName name levelname filename\
\tlineno\tfuncName\t\t\t\tmessage\n")
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("matplotlib").setLevel(logging.WARNING)
def copy_config_files_to_test_directory(files: list, test_directory: str):
for file in files:
shutil.copy(file, test_directory + "/" + file)
def copy_log_files_to_test_directory(dir: str):
log_files = ["log/log_rx.log", "log/log_tx.log", "log/check_addr.log"]
for file in log_files:
shutil.copy(file, dir + "/" + time.strftime("%Y_%m_%d_%H_%M_%S_") +
file.replace("log/", ""))
# Running tests as sudo implies root permissions on created directories/files.
# This function sets the default permission mode to dirs/files in given path
# recursively.
def set_default_chmod_recurs(path):
for root, dirs, files in os.walk(path):
for d in dirs:
os.chmod(os.path.join(root, d), 0o0777)
for f in files:
os.chmod(os.path.join(root, f), 0o0777)