mirror of
https://github.com/espressif/openthread.git
synced 2026-06-06 05:24:51 +00:00
[style] change python yapf column_limit to 119 (#5339)
This commit is contained in:
@@ -58,19 +58,15 @@ def check_address_query(command_msg, source_node, destination_address):
|
||||
command_msg.assertCoapMessageContainsTlv(network_layer.TargetEid)
|
||||
|
||||
source_rloc = source_node.get_ip6_address(config.ADDRESS_TYPE.RLOC)
|
||||
assert (
|
||||
ipv6.ip_address(
|
||||
source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address
|
||||
), ("Error: The IPv6 source address is not the RLOC of the originator. The source node's rloc is: "
|
||||
+ str(ipv6.ip_address(source_rloc)) +
|
||||
", but the source_address in command msg is: " +
|
||||
assert (ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address), (
|
||||
"Error: The IPv6 source address is not the RLOC of the originator. The source node's rloc is: " +
|
||||
str(ipv6.ip_address(source_rloc)) + ", but the source_address in command msg is: " +
|
||||
str(command_msg.ipv6_packet.ipv6_header.source_address))
|
||||
|
||||
if isinstance(destination_address, bytearray):
|
||||
destination_address = bytes(destination_address)
|
||||
|
||||
assert (ipv6.ip_address(destination_address) ==
|
||||
command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
assert (ipv6.ip_address(destination_address) == command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
), "Error: The IPv6 destination address is not expected."
|
||||
|
||||
|
||||
@@ -83,21 +79,15 @@ def check_address_notification(command_msg, source_node, destination_node):
|
||||
command_msg.assertCoapMessageContainsTlv(network_layer.MlEid)
|
||||
|
||||
source_rloc = source_node.get_ip6_address(config.ADDRESS_TYPE.RLOC)
|
||||
assert (
|
||||
ipv6.ip_address(
|
||||
source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address
|
||||
), "Error: The IPv6 source address is not the RLOC of the originator."
|
||||
assert (ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address
|
||||
), "Error: The IPv6 source address is not the RLOC of the originator."
|
||||
|
||||
destination_rloc = destination_node.get_ip6_address(
|
||||
config.ADDRESS_TYPE.RLOC)
|
||||
assert (
|
||||
ipv6.ip_address(destination_rloc) ==
|
||||
command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
), "Error: The IPv6 destination address is not the RLOC of the destination."
|
||||
destination_rloc = destination_node.get_ip6_address(config.ADDRESS_TYPE.RLOC)
|
||||
assert (ipv6.ip_address(destination_rloc) == command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
), "Error: The IPv6 destination address is not the RLOC of the destination."
|
||||
|
||||
|
||||
def check_address_error_notification(command_msg, source_node,
|
||||
destination_address):
|
||||
def check_address_error_notification(command_msg, source_node, destination_address):
|
||||
"""Verify source_node sent a properly formatted Address Error Notification command message to destination_address.
|
||||
"""
|
||||
command_msg.assertCoapMessageRequestUriPath('/a/ae')
|
||||
@@ -105,23 +95,17 @@ def check_address_error_notification(command_msg, source_node,
|
||||
command_msg.assertCoapMessageContainsTlv(network_layer.MlEid)
|
||||
|
||||
source_rloc = source_node.get_ip6_address(config.ADDRESS_TYPE.RLOC)
|
||||
assert (
|
||||
ipv6.ip_address(
|
||||
source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address
|
||||
), ("Error: The IPv6 source address is not the RLOC of the originator. The source node's rloc is: "
|
||||
+ str(ipv6.ip_address(source_rloc)) +
|
||||
", but the source_address in command msg is: " +
|
||||
assert (ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address), (
|
||||
"Error: The IPv6 source address is not the RLOC of the originator. The source node's rloc is: " +
|
||||
str(ipv6.ip_address(source_rloc)) + ", but the source_address in command msg is: " +
|
||||
str(command_msg.ipv6_packet.ipv6_header.source_address))
|
||||
|
||||
if isinstance(destination_address, bytearray):
|
||||
destination_address = bytes(destination_address)
|
||||
|
||||
assert (
|
||||
ipv6.ip_address(destination_address) ==
|
||||
command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
), ("Error: The IPv6 destination address is not expected. The destination node's rloc is: "
|
||||
+ str(ipv6.ip_address(destination_address)) +
|
||||
", but the destination_address in command msg is: " +
|
||||
assert (ipv6.ip_address(destination_address) == command_msg.ipv6_packet.ipv6_header.destination_address), (
|
||||
"Error: The IPv6 destination address is not expected. The destination node's rloc is: " +
|
||||
str(ipv6.ip_address(destination_address)) + ", but the destination_address in command msg is: " +
|
||||
str(command_msg.ipv6_packet.ipv6_header.destination_address))
|
||||
|
||||
|
||||
@@ -142,10 +126,8 @@ def check_address_release(command_msg, destination_node):
|
||||
command_msg.assertCoapMessageContainsTlv(network_layer.Rloc16)
|
||||
command_msg.assertCoapMessageContainsTlv(network_layer.MacExtendedAddress)
|
||||
|
||||
destination_rloc = destination_node.get_ip6_address(
|
||||
config.ADDRESS_TYPE.RLOC)
|
||||
assert (ipv6.ip_address(destination_rloc) ==
|
||||
command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
destination_rloc = destination_node.get_ip6_address(config.ADDRESS_TYPE.RLOC)
|
||||
assert (ipv6.ip_address(destination_rloc) == command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
), "Error: The destination is not RLOC address"
|
||||
|
||||
|
||||
@@ -155,26 +137,22 @@ def check_tlv_request_tlv(command_msg, check_type, tlv_id):
|
||||
tlv_request_tlv = command_msg.get_mle_message_tlv(mle.TlvRequest)
|
||||
|
||||
if check_type == CheckType.CONTAIN:
|
||||
assert (tlv_request_tlv is
|
||||
not None), "Error: The msg doesn't contain TLV Request TLV"
|
||||
assert (tlv_request_tlv is not None), "Error: The msg doesn't contain TLV Request TLV"
|
||||
assert any(
|
||||
tlv_id == tlv for tlv in tlv_request_tlv.tlvs
|
||||
), "Error: The msg doesn't contain TLV Request TLV ID: {}".format(
|
||||
tlv_id)
|
||||
tlv_id == tlv
|
||||
for tlv in tlv_request_tlv.tlvs), "Error: The msg doesn't contain TLV Request TLV ID: {}".format(tlv_id)
|
||||
|
||||
elif check_type == CheckType.NOT_CONTAIN:
|
||||
if tlv_request_tlv is not None:
|
||||
assert (
|
||||
any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs) is False
|
||||
), "Error: The msg contains TLV Request TLV ID: {}".format(tlv_id)
|
||||
assert (any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs) is
|
||||
False), "Error: The msg contains TLV Request TLV ID: {}".format(tlv_id)
|
||||
|
||||
elif check_type == CheckType.OPTIONAL:
|
||||
if tlv_request_tlv is not None:
|
||||
if any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs):
|
||||
print("TLV Request TLV contains TLV ID: {}".format(tlv_id))
|
||||
else:
|
||||
print(
|
||||
"TLV Request TLV doesn't contain TLV ID: {}".format(tlv_id))
|
||||
print("TLV Request TLV doesn't contain TLV ID: {}".format(tlv_id))
|
||||
else:
|
||||
print("The msg doesn't contain TLV Request TLV")
|
||||
|
||||
@@ -198,11 +176,9 @@ def check_link_request(
|
||||
check_mle_optional_tlv(command_msg, source_address, mle.SourceAddress)
|
||||
check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData)
|
||||
|
||||
check_tlv_request_tlv(command_msg, tlv_request_address16,
|
||||
mle.TlvType.ADDRESS16)
|
||||
check_tlv_request_tlv(command_msg, tlv_request_address16, mle.TlvType.ADDRESS16)
|
||||
check_tlv_request_tlv(command_msg, tlv_request_route64, mle.TlvType.ROUTE64)
|
||||
check_tlv_request_tlv(command_msg, tlv_request_link_margin,
|
||||
mle.TlvType.LINK_MARGIN)
|
||||
check_tlv_request_tlv(command_msg, tlv_request_link_margin, mle.TlvType.LINK_MARGIN)
|
||||
|
||||
|
||||
def check_link_accept(
|
||||
@@ -230,14 +206,11 @@ def check_link_accept(
|
||||
check_mle_optional_tlv(command_msg, address16, mle.Address16)
|
||||
check_mle_optional_tlv(command_msg, route64, mle.Route64)
|
||||
|
||||
check_tlv_request_tlv(command_msg, tlv_request_link_margin,
|
||||
mle.TlvType.LINK_MARGIN)
|
||||
check_tlv_request_tlv(command_msg, tlv_request_link_margin, mle.TlvType.LINK_MARGIN)
|
||||
|
||||
destination_link_local = destination_node.get_ip6_address(
|
||||
config.ADDRESS_TYPE.LINK_LOCAL)
|
||||
assert (
|
||||
ipv6.ip_address(destination_link_local) == command_msg.ipv6_packet.
|
||||
ipv6_header.destination_address), "Error: The destination is unexpected"
|
||||
destination_link_local = destination_node.get_ip6_address(config.ADDRESS_TYPE.LINK_LOCAL)
|
||||
assert (ipv6.ip_address(destination_link_local) == command_msg.ipv6_packet.ipv6_header.destination_address
|
||||
), "Error: The destination is unexpected"
|
||||
|
||||
|
||||
def check_icmp_path(sniffer, path, nodes, icmp_type=ipv6.ICMP_ECHO_REQUEST):
|
||||
@@ -253,8 +226,7 @@ def check_icmp_path(sniffer, path, nodes, icmp_type=ipv6.ICMP_ECHO_REQUEST):
|
||||
if i < len_path - 1:
|
||||
next_node = nodes[path[i + 1]]
|
||||
next_node_rloc16 = next_node.get_addr16()
|
||||
assert (next_node_rloc16 == node_icmp_msg.mac_header.dest_address.
|
||||
rloc), "Error: The path is unexpected."
|
||||
assert (next_node_rloc16 == node_icmp_msg.mac_header.dest_address.rloc), "Error: The path is unexpected."
|
||||
else:
|
||||
return True
|
||||
|
||||
@@ -305,8 +277,7 @@ def check_mle_optional_tlv(command_msg, type, tlv):
|
||||
|
||||
def check_mle_advertisement(command_msg):
|
||||
command_msg.assertSentWithHopLimit(255)
|
||||
command_msg.assertSentToDestinationAddress(
|
||||
config.LINK_LOCAL_ALL_NODES_ADDRESS)
|
||||
command_msg.assertSentToDestinationAddress(config.LINK_LOCAL_ALL_NODES_ADDRESS)
|
||||
command_msg.assertMleMessageContainsTlv(mle.SourceAddress)
|
||||
command_msg.assertMleMessageContainsTlv(mle.LeaderData)
|
||||
command_msg.assertMleMessageContainsTlv(mle.Route64)
|
||||
@@ -316,13 +287,10 @@ def check_parent_request(command_msg, is_first_request):
|
||||
"""Verify a properly formatted Parent Request command message.
|
||||
"""
|
||||
if command_msg.mle.aux_sec_hdr.key_id_mode != 0x2:
|
||||
raise ValueError(
|
||||
"The Key Identifier Mode of the Security Control Field SHALL be set to 0x02"
|
||||
)
|
||||
raise ValueError("The Key Identifier Mode of the Security Control Field SHALL be set to 0x02")
|
||||
|
||||
command_msg.assertSentWithHopLimit(255)
|
||||
command_msg.assertSentToDestinationAddress(
|
||||
config.LINK_LOCAL_ALL_ROUTERS_ADDRESS)
|
||||
command_msg.assertSentToDestinationAddress(config.LINK_LOCAL_ALL_ROUTERS_ADDRESS)
|
||||
command_msg.assertMleMessageContainsTlv(mle.Mode)
|
||||
command_msg.assertMleMessageContainsTlv(mle.Challenge)
|
||||
command_msg.assertMleMessageContainsTlv(mle.Version)
|
||||
@@ -363,9 +331,7 @@ def check_child_id_request(
|
||||
"""Verify a properly formatted Child Id Request command message.
|
||||
"""
|
||||
if command_msg.mle.aux_sec_hdr.key_id_mode != 0x2:
|
||||
raise ValueError(
|
||||
"The Key Identifier Mode of the Security Control Field SHALL be set to 0x02"
|
||||
)
|
||||
raise ValueError("The Key Identifier Mode of the Security Control Field SHALL be set to 0x02")
|
||||
|
||||
command_msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
|
||||
command_msg.assertMleMessageContainsTlv(mle.Mode)
|
||||
@@ -375,15 +341,13 @@ def check_child_id_request(
|
||||
|
||||
check_mle_optional_tlv(command_msg, tlv_request, mle.TlvRequest)
|
||||
check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter)
|
||||
check_mle_optional_tlv(command_msg, address_registration,
|
||||
mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp)
|
||||
check_mle_optional_tlv(command_msg, pending_timestamp, mle.PendingTimestamp)
|
||||
check_mle_optional_tlv(command_msg, route64, mle.Route64)
|
||||
|
||||
check_tlv_request_tlv(command_msg, CheckType.CONTAIN, mle.TlvType.ADDRESS16)
|
||||
check_tlv_request_tlv(command_msg, CheckType.CONTAIN,
|
||||
mle.TlvType.NETWORK_DATA)
|
||||
check_tlv_request_tlv(command_msg, CheckType.CONTAIN, mle.TlvType.NETWORK_DATA)
|
||||
|
||||
|
||||
def check_child_id_response(
|
||||
@@ -405,29 +369,22 @@ def check_child_id_response(
|
||||
|
||||
check_mle_optional_tlv(command_msg, route64, mle.Route64)
|
||||
check_mle_optional_tlv(command_msg, network_data, mle.NetworkData)
|
||||
check_mle_optional_tlv(command_msg, address_registration,
|
||||
mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp)
|
||||
check_mle_optional_tlv(command_msg, pending_timestamp, mle.PendingTimestamp)
|
||||
check_mle_optional_tlv(command_msg, active_operational_dataset,
|
||||
mle.ActiveOperationalDataset)
|
||||
check_mle_optional_tlv(command_msg, pending_operational_dataset,
|
||||
mle.PendingOperationalDataset)
|
||||
check_mle_optional_tlv(command_msg, active_operational_dataset, mle.ActiveOperationalDataset)
|
||||
check_mle_optional_tlv(command_msg, pending_operational_dataset, mle.PendingOperationalDataset)
|
||||
|
||||
if network_data_check is not None:
|
||||
network_data_tlv = command_msg.assertMleMessageContainsTlv(
|
||||
mle.NetworkData)
|
||||
network_data_tlv = command_msg.assertMleMessageContainsTlv(mle.NetworkData)
|
||||
network_data_check.check(network_data_tlv)
|
||||
|
||||
|
||||
def check_prefix(prefix):
|
||||
"""Verify if a prefix contains 6loWPAN sub-TLV and border router sub-TLV
|
||||
"""
|
||||
assert contains_tlv(prefix.sub_tlvs, network_data.BorderRouter
|
||||
), 'Prefix doesn\'t contain a border router sub-TLV!'
|
||||
assert contains_tlv(
|
||||
prefix.sub_tlvs,
|
||||
network_data.LowpanId), 'Prefix doesn\'t contain a LowpanId sub-TLV!'
|
||||
assert contains_tlv(prefix.sub_tlvs, network_data.BorderRouter), 'Prefix doesn\'t contain a border router sub-TLV!'
|
||||
assert contains_tlv(prefix.sub_tlvs, network_data.LowpanId), 'Prefix doesn\'t contain a LowpanId sub-TLV!'
|
||||
|
||||
|
||||
def check_child_update_request_from_child(
|
||||
@@ -447,8 +404,7 @@ def check_child_update_request_from_child(
|
||||
check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData)
|
||||
check_mle_optional_tlv(command_msg, challenge, mle.Challenge)
|
||||
check_mle_optional_tlv(command_msg, time_out, mle.Timeout)
|
||||
check_mle_optional_tlv(command_msg, address_registration,
|
||||
mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, tlv_request_tlv, mle.TlvRequest)
|
||||
check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp)
|
||||
|
||||
@@ -472,11 +428,9 @@ def check_router_id_cached(node, router_id, cached=True):
|
||||
"""
|
||||
eidcaches = node.get_eidcaches()
|
||||
if cached:
|
||||
assert any(
|
||||
router_id == (int(rloc, 16) >> 10) for (_, rloc) in eidcaches)
|
||||
assert any(router_id == (int(rloc, 16) >> 10) for (_, rloc) in eidcaches)
|
||||
else:
|
||||
assert (any(router_id == (int(rloc, 16) >> 10)
|
||||
for (_, rloc) in eidcaches) is False)
|
||||
assert (any(router_id == (int(rloc, 16) >> 10) for (_, rloc) in eidcaches) is False)
|
||||
|
||||
|
||||
def contains_tlv(sub_tlvs, tlv_type):
|
||||
@@ -488,9 +442,7 @@ def contains_tlv(sub_tlvs, tlv_type):
|
||||
def contains_tlvs(sub_tlvs, tlv_types):
|
||||
"""Verify if all types of tlv in a list are included in a sub-tlv list.
|
||||
"""
|
||||
return all((any(isinstance(sub_tlv, tlv_type)
|
||||
for sub_tlv in sub_tlvs))
|
||||
for tlv_type in tlv_types)
|
||||
return all((any(isinstance(sub_tlv, tlv_type) for sub_tlv in sub_tlvs)) for tlv_type in tlv_types)
|
||||
|
||||
|
||||
def check_secure_mle_key_id_mode(command_msg, key_id_mode):
|
||||
@@ -500,9 +452,7 @@ def check_secure_mle_key_id_mode(command_msg, key_id_mode):
|
||||
assert command_msg.mle.aux_sec_hdr.key_id_mode == key_id_mode
|
||||
|
||||
|
||||
def check_data_response(command_msg,
|
||||
network_data_check=None,
|
||||
active_timestamp=CheckType.OPTIONAL):
|
||||
def check_data_response(command_msg, network_data_check=None, active_timestamp=CheckType.OPTIONAL):
|
||||
"""Verify a properly formatted Data Response command message.
|
||||
"""
|
||||
check_secure_mle_key_id_mode(command_msg, 0x02)
|
||||
@@ -510,8 +460,7 @@ def check_data_response(command_msg,
|
||||
command_msg.assertMleMessageContainsTlv(mle.LeaderData)
|
||||
check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp)
|
||||
if network_data_check is not None:
|
||||
network_data_tlv = command_msg.assertMleMessageContainsTlv(
|
||||
mle.NetworkData)
|
||||
network_data_tlv = command_msg.assertMleMessageContainsTlv(mle.NetworkData)
|
||||
network_data_check.check(network_data_tlv)
|
||||
|
||||
|
||||
@@ -554,14 +503,12 @@ def check_child_update_response(
|
||||
command_msg.assertMleMessageContainsTlv(mle.SourceAddress)
|
||||
command_msg.assertMleMessageContainsTlv(mle.Mode)
|
||||
check_mle_optional_tlv(command_msg, timeout, mle.Timeout)
|
||||
check_mle_optional_tlv(command_msg, address_registration,
|
||||
mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration)
|
||||
check_mle_optional_tlv(command_msg, address16, mle.Address16)
|
||||
check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData)
|
||||
check_mle_optional_tlv(command_msg, network_data, mle.NetworkData)
|
||||
check_mle_optional_tlv(command_msg, response, mle.Response)
|
||||
check_mle_optional_tlv(command_msg, link_layer_frame_counter,
|
||||
mle.LinkLayerFrameCounter)
|
||||
check_mle_optional_tlv(command_msg, link_layer_frame_counter, mle.LinkLayerFrameCounter)
|
||||
check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter)
|
||||
|
||||
if (address_registration == CheckType.CONTAIN) and len(CIDs) > 0:
|
||||
@@ -569,8 +516,7 @@ def check_child_update_response(
|
||||
|
||||
|
||||
def _check_address_registration(command_msg, CIDs=()):
|
||||
addresses = command_msg.assertMleMessageContainsTlv(
|
||||
mle.AddressRegistration).addresses
|
||||
addresses = command_msg.assertMleMessageContainsTlv(mle.AddressRegistration).addresses
|
||||
for cid in CIDs:
|
||||
found = False
|
||||
for address in addresses:
|
||||
@@ -595,22 +541,17 @@ def check_address_registration_tlv(
|
||||
"""
|
||||
found = False
|
||||
addr = ipaddress.ip_address(full_address)
|
||||
addresses = command_msg.assertMleMessageContainsTlv(
|
||||
mle.AddressRegistration).addresses
|
||||
addresses = command_msg.assertMleMessageContainsTlv(mle.AddressRegistration).addresses
|
||||
|
||||
for item in addresses:
|
||||
if isinstance(item, mle.AddressFull) and ipaddress.ip_address(
|
||||
item.ipv6_address) == addr:
|
||||
if isinstance(item, mle.AddressFull) and ipaddress.ip_address(item.ipv6_address) == addr:
|
||||
found = True
|
||||
break
|
||||
|
||||
return found
|
||||
|
||||
|
||||
def check_compressed_address_registration_tlv(command_msg,
|
||||
cid,
|
||||
iid,
|
||||
cid_present_once=False):
|
||||
def check_compressed_address_registration_tlv(command_msg, cid, iid, cid_present_once=False):
|
||||
'''Check whether or not a compressed IPv6 address in AddressRegistrationTlv.
|
||||
note: only compare the iid part of the address.
|
||||
|
||||
@@ -624,8 +565,7 @@ def check_compressed_address_registration_tlv(command_msg,
|
||||
found = False
|
||||
cid_cnt = 0
|
||||
|
||||
addresses = command_msg.assertMleMessageContainsTlv(
|
||||
mle.AddressRegistration).addresses
|
||||
addresses = command_msg.assertMleMessageContainsTlv(mle.AddressRegistration).addresses
|
||||
|
||||
for item in addresses:
|
||||
if isinstance(item, mle.AddressCompressed):
|
||||
@@ -634,12 +574,10 @@ def check_compressed_address_registration_tlv(command_msg,
|
||||
if iid == item.iid.hex():
|
||||
found = True
|
||||
break
|
||||
assert found, 'Error: Expected (cid, iid):({},{}) Not Found'.format(
|
||||
cid, iid)
|
||||
assert found, 'Error: Expected (cid, iid):({},{}) Not Found'.format(cid, iid)
|
||||
|
||||
assert cid_present_once == (
|
||||
cid_cnt == 1), 'Error: Expected cid present {} but present {}'.format(
|
||||
'once' if cid_present_once else '', cid_cnt)
|
||||
assert cid_present_once == (cid_cnt == 1), 'Error: Expected cid present {} but present {}'.format(
|
||||
'once' if cid_present_once else '', cid_cnt)
|
||||
|
||||
|
||||
def assert_contains_tlv(tlvs, check_type, tlv_type):
|
||||
@@ -663,32 +601,26 @@ def check_discovery_request(command_msg):
|
||||
"""
|
||||
assert not isinstance(command_msg.mle, mle.MleMessageSecured)
|
||||
tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs
|
||||
request = assert_contains_tlv(tlvs, CheckType.CONTAIN,
|
||||
mesh_cop.DiscoveryRequest)
|
||||
request = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.DiscoveryRequest)
|
||||
assert request.version == config.PROTOCOL_VERSION
|
||||
|
||||
|
||||
def check_discovery_response(command_msg,
|
||||
request_src_addr,
|
||||
steering_data=CheckType.OPTIONAL):
|
||||
def check_discovery_response(command_msg, request_src_addr, steering_data=CheckType.OPTIONAL):
|
||||
"""Verify a properly formatted Thread Discovery Response command message.
|
||||
"""
|
||||
assert not isinstance(command_msg.mle, mle.MleMessageSecured)
|
||||
assert (
|
||||
command_msg.mac_header.src_address.type == common.MacAddressType.LONG)
|
||||
assert (command_msg.mac_header.src_address.type == common.MacAddressType.LONG)
|
||||
assert command_msg.mac_header.dest_address == request_src_addr
|
||||
|
||||
tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs
|
||||
response = assert_contains_tlv(tlvs, CheckType.CONTAIN,
|
||||
mesh_cop.DiscoveryResponse)
|
||||
response = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.DiscoveryResponse)
|
||||
assert response.version == config.PROTOCOL_VERSION
|
||||
assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.ExtendedPanid)
|
||||
assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.NetworkName)
|
||||
assert_contains_tlv(tlvs, steering_data, mesh_cop.SteeringData)
|
||||
assert_contains_tlv(tlvs, steering_data, mesh_cop.JoinerUdpPort)
|
||||
|
||||
check_type = (CheckType.CONTAIN
|
||||
if response.native_flag else CheckType.OPTIONAL)
|
||||
check_type = (CheckType.CONTAIN if response.native_flag else CheckType.OPTIONAL)
|
||||
assert_contains_tlv(tlvs, check_type, mesh_cop.CommissionerUdpPort)
|
||||
|
||||
|
||||
@@ -696,8 +628,7 @@ def get_joiner_udp_port_in_discovery_response(command_msg):
|
||||
"""Get the udp port specified in a DISCOVERY RESPONSE message
|
||||
"""
|
||||
tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs
|
||||
udp_port_tlv = assert_contains_tlv(tlvs, CheckType.CONTAIN,
|
||||
mesh_cop.JoinerUdpPort)
|
||||
udp_port_tlv = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.JoinerUdpPort)
|
||||
return udp_port_tlv.udp_port
|
||||
|
||||
|
||||
@@ -708,8 +639,7 @@ def check_joiner_commissioning_messages(commissioning_messages):
|
||||
assert len(commissioning_messages) >= 2
|
||||
join_fin_req = commissioning_messages[0]
|
||||
assert join_fin_req.type == mesh_cop.MeshCopMessageType.JOIN_FIN_REQ
|
||||
assert_contains_tlv(join_fin_req.tlvs, CheckType.NOT_CONTAIN,
|
||||
mesh_cop.ProvisioningUrl)
|
||||
assert_contains_tlv(join_fin_req.tlvs, CheckType.NOT_CONTAIN, mesh_cop.ProvisioningUrl)
|
||||
join_ent_rsp = commissioning_messages[1]
|
||||
assert join_ent_rsp.type == mesh_cop.MeshCopMessageType.JOIN_ENT_RSP
|
||||
|
||||
@@ -717,15 +647,13 @@ def check_joiner_commissioning_messages(commissioning_messages):
|
||||
def check_commissioner_commissioning_messages(commissioning_messages):
|
||||
"""Verify COAP messages sent by commissioner while commissioning process.
|
||||
"""
|
||||
assert any(msg.type == mesh_cop.MeshCopMessageType.JOIN_FIN_RSP
|
||||
for msg in commissioning_messages)
|
||||
assert any(msg.type == mesh_cop.MeshCopMessageType.JOIN_FIN_RSP for msg in commissioning_messages)
|
||||
|
||||
|
||||
def check_joiner_router_commissioning_messages(commissioning_messages):
|
||||
"""Verify COAP messages sent by joiner router while commissioning process.
|
||||
"""
|
||||
assert any(msg.type == mesh_cop.MeshCopMessageType.JOIN_ENT_NTF
|
||||
for msg in commissioning_messages)
|
||||
assert any(msg.type == mesh_cop.MeshCopMessageType.JOIN_ENT_NTF for msg in commissioning_messages)
|
||||
return None
|
||||
|
||||
|
||||
@@ -737,8 +665,7 @@ def check_payload_same(tp1, tp2):
|
||||
for tlv in tp2:
|
||||
peer_tlv = get_sub_tlv(tp1, type(tlv))
|
||||
assert (peer_tlv is not None and
|
||||
peer_tlv == tlv), 'peer_tlv:{}, tlv:{} type:{}'.format(
|
||||
peer_tlv, tlv, type(tlv))
|
||||
peer_tlv == tlv), 'peer_tlv:{}, tlv:{} type:{}'.format(peer_tlv, tlv, type(tlv))
|
||||
|
||||
|
||||
def check_coap_message(msg, payloads, dest_addrs=None):
|
||||
@@ -759,17 +686,13 @@ class SinglePrefixCheck:
|
||||
self._border_router_16 = border_router_16
|
||||
|
||||
def check(self, prefix_tlv):
|
||||
border_router_tlv = assert_contains_tlv(prefix_tlv.sub_tlvs,
|
||||
CheckType.CONTAIN,
|
||||
network_data.BorderRouter)
|
||||
assert_contains_tlv(prefix_tlv.sub_tlvs, CheckType.CONTAIN,
|
||||
network_data.LowpanId)
|
||||
border_router_tlv = assert_contains_tlv(prefix_tlv.sub_tlvs, CheckType.CONTAIN, network_data.BorderRouter)
|
||||
assert_contains_tlv(prefix_tlv.sub_tlvs, CheckType.CONTAIN, network_data.LowpanId)
|
||||
result = True
|
||||
if self._prefix is not None:
|
||||
result &= self._prefix == binascii.hexlify(prefix_tlv.prefix)
|
||||
if self._border_router_16 is not None:
|
||||
result &= (
|
||||
self._border_router_16 == border_router_tlv.border_router_16)
|
||||
result &= (self._border_router_16 == border_router_tlv.border_router_16)
|
||||
return result
|
||||
|
||||
|
||||
@@ -782,8 +705,7 @@ class PrefixesCheck:
|
||||
def check(self, prefix_tlvs):
|
||||
# if prefix_cnt is given, then check count only
|
||||
if self._prefix_cnt > 0:
|
||||
assert (len(prefix_tlvs) >=
|
||||
self._prefix_cnt), 'prefix count is less than expected'
|
||||
assert (len(prefix_tlvs) >= self._prefix_cnt), 'prefix count is less than expected'
|
||||
else:
|
||||
for prefix_check in self._prefix_check_list:
|
||||
found = False
|
||||
@@ -802,11 +724,9 @@ class CommissioningDataCheck:
|
||||
|
||||
def check(self, commissioning_data_tlv):
|
||||
if self._stable is not None:
|
||||
assert (self._stable == commissioning_data_tlv.stable
|
||||
), 'Commissioning Data stable flag is not correct'
|
||||
assert (self._stable == commissioning_data_tlv.stable), 'Commissioning Data stable flag is not correct'
|
||||
assert contains_tlvs(commissioning_data_tlv.sub_tlvs,
|
||||
self._sub_tlv_type_list
|
||||
), 'Some sub tlvs are missing in Commissioning Data'
|
||||
self._sub_tlv_type_list), 'Some sub tlvs are missing in Commissioning Data'
|
||||
|
||||
|
||||
class NetworkDataCheck:
|
||||
@@ -817,10 +737,7 @@ class NetworkDataCheck:
|
||||
|
||||
def check(self, network_data_tlv):
|
||||
if self._prefixes_check is not None:
|
||||
prefix_tlvs = [
|
||||
tlv for tlv in network_data_tlv.tlvs
|
||||
if isinstance(tlv, network_data.Prefix)
|
||||
]
|
||||
prefix_tlvs = [tlv for tlv in network_data_tlv.tlvs if isinstance(tlv, network_data.Prefix)]
|
||||
self._prefixes_check.check(prefix_tlvs)
|
||||
if self._commissioning_data_check is not None:
|
||||
commissioning_data_tlv = assert_contains_tlv(
|
||||
|
||||
Reference in New Issue
Block a user