[tests] add traffic analysis for Cert_5_3_05_RoutingLinkQuality.py (#2289)

This commit is contained in:
hjian2017
2017-11-01 10:53:18 -05:00
committed by Jonathan Hui
parent 977cd004bf
commit d8e2c2f73d
3 changed files with 81 additions and 48 deletions
@@ -31,9 +31,11 @@ import time
import unittest
import node
import config
import command
LEADER = 1
ROUTER1 = 2
DUT_ROUTER1 = 2
ROUTER2 = 3
ROUTER3 = 4
@@ -45,90 +47,93 @@ class Cert_5_3_5_RoutingLinkQuality(unittest.TestCase):
self.nodes[LEADER].set_panid(0xface)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER2].get_addr64())
self.nodes[LEADER].enable_whitelist()
self.nodes[ROUTER1].set_panid(0xface)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ROUTER2].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[ROUTER3].get_addr64())
self.nodes[ROUTER1].enable_whitelist()
self.nodes[ROUTER1].set_router_selection_jitter(1)
self.nodes[DUT_ROUTER1].set_panid(0xface)
self.nodes[DUT_ROUTER1].set_mode('rsdn')
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[ROUTER2].get_addr64())
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[ROUTER3].get_addr64())
self.nodes[DUT_ROUTER1].enable_whitelist()
self.nodes[DUT_ROUTER1].set_router_selection_jitter(1)
self.nodes[ROUTER2].set_panid(0xface)
self.nodes[ROUTER2].set_mode('rsdn')
self.nodes[ROUTER2].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER2].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[ROUTER2].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64())
self.nodes[ROUTER2].enable_whitelist()
self.nodes[ROUTER2].set_router_selection_jitter(1)
self.nodes[ROUTER3].set_panid(0xface)
self.nodes[ROUTER3].set_mode('rsdn')
self.nodes[ROUTER3].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[ROUTER3].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64())
self.nodes[ROUTER3].enable_whitelist()
self.nodes[ROUTER3].set_router_selection_jitter(1)
self.sniffer = config.create_default_thread_sniffer()
self.sniffer.start()
def tearDown(self):
self.sniffer.stop()
del self.sniffer
for node in list(self.nodes.values()):
node.stop()
del self.nodes
def test(self):
# 1
self.nodes[LEADER].start()
self.nodes[LEADER].set_state('leader')
self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
self.nodes[ROUTER1].start()
time.sleep(5)
self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')
self.nodes[ROUTER2].start()
time.sleep(5)
self.assertEqual(self.nodes[ROUTER2].get_state(), 'router')
self.nodes[ROUTER3].start()
time.sleep(5)
self.assertEqual(self.nodes[ROUTER3].get_state(), 'router')
for router in range(DUT_ROUTER1, ROUTER3 + 1):
self.nodes[router].start()
time.sleep(10)
addrs = self.nodes[ROUTER3].get_addrs()
for addr in addrs:
if addr[0:4] != 'fe80':
self.assertTrue(self.nodes[LEADER].ping(addr))
for router in range(DUT_ROUTER1, ROUTER3 + 1):
self.assertEqual(self.nodes[router].get_state(), 'router')
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER1].get_addr64(), rssi=-95)
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(), rssi=-95)
# 2 & 3
leader_rloc = self.nodes[LEADER].get_ip6_address(config.ADDRESS_TYPE.RLOC)
# Verify the ICMPv6 Echo Request took the least cost path.
self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
path = [ROUTER3, DUT_ROUTER1, LEADER]
command.check_icmp_path(self.sniffer, path, self.nodes)
# 4 & 5
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_1'])
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_1'])
time.sleep(70)
addrs = self.nodes[ROUTER3].get_addrs()
for addr in addrs:
if addr[0:4] != 'fe80':
self.assertTrue(self.nodes[LEADER].ping(addr))
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER1].get_addr64(), rssi=-85)
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(), rssi=-85)
# Verify the ICMPv6 Echo Request took the longer path because it cost less.
self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
path = [ROUTER3, DUT_ROUTER1, ROUTER2, LEADER]
command.check_icmp_path(self.sniffer, path, self.nodes)
# 6 & 7
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_2'])
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_2'])
time.sleep(70)
addrs = self.nodes[ROUTER3].get_addrs()
for addr in addrs:
if addr[0:4] != 'fe80':
self.assertTrue(self.nodes[LEADER].ping(addr))
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER1].get_addr64(), rssi=-100)
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(), rssi=-100)
# Verify the direct neighbor would be prioritized when there are two paths with the same cost.
self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
path = [ROUTER3, DUT_ROUTER1, LEADER]
command.check_icmp_path(self.sniffer, path, self.nodes)
# 8 & 9
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64(), config.RSSI['LINK_QULITY_0'])
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64(), config.RSSI['LINK_QULITY_0'])
time.sleep(70)
addrs = self.nodes[ROUTER3].get_addrs()
for addr in addrs:
if addr[0:4] != 'fe80':
self.assertTrue(self.nodes[LEADER].ping(addr))
# Verify the ICMPv6 Echo Request took the longer path.
leader_rloc = self.nodes[LEADER].get_ip6_address(config.ADDRESS_TYPE.RLOC)
self.assertTrue(self.nodes[ROUTER3].ping(leader_rloc))
path = [ROUTER3, DUT_ROUTER1, ROUTER2, LEADER]
command.check_icmp_path(self.sniffer, path, self.nodes)
if __name__ == '__main__':
unittest.main()
+19
View File
@@ -60,3 +60,22 @@ def check_link_accept(command_msg, destination_node):
destination_link_local = destination_node.get_ip6_address(config.ADDRESS_TYPE.LINK_LOCAL)
assert ipv6.ip_address(destination_link_local) == command_msg.ipv6_packet.ipv6_header.destination_address, "Error: The destination is unexpected"
def check_icmp_path(sniffer, path, nodes, icmp_type = ipv6.ICMP_ECHO_REQUEST):
"""Verify icmp message is forwarded along the path.
"""
len_path = len(path)
# Verify icmp message is forwarded to the next node of the path.
for i in range(0, len_path):
node_msg = sniffer.get_messages_sent_by(path[i])
node_icmp_msg = node_msg.get_icmp_message(icmp_type)
if i < len_path - 1:
next_node = nodes[path[i + 1]]
next_node_rloc16 = next_node.get_addr16()
assert next_node_rloc16 == node_icmp_msg.mac_header.dest_address.rloc, "Error: The path is unexpected."
else:
return True
return False
+9
View File
@@ -384,6 +384,15 @@ class MessagesSet(object):
return False
def get_icmp_message(self, icmp_type):
for m in self.messages:
if m.type != MessageType.ICMP:
continue
if m.icmp.header.type == icmp_type:
return m
return None
def contains_mle_message(self, command_type):
for m in self.messages: