[thread-cert] refactor case 6.4.2 using pktverify (#5842)

This commit is contained in:
Jing Ma
2020-11-28 01:02:44 +08:00
committed by GitHub
parent 9d9646afaa
commit e1d1e0aad8
3 changed files with 190 additions and 17 deletions
@@ -28,34 +28,58 @@
#
import unittest
import copy
import config
import thread_cert
from pktverify.packet_verifier import PacketVerifier
LEADER = 1
ROUTER = 2
ED = 3
MTD = 3
FRAGMENTED_DATA_LEN = 256
# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to validate the Realm-Local addresses
# that the DUT configures.
#
# Test Topology:
# -------------
# Leader
# |
# Router
# |
# DUT
#
# DUT Types:
# ----------
# ED
# SED
class Cert_5_3_2_RealmLocal(thread_cert.TestCase):
class Cert_6_4_2_RealmLocal_Base(thread_cert.TestCase):
TOPOLOGY = {
LEADER: {
'name': 'LEADER',
'mode': 'rdn',
'panid': 0xface,
'allowlist': [ROUTER]
},
ROUTER: {
'name': 'ROUTER',
'mode': 'rdn',
'panid': 0xface,
'router_selection_jitter': 1,
'allowlist': [LEADER, ED]
'allowlist': [LEADER, MTD]
},
ED: {
MTD: {
'name': 'DUT',
'is_mtd': True,
'mode': 'rn',
'panid': 0xface,
'timeout': config.DEFAULT_CHILD_TIMEOUT,
'allowlist': [ROUTER]
},
4: {},
}
def test(self):
@@ -67,22 +91,164 @@ class Cert_5_3_2_RealmLocal(thread_cert.TestCase):
self.simulator.go(5)
self.assertEqual(self.nodes[ROUTER].get_state(), 'router')
self.nodes[ED].start()
self.nodes[MTD].start()
self.simulator.go(5)
self.assertEqual(self.nodes[ED].get_state(), 'child')
self.assertEqual(self.nodes[MTD].get_state(), 'child')
addrs = self.nodes[ED].get_addrs()
for addr in addrs:
if addr[0:4] != 'fe80':
self.assertTrue(self.nodes[LEADER].ping(addr, size=256))
self.assertTrue(self.nodes[LEADER].ping(addr))
self.collect_ipaddrs()
self.collect_rloc16s()
self.assertTrue(self.nodes[LEADER].ping('ff03::1', num_responses=2, size=256))
self.assertTrue(self.nodes[LEADER].ping('ff03::1', num_responses=2))
dut_addr = self.nodes[MTD].get_ip6_address(config.ADDRESS_TYPE.ML_EID)
self.assertTrue(self.nodes[LEADER].\
ping(dut_addr,
size=FRAGMENTED_DATA_LEN))
self.simulator.go(1)
self.assertTrue(self.nodes[LEADER].\
ping(dut_addr))
self.simulator.go(1)
self.assertTrue(self.nodes[LEADER].ping('ff33:0040:fd00:db8:0:0:0:1', num_responses=2, size=256))
self.assertTrue(self.nodes[LEADER].ping('ff33:0040:fd00:db8:0:0:0:1', num_responses=2))
if self.TOPOLOGY[MTD]['mode'] == 'rn':
self.assertTrue(self.nodes[LEADER].\
ping(config.REALM_LOCAL_ALL_NODES_ADDRESS,
num_responses=2,
size=FRAGMENTED_DATA_LEN))
self.simulator.go(2)
self.assertTrue(self.nodes[LEADER].\
ping(config.REALM_LOCAL_ALL_NODES_ADDRESS,
num_responses=2))
self.simulator.go(2)
self.assertTrue(self.nodes[LEADER].\
ping(config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
num_responses=2,
size=FRAGMENTED_DATA_LEN))
self.simulator.go(2)
self.assertTrue(self.nodes[LEADER].\
ping(config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
num_responses=2))
self.simulator.go(2)
def verify(self, pv):
pkts = pv.pkts
pv.summary.show()
LEADER = pv.vars['LEADER']
LEADER_MLEID = pv.vars['LEADER_MLEID']
ROUTER = pv.vars['ROUTER']
ROUTER_MLEID = pv.vars['ROUTER_MLEID']
ROUTER_RLOC16 = pv.vars['ROUTER_RLOC16']
DUT = pv.vars['DUT']
DUT_MLEID = pv.vars['DUT_MLEID']
DUT_RLOC16 = pv.vars['DUT_RLOC16']
# Step 1: Ensure topology is formed correctly
pv.verify_attached('ROUTER', 'LEADER')
pv.verify_attached('DUT', 'ROUTER', 'MTD')
# Step 2: Leader sends a Fragmented ICMPv6 Echo Request to
# DUT's ML-EID
# The DUT MUST respond with an ICMPv6 Echo Reply
_pkt = pkts.filter_ping_request().\
filter_ipv6_src_dst(LEADER_MLEID, DUT_MLEID).\
filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
must_next()
pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
filter_ipv6_src_dst(DUT_MLEID, LEADER_MLEID).\
filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
must_next()
# Step 3: Leader sends an Unfragmented ICMPv6 Echo Request to
# DUTs ML-EID
# The DUT MUST respond with an ICMPv6 Echo Reply
_pkt = pkts.filter_ping_request().\
filter_ipv6_src_dst(LEADER_MLEID, DUT_MLEID).\
must_next()
pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
filter_ipv6_src_dst(DUT_MLEID, LEADER_MLEID).\
must_next()
if self.TOPOLOGY[MTD]['mode'] == 'rn':
# Step 4: Leader sends a Fragmented ICMPv6 Echo Request to the
# Realm-Local All Nodes multicast address (FF03::1)
# The DUT MUST respond with an ICMPv6 Echo Reply
_pkt1 = pkts.filter_ping_request().\
filter_wpan_src64(LEADER).\
filter_RLANMA().\
filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
must_next()
with pkts.save_index():
pkts.filter_ping_reply(identifier=_pkt1.icmpv6.echo.identifier).\
filter_ipv6_src_dst(ROUTER_MLEID, LEADER_MLEID).\
filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
must_next()
pkts.filter_ping_request(identifier=_pkt1.icmpv6.echo.identifier).\
filter_wpan_src16_dst16(ROUTER_RLOC16, DUT_RLOC16).\
filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
must_not_next()
# Step 5: Leader sends an Unfragmented ICMPv6 Echo Request to the
# Realm-Local All Nodes multicast address (FF03::1)
# The DUT MUST respond with an ICMPv6 Echo Reply
_pkt2 = pkts.filter_ping_request().\
filter_wpan_src64(LEADER).\
filter_RLANMA().\
filter(lambda p: p.icmpv6.echo.sequence_number !=
_pkt1.icmpv6.echo.sequence_number
).\
must_next()
with pkts.save_index():
pkts.filter_ping_reply(identifier=_pkt2.icmpv6.echo.identifier).\
filter_ipv6_src_dst(ROUTER_MLEID, LEADER_MLEID).\
must_next()
pkts.filter_ping_request(identifier = _pkt2.icmpv6.echo.identifier).\
filter_wpan_src16_dst16(ROUTER_RLOC16, DUT_RLOC16).\
must_not_next()
# Step 6: Leader sends a Fragmented ICMPv6 Echo Request to the
# Realm-Local All Thread Nodes multicast address
# The DUT MUST respond with an ICMPv6 Echo Reply
_pkt = pkts.filter_ping_request().\
filter_wpan_src64(LEADER).\
filter_RLATNMA().\
filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
must_next()
pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
filter_wpan_src64(DUT).\
filter_ipv6_dst(LEADER_MLEID).\
filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
must_next()
# Step 7: Leader sends an Unfragmented ICMPv6 Echo Request to the
# Realm-Local All Thread Nodes multicast address
# The DUT MUST respond with an ICMPv6 Echo Reply
_pkt = pkts.filter_ping_request().\
filter_wpan_src64(LEADER).\
filter_RLATNMA().\
filter(lambda p: p.icmpv6.data.len != FRAGMENTED_DATA_LEN).\
must_next()
pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
filter_wpan_src64(DUT).\
filter_ipv6_dst(LEADER_MLEID).\
must_next()
class Cert_6_4_2_RealmLocal_ED(Cert_6_4_2_RealmLocal_Base):
TOPOLOGY = copy.deepcopy(Cert_6_4_2_RealmLocal_Base.TOPOLOGY)
TOPOLOGY[MTD]['mode'] = 'rn'
class Cert_6_4_2_RealmLocal_SED(Cert_6_4_2_RealmLocal_Base):
TOPOLOGY = copy.deepcopy(Cert_6_4_2_RealmLocal_Base.TOPOLOGY)
TOPOLOGY[MTD]['mode'] = '-'
del (Cert_6_4_2_RealmLocal_Base)
if __name__ == '__main__':
unittest.main()
+1
View File
@@ -55,6 +55,7 @@ LINK_LOCAL_REGEX_PATTERN = '^fe80:.*'
ALOC_FLAG_REGEX_PATTERN = '.*:fc..$'
LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS = 'ff32:40:fd00:db8:0:0:0:1'
REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS = 'ff33:40:fd00:db8:0:0:0:1'
REALM_LOCAL_ALL_NODES_ADDRESS = 'ff03::1'
REALM_LOCAL_ALL_ROUTERS_ADDRESS = 'ff03::2'
LINK_LOCAL_ALL_NODES_ADDRESS = 'ff02::1'
LINK_LOCAL_ALL_ROUTERS_ADDRESS = 'ff02::2'
@@ -504,9 +504,15 @@ class PacketFilter(object):
assert isinstance(dst_addr, (str, Ipv6Addr))
return self.filter(lambda p: p.ipv6.src == src_addr and p.ipv6.dst == dst_addr, **kwargs)
def filter_RLANMA(self, **kwargs):
return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_ALL_NODES_ADDRESS, **kwargs)
def filter_RLARMA(self, **kwargs):
return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_ALL_ROUTERS_ADDRESS, **kwargs)
def filter_RLATNMA(self, **kwargs):
return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS, **kwargs)
def filter_LLANMA(self, **kwargs):
return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS, **kwargs)