[tests] add traffic analysis for Cert_5_3_04_AddressMapCache.py (#2290)

This commit is contained in:
hjian2017
2017-10-30 11:34:53 -05:00
committed by Jonathan Hui
parent 3c6f7d811e
commit 58a89be622
3 changed files with 67 additions and 63 deletions
@@ -86,13 +86,14 @@ class Cert_5_2_06_RouterDowngrade(unittest.TestCase):
# 3 DUT_ROUTER1:
time.sleep(10)
self.assertEqual(self.nodes[DUT_ROUTER1].get_state(), 'child')
# Verify it sent a Parent Request and Child ID Request.
dut_messages = self.sniffer.get_messages_sent_by(DUT_ROUTER1)
dut_messages.next_mle_message(mle.CommandType.PARENT_REQUEST)
dut_messages.next_mle_message(mle.CommandType.CHILD_ID_REQUEST)
# Verify it sent an Address Release Message to the Leader when it attached as a child.
self.assertEqual(self.nodes[DUT_ROUTER1].get_state(), 'child')
msg = dut_messages.next_coap_message('0.02')
command.check_address_release(msg, self.nodes[LEADER])
@@ -31,16 +31,18 @@ import time
import unittest
import node
import config
import command
LEADER = 1
ROUTER1 = 2
DUT_ROUTER1 = 2
SED1 = 3
ED2 = 4
ED3 = 5
ED4 = 6
ED5 = 7
ED1 = 4
ED2 = 5
ED3 = 6
ED4 = 7
MTDS = [SED1, ED2, ED3, ED4, ED5]
MTDS = [SED1, ED1, ED2, ED3, ED4]
class Cert_5_3_4_AddressMapCache(unittest.TestCase):
def setUp(self):
@@ -50,90 +52,88 @@ class Cert_5_3_4_AddressMapCache(unittest.TestCase):
self.nodes[LEADER].set_panid(0xface)
self.nodes[LEADER].set_mode('rsdn')
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ED1].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ED2].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ED3].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ED4].get_addr64())
self.nodes[LEADER].add_whitelist(self.nodes[ED5].get_addr64())
self.nodes[LEADER].enable_whitelist()
self.nodes[ROUTER1].set_panid(0xface)
self.nodes[ROUTER1].set_mode('rsdn')
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ROUTER1].add_whitelist(self.nodes[SED1].get_addr64())
self.nodes[ROUTER1].enable_whitelist()
self.nodes[ROUTER1].set_router_selection_jitter(1)
self.nodes[DUT_ROUTER1].set_panid(0xface)
self.nodes[DUT_ROUTER1].set_mode('rsdn')
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[DUT_ROUTER1].add_whitelist(self.nodes[SED1].get_addr64())
self.nodes[DUT_ROUTER1].enable_whitelist()
self.nodes[DUT_ROUTER1].set_router_selection_jitter(1)
self.nodes[SED1].set_panid(0xface)
self.nodes[SED1].set_mode('rsn')
self.nodes[SED1].add_whitelist(self.nodes[ROUTER1].get_addr64())
self.nodes[SED1].set_mode('s')
self.nodes[SED1].add_whitelist(self.nodes[DUT_ROUTER1].get_addr64())
# Set the SED1's timeout in order to receive the icmp reply when keep alive with DUT_ROUTER.
self.nodes[SED1].set_timeout(5)
self.nodes[SED1].enable_whitelist()
self.nodes[ED2].set_panid(0xface)
self.nodes[ED2].set_mode('rsn')
self.nodes[ED2].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ED2].enable_whitelist()
for ED in [ED1, ED2, ED3, ED4]:
self.nodes[ED].set_panid(0xface)
self.nodes[ED].set_mode('rsn')
self.nodes[ED].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ED].enable_whitelist()
self.nodes[ED3].set_panid(0xface)
self.nodes[ED3].set_mode('rsn')
self.nodes[ED3].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ED3].enable_whitelist()
self.nodes[ED4].set_panid(0xface)
self.nodes[ED4].set_mode('rsn')
self.nodes[ED4].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ED4].enable_whitelist()
self.nodes[ED5].set_panid(0xface)
self.nodes[ED5].set_mode('rsn')
self.nodes[ED5].add_whitelist(self.nodes[LEADER].get_addr64())
self.nodes[ED5].enable_whitelist()
self.sniffer = config.create_default_thread_sniffer()
self.sniffer.start()
def tearDown(self):
self.sniffer.stop()
del self.sniffer
for node in list(self.nodes.values()):
node.stop()
del self.nodes
def test(self):
# 1
self.nodes[LEADER].start()
self.nodes[LEADER].set_state('leader')
self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
self.nodes[ROUTER1].start()
self.nodes[DUT_ROUTER1].start()
for i in MTDS:
self.nodes[i].start()
time.sleep(5)
self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')
self.nodes[SED1].start()
time.sleep(5)
self.assertEqual(self.nodes[SED1].get_state(), 'child')
self.assertEqual(self.nodes[DUT_ROUTER1].get_state(), 'router')
self.nodes[ED2].start()
time.sleep(5)
self.assertEqual(self.nodes[ED2].get_state(), 'child')
for i in MTDS:
self.assertEqual(self.nodes[i].get_state(), 'child')
self.nodes[ED3].start()
time.sleep(5)
self.assertEqual(self.nodes[ED3].get_state(), 'child')
# This method flushes the message queue so calling this method again will return only the newly logged messages.
dut_messages = self.sniffer.get_messages_sent_by(DUT_ROUTER1)
self.nodes[ED4].start()
time.sleep(5)
self.assertEqual(self.nodes[ED4].get_state(), 'child')
# 2
for ED in [ED1, ED2, ED3, ED4]:
ed_mleid = self.nodes[ED].get_ip6_address(config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[SED1].ping(ed_mleid))
time.sleep(5)
self.nodes[ED5].start()
time.sleep(5)
self.assertEqual(self.nodes[ED5].get_state(), 'child')
# Verify DUT_ROUTER1 generated an Address Query Request to find each node's RLOC.
dut_messages = self.sniffer.get_messages_sent_by(DUT_ROUTER1)
msg = dut_messages.next_coap_message('0.02', '/a/aq')
command.check_address_query(msg, self.nodes[DUT_ROUTER1], config.REALM_LOCAL_ALL_ROUTERS_ADDRESS)
for i in range(4, 8):
addrs = self.nodes[i].get_addrs()
for addr in addrs:
if addr[0:4] != 'fe80':
self.assertTrue(self.nodes[SED1].ping(addr))
# 3 & 4
# This method flushes the message queue so calling this method again will return only the newly logged messages.
dut_messages = self.sniffer.get_messages_sent_by(DUT_ROUTER1)
for i in range(4, 8):
addrs = self.nodes[i].get_addrs()
for addr in addrs:
if addr[0:4] != 'fe80':
self.assertTrue(self.nodes[SED1].ping(addr))
for ED in [ED1, ED2, ED3, ED4]:
ed_mleid = self.nodes[ED].get_ip6_address(config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[SED1].ping(ed_mleid))
time.sleep(5)
# Verify DUT_ROUTER1 didn't generate an Address Query Request.
dut_messages = self.sniffer.get_messages_sent_by(DUT_ROUTER1)
msg = dut_messages.next_coap_message('0.02', '/a/aq', False)
assert msg is None, "Error: The DUT sent an unexpected Address Query Request"
if __name__ == '__main__':
unittest.main()
+4 -1
View File
@@ -10,7 +10,10 @@ def check_address_query(command_msg, source_node, destination_address):
command_msg.assertCoapMessageContainsTlv(network_layer.TargetEid)
source_rloc = source_node.get_ip6_address(config.ADDRESS_TYPE.RLOC)
assert ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address, "Error: The IPv6 source address is not the RLOC of the originator."
assert ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address, \
"Error: The IPv6 source address is not the RLOC of the originator. The source node's rloc is: " \
+ str(ipv6.ip_address(source_rloc)) + ", but the source_rloc in command msg is: " \
+ str(command_msg.ipv6_packet.ipv6_header.source_address)
assert ipv6.ip_address(destination_address.decode('utf-8')) == command_msg.ipv6_packet.ipv6_header.destination_address, "Error: The IPv6 destination address is not expected."