[tests] support testing commissioning (#3441)

Add support for testing commissioning process, including:

1. Add DTLS, Thread Discovery and commissioning messages(JOIN_FIN.req etc) parsing;
2. Support parsing log from UART response, and construct decrypted messages;
3. Common commands for commissioning related verification;
This commit is contained in:
wgtdkp
2019-02-27 07:49:57 +08:00
committed by Jonathan Hui
parent cc37bd1dfb
commit dc947f0152
13 changed files with 1420 additions and 47 deletions
+73
View File
@@ -33,11 +33,14 @@ import sys
import ipv6
import network_data
import network_layer
import common
import config
import mesh_cop
import mle
from collections import Counter
from enum import IntEnum
from network_data import Prefix, BorderRouter, LowpanId
class CheckType(IntEnum):
CONTAIN = 0
@@ -270,6 +273,7 @@ def check_parent_request(command_msg, is_first_request):
elif not scan_mask.end_device:
raise ValueError("Second parent request without E bit set")
def check_parent_response(command_msg, mle_frame_counter = CheckType.OPTIONAL):
"""Verify a properly formatted Parent Response command message.
"""
@@ -478,3 +482,72 @@ def check_address_registration_tlv(addr_reg_tlv, address_set):
"""Verify all addresses contained in address_set are contained in add_reg_tlv
"""
assert all(addr in addr_reg_tlv.addresses for addr in address_set), 'Some addresses are not included in AddressRegistration TLV'
def assert_contains_tlv(tlvs, check_type, tlv_type):
"""Assert a tlv list contains specific tlv and return the first qualified.
"""
tlvs = [tlv for tlv in tlvs if isinstance(tlv, tlv_type)]
if check_type is CheckType.CONTAIN:
assert tlvs
return tlvs[0]
elif check_type is CheckType.NOT_CONTAIN:
assert not tlvs
return None
elif check_type is CheckType.OPTIONAL:
return None
else:
raise ValueError("Invalid check type: {}".format(check_type))
def check_discovery_request(command_msg):
"""Verify a properly formatted Thread Discovery Request command message.
"""
assert not isinstance(command_msg.mle, mle.MleMessageSecured)
tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs
request = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.DiscoveryRequest)
assert request.version == config.PROTOCOL_VERSION
def check_discovery_response(command_msg, request_src_addr, steering_data=CheckType.OPTIONAL):
"""Verify a properly formatted Thread Discovery Response command message.
"""
assert not isinstance(command_msg.mle, mle.MleMessageSecured)
assert command_msg.mac_header.src_address.type == common.MacAddressType.LONG
assert command_msg.mac_header.dest_address == request_src_addr
tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs
response = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.DiscoveryResponse)
assert response.version == config.PROTOCOL_VERSION
assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.ExtendedPanid)
assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.NetworkName)
assert_contains_tlv(tlvs, steering_data, network_data.SteeringData)
assert_contains_tlv(tlvs, steering_data, mesh_cop.JoinerUdpPort)
check_type = CheckType.CONTAIN if response.native_flag else CheckType.OPTIONAL
assert_contains_tlv(tlvs, check_type, network_data.CommissionerUdpPort)
def get_joiner_udp_port_in_discovery_response(command_msg):
"""Get the udp port specified in a DISCOVERY RESPONSE message
"""
tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs
udp_port_tlv = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.JoinerUdpPort)
return udp_port_tlv.udp_port
def check_joiner_commissioning_messages(commissioning_messages):
"""Verify COAP messages sent by joiner while commissioning process.
"""
print(commissioning_messages)
assert len(commissioning_messages) >= 2
join_fin_req = commissioning_messages[0]
assert join_fin_req.type == mesh_cop.MeshCopMessageType.JOIN_FIN_REQ
assert_contains_tlv(join_fin_req.tlvs, CheckType.NOT_CONTAIN, mesh_cop.ProvisioningUrl)
join_ent_rsp = commissioning_messages[1]
assert join_ent_rsp.type == mesh_cop.MeshCopMessageType.JOIN_ENT_RSP
def check_commissioner_commissioning_messages(commissioning_messages):
"""Verify COAP messages sent by commissioner while commissioning process.
"""
assert any(msg.type == mesh_cop.MeshCopMessageType.JOIN_FIN_RSP for msg in commissioning_messages)
def check_joiner_router_commissioning_messages(commissioning_messages):
"""Verify COAP messages sent by joiner router while commissioning process.
"""
assert any(msg.type == mesh_cop.MeshCopMessageType.JOIN_ENT_NTF for msg in commissioning_messages)