[tests] add traffic analysis for Cert_5_5_01 as TestPlan (#2540)

This commit is contained in:
Zhanglong Xia
2018-02-09 01:04:04 +08:00
committed by Jonathan Hui
parent 6805cfa81e
commit edc299a368
5 changed files with 207 additions and 98 deletions
@@ -86,7 +86,8 @@ class Cert_5_2_7_REEDSynchronization(unittest.TestCase):
# 3. DUT_REED: Verify sent a Link Request to at least 3 neighboring Routers.
for i in range(0, 3):
msg = reed_messages.next_mle_message(mle.CommandType.LINK_REQUEST)
command.check_link_request(msg)
command.check_link_request(msg, source_address = command.CheckType.CONTAIN, \
leader_data = command.CheckType.CONTAIN)
# 4. DUT_ROUTER1: Verify sent a Link Accept to DUT_REED.
self.simulator.go(30)
+151
View File
@@ -0,0 +1,151 @@
#!/usr/bin/env python
#
# Copyright (c) 2018, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import copy
import time
import unittest
import command
import config
import mle
import node
DUT_LEADER = 1
DUT_ROUTER1 = 2
class Cert_5_5_1_LeaderReboot(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
self.nodes = {}
for i in range(1,3):
self.nodes[i] = node.Node(i, simulator = self.simulator)
self.nodes[DUT_LEADER].set_panid(0xface)
self.nodes[DUT_LEADER].set_mode('rsdn')
self._setUpLeader()
self.nodes[DUT_ROUTER1].set_panid(0xface)
self.nodes[DUT_ROUTER1].set_mode('rsdn')
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[DUT_LEADER].get_addr64())
self.nodes[DUT_ROUTER1].enable_whitelist()
self.nodes[DUT_ROUTER1].set_router_selection_jitter(1)
def _setUpLeader(self):
self.nodes[DUT_LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64())
self.nodes[DUT_LEADER].enable_whitelist()
def tearDown(self):
for node in list(self.nodes.values()):
node.stop()
del self.nodes
del self.simulator
def test(self):
# 1 ALL: Build and verify the topology
self.nodes[DUT_LEADER].start()
self.simulator.go(5)
self.assertEqual(self.nodes[DUT_LEADER].get_state(), 'leader')
self.nodes[DUT_ROUTER1].start()
self.simulator.go(5)
self.assertEqual(self.nodes[DUT_ROUTER1].get_state(), 'router')
# 2 DUT_LEADER, DUT_ROUTER1: Verify both DUT_LEADER and DUT_ROUTER1 send MLE Advertisement message
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = leader_messages.next_mle_message(mle.CommandType.ADVERTISEMENT)
command.check_mle_advertisement(msg)
router1_messages = self.simulator.get_messages_sent_by(DUT_ROUTER1)
msg = router1_messages.next_mle_message(mle.CommandType.ADVERTISEMENT)
command.check_mle_advertisement(msg)
# Send a harness helper ping to the DUT
router1_rloc = self.nodes[DUT_ROUTER1].get_ip6_address(config.ADDRESS_TYPE.RLOC)
self.assertTrue(self.nodes[DUT_LEADER].ping(router1_rloc))
leader_rloc = self.nodes[DUT_LEADER].get_ip6_address(config.ADDRESS_TYPE.RLOC)
self.assertTrue(self.nodes[DUT_ROUTER1].ping(leader_rloc))
# 3 DUT_LEADER: Reset DUT_LEADER
leader_rloc16 = self.nodes[DUT_LEADER].get_addr16()
self.nodes[DUT_LEADER].reset()
self._setUpLeader()
# Clean sniffer's buffer
self.simulator.get_messages_sent_by(DUT_LEADER)
self.simulator.get_messages_sent_by(DUT_ROUTER1)
# DUT_LEADER sleep time is less than leader timeout value
self.simulator.go(config.MAX_ADVERTISEMENT_INTERVAL)
# Verify DUT_LEADER didn't send MLE Advertisement messages
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
msg = leader_messages.next_mle_message(mle.CommandType.ADVERTISEMENT, False)
self.assertTrue(msg is None)
self.nodes[DUT_LEADER].start()
# Verify the DUT_LEADER is still a leader
self.simulator.go(5)
self.assertEqual(self.nodes[DUT_LEADER].get_state(), 'leader')
self.assertEqual(self.nodes[DUT_LEADER].get_addr16(), leader_rloc16)
# 4 DUT_LEADER: Verify DUT_LEADER sent a multicast Link Request message
leader_messages = self.simulator.get_messages_sent_by(DUT_LEADER)
leader_messages_temp = copy.deepcopy(leader_messages)
msg = leader_messages.next_mle_message(mle.CommandType.LINK_REQUEST)
command.check_link_request(msg, tlv_request_address16 = command.CheckType.CONTAIN, \
tlv_request_route64 = command.CheckType.CONTAIN)
# 5 DUT_ROUTER1: Verify DUT_ROUTER1 replied with Link Accept message
router1_messages = self.simulator.get_messages_sent_by(DUT_ROUTER1)
router1_messages_temp = copy.deepcopy(router1_messages)
msg = router1_messages.next_mle_message(mle.CommandType.LINK_ACCEPT)
if msg is not None:
command.check_link_accept(msg, self.nodes[DUT_LEADER], address16 = command.CheckType.CONTAIN, \
leader_data = command.CheckType.CONTAIN, route64 = command.CheckType.CONTAIN)
else:
msg = router1_messages_temp.next_mle_message(mle.CommandType.LINK_ACCEPT_AND_REQUEST)
self.assertTrue(msg is not None)
command.check_link_accept(msg, self.nodes[DUT_LEADER], address16 = command.CheckType.CONTAIN, \
leader_data = command.CheckType.CONTAIN, route64 = command.CheckType.CONTAIN, \
challenge = command.CheckType.CONTAIN)
# 6 DUT_LEADER: Verify DUT_LEADER didn't send a Parent Request message
msg = leader_messages_temp.next_mle_message(mle.CommandType.PARENT_REQUEST, False)
self.assertTrue(msg is None)
# 7 ALL: Verify connectivity by sending an ICMPv6 Echo Request from DUT_LEADER to DUT_ROUTER1 link local address
router1_link_local_address = self.nodes[DUT_ROUTER1].get_ip6_address(config.ADDRESS_TYPE.LINK_LOCAL)
self.assertTrue(self.nodes[DUT_LEADER].ping(router1_link_local_address))
if __name__ == '__main__':
unittest.main()
@@ -1,88 +0,0 @@
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import time
import unittest
import config
import node
LEADER = 1
ROUTER = 2
class Cert_5_5_1_LeaderReset(unittest.TestCase):
def setUp(self):
self.simulator = config.create_default_simulator()
self.nodes = {}
for i in range(1,3):
self.nodes[i] = node.Node(i, simulator=self.simulator)
self.nodes[LEADER].set_panid(0xface)
self.nodes[LEADER].set_mode('rsdn')
self._setUpLeader()
self.nodes[ROUTER].set_panid(0xface)
self.nodes[ROUTER].set_mode('rsdn')
self.nodes[ROUTER].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER].enable_whitelist()
self.nodes[ROUTER].set_router_selection_jitter(1)
def _setUpLeader(self):
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER].get_addr64())
self.nodes[LEADER].enable_whitelist()
def tearDown(self):
for node in list(self.nodes.values()):
node.stop()
del self.nodes
del self.simulator
def test(self):
self.nodes[LEADER].start()
self.simulator.go(5)
self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
self.nodes[ROUTER].start()
self.simulator.go(5)
self.assertEqual(self.nodes[ROUTER].get_state(), 'router')
rloc16 = self.nodes[LEADER].get_addr16()
self.nodes[LEADER].reset();
self._setUpLeader()
self.simulator.go(5)
self.nodes[LEADER].start()
self.simulator.go(5)
self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
self.assertEqual(self.nodes[LEADER].get_addr16(), rloc16)
if __name__ == '__main__':
unittest.main()
+2 -2
View File
@@ -62,7 +62,7 @@ EXTRA_DIST = \
Cert_5_3_09_AddressQuery.py \
Cert_5_3_10_AddressQuery.py \
Cert_5_3_11_AddressQueryTimeoutIntervals.py \
Cert_5_5_01_LeaderReset.py \
Cert_5_5_01_LeaderReboot.py \
Cert_5_5_02_LeaderReboot.py \
Cert_5_5_03_SplitMergeChildren.py \
Cert_5_5_04_SplitMergeRouters.py \
@@ -193,7 +193,7 @@ check_SCRIPTS = \
Cert_5_3_09_AddressQuery.py \
Cert_5_3_10_AddressQuery.py \
Cert_5_3_11_AddressQueryTimeoutIntervals.py \
Cert_5_5_01_LeaderReset.py \
Cert_5_5_01_LeaderReboot.py \
Cert_5_5_02_LeaderReboot.py \
Cert_5_5_03_SplitMergeChildren.py \
Cert_5_5_04_SplitMergeRouters.py \
+52 -7
View File
@@ -66,25 +66,70 @@ def check_address_release(command_msg, destination_node):
destination_rloc = destination_node.get_ip6_address(config.ADDRESS_TYPE.RLOC)
assert ipv6.ip_address(destination_rloc) == command_msg.ipv6_packet.ipv6_header.destination_address, "Error: The destination is not RLOC address"
def check_link_request(command_msg):
def check_tlv_request_tlv(command_msg, check_type, tlv_id):
"""Verify if TLV Request TLV contains specified TLV ID
"""
tlv_request_tlv = command_msg.get_mle_message_tlv(mle.TlvRequest)
if check_type == CheckType.CONTAIN:
assert tlv_request_tlv is not None, "Error: The msg doesn't contain TLV Request TLV"
assert any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs), "Error: The msg doesn't contain TLV Request TLV ID: {}".format(tlv_id)
elif check_type == CheckType.NOT_CONTAIN:
if tlv_request_tlv is not None:
assert any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs) is False, "Error: The msg contains TLV Request TLV ID: {}".format(tlv_id)
elif check_type == CheckType.OPTIONAL:
if tlv_request_tlv is not None:
if any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs):
print("TLV Request TLV contains TLV ID: {}".format(tlv_id))
else:
print("TLV Request TLV doesn't contain TLV ID: {}".format(tlv_id))
else:
print("The msg doesn't contain TLV Request TLV")
else:
raise ValueError("Invalid check type")
def check_link_request(command_msg, source_address = CheckType.OPTIONAL, leader_data = CheckType.OPTIONAL, \
tlv_request_address16 = CheckType.OPTIONAL, tlv_request_route64 = CheckType.OPTIONAL, \
tlv_request_link_margin = CheckType.OPTIONAL):
"""Verify a properly formatted Link Request command message.
"""
command_msg.assertMleMessageContainsTlv(mle.Challenge)
command_msg.assertMleMessageContainsTlv(mle.LeaderData)
command_msg.assertMleMessageContainsTlv(mle.SourceAddress)
command_msg.assertMleMessageContainsTlv(mle.Version)
def check_link_accept(command_msg, destination_node):
"""Verify a properly formatted Link Accept command message.
check_mle_optional_tlv(command_msg, source_address, mle.SourceAddress)
check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData)
check_tlv_request_tlv(command_msg, tlv_request_address16, mle.TlvType.ADDRESS16)
check_tlv_request_tlv(command_msg, tlv_request_route64, mle.TlvType.ROUTE64)
check_tlv_request_tlv(command_msg, tlv_request_link_margin, mle.TlvType.LINK_MARGIN)
def check_link_accept(command_msg, destination_node, \
leader_data = CheckType.OPTIONAL, link_margin = CheckType.OPTIONAL, mle_frame_counter = CheckType.OPTIONAL, \
challenge = CheckType.OPTIONAL, address16 = CheckType.OPTIONAL, route64 = CheckType.OPTIONAL, \
tlv_request_link_margin = CheckType.OPTIONAL):
"""verify a properly formatted link accept command message.
"""
command_msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
command_msg.assertMleMessageContainsTlv(mle.SourceAddress)
command_msg.assertMleMessageContainsTlv(mle.Response)
command_msg.assertMleMessageContainsTlv(mle.Version)
command_msg.assertMleMessageContainsOptionalTlv(mle.MleFrameCounter)
check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData)
check_mle_optional_tlv(command_msg, link_margin, mle.LinkMargin)
check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter)
check_mle_optional_tlv(command_msg, challenge, mle.Challenge)
check_mle_optional_tlv(command_msg, address16, mle.Address16)
check_mle_optional_tlv(command_msg, route64, mle.Route64)
check_tlv_request_tlv(command_msg, tlv_request_link_margin, mle.TlvType.LINK_MARGIN)
destination_link_local = destination_node.get_ip6_address(config.ADDRESS_TYPE.LINK_LOCAL)
assert ipv6.ip_address(destination_link_local) == command_msg.ipv6_packet.ipv6_header.destination_address, "Error: The destination is unexpected"
assert ipv6.ip_address(destination_link_local) == command_msg.ipv6_packet.ipv6_header.destination_address, \
"Error: The destination is unexpected"
def check_icmp_path(sniffer, path, nodes, icmp_type = ipv6.ICMP_ECHO_REQUEST):
"""Verify icmp message is forwarded along the path.