[tests] add traffic check in test case Cert_7_1_01 (#3405)

This commit is contained in:
Irving-cl
2019-02-19 05:41:30 +08:00
committed by Jonathan Hui
parent 968f9b6898
commit cf7bc35bbe
4 changed files with 142 additions and 30 deletions
@@ -27,18 +27,27 @@
# POSSIBILITY OF SUCH DAMAGE.
#
import functools
import time
import unittest
from command import check_child_id_response
from command import check_child_update_response
from command import check_data_response
from command import check_message_address_registration_addr_set_equals
from command import CheckType
from command import NetworkDataCheckType
import config
import mle
import network_data
import node
LEADER = 1
ROUTER = 2
SED1 = 3
ED1 = 4
MED1 = 4
MTDS = [SED1, ED1]
MTDS = [SED1, MED1]
class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
def setUp(self):
@@ -52,7 +61,7 @@ class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[SED1].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ED1].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[MED1].get_addr64())
self.nodes[LEADER].enable_whitelist()
self.nodes[ROUTER].set_panid(0xface)
@@ -67,10 +76,10 @@ class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
self.nodes[SED1].enable_whitelist()
self.nodes[SED1].set_timeout(config.DEFAULT_CHILD_TIMEOUT)
self.nodes[ED1].set_panid(0xface)
self.nodes[ED1].set_mode('rsn')
self.nodes[ED1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ED1].enable_whitelist()
self.nodes[MED1].set_panid(0xface)
self.nodes[MED1].set_mode('rsn')
self.nodes[MED1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[MED1].enable_whitelist()
def tearDown(self):
for node in list(self.nodes.values()):
@@ -95,9 +104,9 @@ class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
self.simulator.go(5)
self.assertEqual(self.nodes[SED1].get_state(), 'child')
self.nodes[ED1].start()
self.nodes[MED1].start()
self.simulator.go(5)
self.assertEqual(self.nodes[ED1].get_state(), 'child')
self.assertEqual(self.nodes[MED1].get_state(), 'child')
addrs = self.nodes[SED1].get_addrs()
self.assertTrue(any('2001:2:0:1' in addr[0:10] for addr in addrs))
@@ -106,12 +115,48 @@ class Cert_7_1_1_BorderRouterAsLeader(unittest.TestCase):
if addr[0:10] == '2001:2:0:1' or addr[0:10] == '2001:2:0:2':
self.assertTrue(self.nodes[LEADER].ping(addr))
addrs = self.nodes[ED1].get_addrs()
addrs = self.nodes[MED1].get_addrs()
self.assertTrue(any('2001:2:0:1' in addr[0:10] for addr in addrs))
self.assertTrue(any('2001:2:0:2' in addr[0:10] for addr in addrs))
for addr in addrs:
if addr[0:10] == '2001:2:0:1' or addr[0:10] == '2001:2:0:2':
self.assertTrue(self.nodes[LEADER].ping(addr))
leader_messages = self.simulator.get_messages_sent_by(LEADER)
med1_messages = self.simulator.get_messages_sent_by(MED1)
sed1_messages = self.simulator.get_messages_sent_by(SED1)
# Step 1 - DUT sends MLE Advertisements
msg = leader_messages.next_mle_message(mle.CommandType.ADVERTISEMENT)
# Step 2 - DUT creates network data
msg = leader_messages.next_mle_message(mle.CommandType.DATA_RESPONSE)
check_data_response(msg, network_data_check=(NetworkDataCheckType.PREFIX_CONTENT,
[{network_data.TlvType.PREFIX:b'2001000200000001'}, {network_data.TlvType.PREFIX:b'2001000200000002'}]))
# Step 4 - DUT sends a MLE Child ID Response to Router1
msg = leader_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
check_child_id_response(msg, network_data_check=(NetworkDataCheckType.PREFIX_CNT, 2))
# Step 6 - DUT sends a MLE Child ID Response to SED1
msg = leader_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
check_child_id_response(msg, network_data_check=(NetworkDataCheckType.PREFIX_CONTENT, [{network_data.TlvType.BORDER_ROUTER:0xFFFE}]))
# For Step 10
msg_chd_upd_res_to_sed = leader_messages.next_mle_message(mle.CommandType.CHILD_UPDATE_RESPONSE)
# Step 8 - DUT sends a MLE Child ID Response to MED1
msg = leader_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
check_child_id_response(msg, network_data_check=(NetworkDataCheckType.PREFIX_CNT, 2))
# Step 10 - DUT sends Child Update Response
msg_chd_upd_res_to_med = leader_messages.next_mle_message(mle.CommandType.CHILD_UPDATE_RESPONSE)
check_child_update_response(msg_chd_upd_res_to_med, address_registration=CheckType.CONTAIN)
msg = med1_messages.next_mle_message(mle.CommandType.CHILD_UPDATE_REQUEST)
check_message_address_registration_addr_set_equals(msg, msg_chd_upd_res_to_med)
check_child_update_response(msg_chd_upd_res_to_sed, address_registration=CheckType.CONTAIN)
msg = sed1_messages.next_mle_message(mle.CommandType.CHILD_UPDATE_REQUEST)
check_message_address_registration_addr_set_equals(msg, msg_chd_upd_res_to_sed)
if __name__ == '__main__':
unittest.main()
@@ -27,14 +27,15 @@
# POSSIBILITY OF SUCH DAMAGE.
#
from binascii import hexlify
import time
import unittest
from command import CheckType
from command import NetworkDataCheckType
import config
import command
from command import CheckType
import mle
import network_data
import node
LEADER = 1
@@ -131,8 +132,8 @@ class Cert_7_1_3_BorderRouterAsLeader(unittest.TestCase):
# 3 - Leader
msg = leader_messages.next_mle_message(mle.CommandType.DATA_RESPONSE)
command.check_data_response(msg, network_data=CheckType.CONTAIN,
prefixes=[('2001:2:0:1::/64', 'paros'), ('2001:2:0:2::/64', 'paro')])
command.check_data_response(msg, network_data_check=(NetworkDataCheckType.PREFIX_CONTENT,
[{network_data.TlvType.PREFIX:b'2001000200000001'}, {network_data.TlvType.PREFIX:b'2001000200000002'}]))
# 4 - N/A
# Get addresses registered by MED1
@@ -143,7 +144,7 @@ class Cert_7_1_3_BorderRouterAsLeader(unittest.TestCase):
# Make a copy of leader's messages to ensure that we don't miss messages to SED1
leader_messages_copy = leader_messages.clone()
msg = leader_messages_copy.next_mle_message(mle.CommandType.CHILD_UPDATE_RESPONSE, sent_to_node=self.nodes[MED1])
command.check_child_update_response_from_parent(msg, address_registration=CheckType.CONTAIN)
command.check_child_update_response(msg, address_registration=CheckType.CONTAIN)
leader_addresses = msg.get_mle_message_tlv(mle.AddressRegistration).addresses
self.assertTrue(all(addr in leader_addresses for addr in med1_addresses))
@@ -163,7 +164,7 @@ class Cert_7_1_3_BorderRouterAsLeader(unittest.TestCase):
# 8 - Leader
msg = leader_messages.next_mle_message(mle.CommandType.CHILD_UPDATE_RESPONSE, sent_to_node=self.nodes[SED1])
command.check_child_update_response_from_parent(msg, address_registration=CheckType.CONTAIN)
command.check_child_update_response(msg, address_registration=CheckType.CONTAIN)
leader_addresses = msg.get_mle_message_tlv(mle.AddressRegistration).addresses
self.assertTrue(all(addr in leader_addresses for addr in sed1_addresses))
+79 -13
View File
@@ -27,14 +27,16 @@
# POSSIBILITY OF SUCH DAMAGE.
#
import binascii
import sys
import ipv6
from network_data import Prefix, BorderRouter, LowpanId
import network_data
import network_layer
import config
import mle
from collections import Counter
from enum import IntEnum
class CheckType(IntEnum):
@@ -42,6 +44,10 @@ class CheckType(IntEnum):
NOT_CONTAIN = 1
OPTIONAL = 2
class NetworkDataCheckType:
PREFIX_CNT = 1
PREFIX_CONTENT = 2
def check_address_query(command_msg, source_node, destination_address):
"""Verify source_node sent a properly formatted Address Query Request message to the destination_address.
"""
@@ -303,10 +309,46 @@ def check_child_id_request(command_msg, tlv_request = CheckType.OPTIONAL, \
check_tlv_request_tlv(command_msg, CheckType.CONTAIN, mle.TlvType.ADDRESS16)
check_tlv_request_tlv(command_msg, CheckType.CONTAIN, mle.TlvType.NETWORK_DATA)
def find_prefix_tlv(tlvs, cond_map):
"""Find a prefix tlv in tlvs which matchs some conditions specified by cond_map
"""
for tlv in tlvs:
if network_data.TlvType.PREFIX in cond_map:
if binascii.hexlify(tlv.prefix) != cond_map[network_data.TlvType.PREFIX]:
continue
if network_data.TlvType.BORDER_ROUTER in cond_map:
border_router_tlv = get_sub_tlv(tlv.sub_tlvs, network_data.BorderRouter)
if border_router_tlv.border_router_16 != cond_map[network_data.TlvType.BORDER_ROUTER]:
continue
return tlv
return None
def check_network_data(data, check_detail):
check_type = check_detail[0]
prefixes = [tlv for tlv in data.tlvs if isinstance(tlv, network_data.Prefix)]
if check_type == NetworkDataCheckType.PREFIX_CNT:
# check_detail[1] should be a integer number representing the minimum count of prefixes should be
min_cnt = check_detail[1]
assert len(prefixes) >= min_cnt, 'Network data should contain at least {} prefixes'.format(mn_cnt)
for prefix in prefixes:
check_prefix(prefix)
elif check_type == NetworkDataCheckType.PREFIX_CONTENT:
# check_detail[1] should be a list of dictionary(like
# [{network_data.TlvType.PREFIX='...', network_data.TlvType.BORDER_ROUTER='...'}])
# each entry of the dictionary represents one thing to check of the prefix tlv
assert len(prefixes) >= len(check_detail[1]), 'Network data seems to have less prefixes than expected'
# basic check of prefixes
for prefix in prefixes:
check_prefix(prefix)
for cond_map in check_detail[1]:
tlv = find_prefix_tlv(prefixes, cond_map)
assert tlv is not None, 'Some prefix sub-tlv is not found:{}'.format(cond_map)
def check_child_id_response(command_msg, route64 = CheckType.OPTIONAL, network_data = CheckType.OPTIONAL, \
address_registration = CheckType.OPTIONAL, active_timestamp = CheckType.OPTIONAL, \
pending_timestamp = CheckType.OPTIONAL, active_operational_dataset = CheckType.OPTIONAL, \
pending_operational_dataset = CheckType.OPTIONAL):
pending_operational_dataset = CheckType.OPTIONAL,
network_data_check = None):
"""Verify a properly formatted Child Id Response command message.
"""
command_msg.assertMleMessageContainsTlv(mle.SourceAddress)
@@ -321,6 +363,16 @@ def check_child_id_response(command_msg, route64 = CheckType.OPTIONAL, network_d
check_mle_optional_tlv(command_msg, active_operational_dataset, mle.ActiveOperationalDataset)
check_mle_optional_tlv(command_msg, pending_operational_dataset, mle.PendingOperationalDataset)
if network_data_check != None:
network_data_tlv = command_msg.assertMleMessageContainsTlv(mle.NetworkData)
check_network_data(network_data_tlv, network_data_check)
def check_prefix(prefix):
"""Verify if a prefix contains 6loWPAN sub-TLV and border router sub-TLV
"""
assert contains_tlv(prefix.sub_tlvs, network_data.BorderRouter), 'Prefix doesn\'t contain a border router sub-TLV!'
assert contains_tlv(prefix.sub_tlvs, network_data.LowpanId), 'Prefix doesn\'t contain a LowpanId sub-TLV!'
def check_child_update_request_by_child(command_msg):
command_msg.assertMleMessageContainsTlv(mle.LeaderData)
command_msg.assertMleMessageContainsTlv(mle.Mode)
@@ -356,24 +408,21 @@ def check_secure_mle_key_id_mode(command_msg, key_id_mode):
assert isinstance(command_msg.mle, mle.MleMessageSecured)
assert command_msg.mle.aux_sec_hdr.key_id_mode == key_id_mode
def check_data_response(command_msg, network_data=CheckType.OPTIONAL, prefixes=[],
active_timestamp=CheckType.OPTIONAL):
def check_data_response(command_msg, network_data_opt=CheckType.OPTIONAL,
active_timestamp=CheckType.OPTIONAL,
network_data_check=None):
"""Verify a properly formatted Data Response command message.
"""
check_secure_mle_key_id_mode(command_msg, 0x02)
command_msg.assertMleMessageContainsTlv(mle.SourceAddress)
command_msg.assertMleMessageContainsTlv(mle.LeaderData)
check_mle_optional_tlv(command_msg, network_data, mle.NetworkData)
check_mle_optional_tlv(command_msg, network_data_opt, mle.NetworkData)
check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp)
if network_data == CheckType.CONTAIN and len(prefixes) > 0:
network_data_tlv = command_msg.get_mle_message_tlv(mle.NetworkData)
prefix_tlvs = [tlv for tlv in network_data_tlv.tlvs if isinstance(tlv, Prefix)]
assert len(prefix_tlvs) >= len(prefixes)
for prefix in prefix_tlvs:
assert contains_tlv(prefix.sub_tlvs, BorderRouter)
assert contains_tlv(prefix.sub_tlvs, LowpanId)
if network_data_check != None:
network_data_tlv = command_msg.assertMleMessageContainsTlv(mle.NetworkData)
check_network_data(network_data_tlv, network_data_check)
def check_child_update_request_from_parent(command_msg, leader_data=CheckType.OPTIONAL,
network_data=CheckType.OPTIONAL, challenge=CheckType.OPTIONAL,
@@ -389,7 +438,7 @@ def check_child_update_request_from_parent(command_msg, leader_data=CheckType.OP
check_mle_optional_tlv(command_msg, tlv_request, mle.TlvRequest)
check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp)
def check_child_update_response_from_parent(command_msg, timeout=CheckType.OPTIONAL,
def check_child_update_response(command_msg, timeout=CheckType.OPTIONAL,
address_registration=CheckType.OPTIONAL, address16=CheckType.OPTIONAL,
leader_data=CheckType.OPTIONAL, network_data=CheckType.OPTIONAL, response=CheckType.OPTIONAL,
link_layer_frame_counter=CheckType.OPTIONAL, mle_frame_counter=CheckType.OPTIONAL):
@@ -408,7 +457,24 @@ def check_child_update_response_from_parent(command_msg, timeout=CheckType.OPTIO
check_mle_optional_tlv(command_msg, link_layer_frame_counter, mle.LinkLayerFrameCounter)
check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter)
def unhashable_items_lists_equals(list1, list2):
return all(item in list1 for item in list2) and all(item in list2 for item in list1)
def check_message_address_registration_addr_set_equals(command_msg1, command_msg2):
"""Verify that all addresses in the address set of AddressRegistration tlv in msg2
are contained in that address set of AddressRegistration tlv in msg1
"""
addr_reg_tlv1 = command_msg1.assertMleMessageContainsTlv(mle.AddressRegistration)
addr_reg_tlv2 = command_msg2.assertMleMessageContainsTlv(mle.AddressRegistration)
# unordered lists comparison
assert unhashable_items_lists_equals(addr_reg_tlv1.addresses, addr_reg_tlv2.addresses), 'Some addresses are not included in AddressRegistration TLV'
def get_sub_tlv(tlvs, tlv_type):
for sub_tlv in tlvs:
if isinstance(sub_tlv, tlv_type):
return sub_tlv
def check_address_registration_tlv(addr_reg_tlv, address_set):
"""Verify all addresses contained in address_set are contained in add_reg_tlv
"""
assert all(addr in addr_reg_tlv.addresses for addr in address_set), 'Some addresses are not included in AddressRegistration TLV'
+1 -1
View File
@@ -292,7 +292,7 @@ class Message(object):
if self.mac_header.dest_address == mac_address:
sent_to_node = True
assert sent_to_node
assert sent_to_node == True
def assertSentToDestinationAddress(self, ipv6_address):
if sys.version_info[0] == 2: