mirror of
https://github.com/espressif/openthread.git
synced 2026-06-06 05:24:51 +00:00
[tests] add 5_7_01 network diagnostic test cases (#4663)
This commit adds Network Diagnostic TLV parsing and 5.7.01 test case.
This commit is contained in:
+229
@@ -0,0 +1,229 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Copyright (c) 2020, The OpenThread Authors.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
# 1. Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
# 2. Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in the
|
||||
# documentation and/or other materials provided with the distribution.
|
||||
# 3. Neither the name of the copyright holder nor the
|
||||
# names of its contributors may be used to endorse or promote products
|
||||
# derived from this software without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
#
|
||||
|
||||
import unittest
|
||||
|
||||
import config
|
||||
import mle
|
||||
import network_diag
|
||||
import network_layer
|
||||
import node
|
||||
|
||||
from network_diag import TlvType
|
||||
|
||||
LEADER = 1
|
||||
ROUTER1 = 2
|
||||
REED1 = 3
|
||||
SED1 = 4
|
||||
MED1 = 5
|
||||
FED1 = 6
|
||||
|
||||
DUT = ROUTER1
|
||||
MTDS = [MED1, SED1]
|
||||
|
||||
|
||||
class Cert_5_7_01_CoapDiagCommands_A(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.simulator = config.create_default_simulator()
|
||||
|
||||
self.nodes = {}
|
||||
for i in range(1, 7):
|
||||
self.nodes[i] = node.Node(i, (i in MTDS), simulator=self.simulator)
|
||||
|
||||
self.nodes[LEADER].set_panid(0xface)
|
||||
self.nodes[LEADER].set_mode('rsdn')
|
||||
self.nodes[LEADER].add_whitelist(self.nodes[ROUTER1].get_addr64())
|
||||
self.nodes[LEADER].enable_whitelist()
|
||||
|
||||
self.nodes[ROUTER1].set_panid(0xface)
|
||||
self.nodes[ROUTER1].set_mode('rsdn')
|
||||
self.nodes[ROUTER1].add_whitelist(self.nodes[LEADER].get_addr64())
|
||||
self.nodes[ROUTER1].add_whitelist(self.nodes[REED1].get_addr64())
|
||||
self.nodes[ROUTER1].add_whitelist(self.nodes[SED1].get_addr64())
|
||||
self.nodes[ROUTER1].add_whitelist(self.nodes[MED1].get_addr64())
|
||||
self.nodes[ROUTER1].add_whitelist(self.nodes[FED1].get_addr64())
|
||||
self.nodes[ROUTER1].enable_whitelist()
|
||||
self.nodes[ROUTER1].set_router_selection_jitter(1)
|
||||
|
||||
self.nodes[REED1].set_panid(0xface)
|
||||
self.nodes[REED1].set_mode('rsdn')
|
||||
self.nodes[REED1].add_whitelist(self.nodes[ROUTER1].get_addr64())
|
||||
self.nodes[REED1].set_router_upgrade_threshold(0)
|
||||
self.nodes[REED1].enable_whitelist()
|
||||
|
||||
self.nodes[SED1].set_panid(0xface)
|
||||
self.nodes[SED1].set_mode('s')
|
||||
self.nodes[SED1].add_whitelist(self.nodes[ROUTER1].get_addr64())
|
||||
self.nodes[SED1].enable_whitelist()
|
||||
self.nodes[SED1].set_timeout(config.DEFAULT_CHILD_TIMEOUT)
|
||||
|
||||
self.nodes[MED1].set_panid(0xface)
|
||||
self.nodes[MED1].set_mode('rsn')
|
||||
self.nodes[MED1].add_whitelist(self.nodes[ROUTER1].get_addr64())
|
||||
self.nodes[MED1].enable_whitelist()
|
||||
|
||||
self.nodes[FED1].set_panid(0xface)
|
||||
self.nodes[FED1].set_mode('rsdn')
|
||||
self.nodes[FED1].add_whitelist(self.nodes[ROUTER1].get_addr64())
|
||||
self.nodes[FED1].set_router_upgrade_threshold(0)
|
||||
self.nodes[FED1].enable_whitelist()
|
||||
|
||||
def tearDown(self):
|
||||
for n in list(self.nodes.values()):
|
||||
n.stop()
|
||||
n.destroy()
|
||||
self.simulator.stop()
|
||||
|
||||
def test(self):
|
||||
# 1 - Form topology
|
||||
self.nodes[LEADER].start()
|
||||
self.simulator.go(5)
|
||||
self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
|
||||
|
||||
self.nodes[ROUTER1].start()
|
||||
self.simulator.go(5)
|
||||
self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')
|
||||
|
||||
self.nodes[REED1].start()
|
||||
self.simulator.go(5)
|
||||
self.assertEqual(self.nodes[REED1].get_state(), 'child')
|
||||
|
||||
self.nodes[SED1].start()
|
||||
self.simulator.go(10)
|
||||
self.assertEqual(self.nodes[SED1].get_state(), 'child')
|
||||
|
||||
self.nodes[MED1].start()
|
||||
self.simulator.go(5)
|
||||
self.assertEqual(self.nodes[MED1].get_state(), 'child')
|
||||
|
||||
self.nodes[FED1].start()
|
||||
self.simulator.go(5)
|
||||
self.assertEqual(self.nodes[FED1].get_state(), 'child')
|
||||
|
||||
dut_rloc = self.nodes[DUT].get_ip6_address(config.ADDRESS_TYPE.RLOC)
|
||||
|
||||
# 2 - Leader sends DIAG_GET.req
|
||||
tlv_types = [
|
||||
TlvType.EXT_ADDRESS, TlvType.ADDRESS16, TlvType.MODE,
|
||||
TlvType.CONNECTIVITY, TlvType.ROUTE64, TlvType.LEADER_DATA,
|
||||
TlvType.NETWORK_DATA, TlvType.IPV6_ADDRESS_LIST,
|
||||
TlvType.CHANNEL_PAGES
|
||||
]
|
||||
self.nodes[LEADER].send_network_diag_get(dut_rloc, tlv_types)
|
||||
self.simulator.go(2)
|
||||
|
||||
dut_messages = self.simulator.get_messages_sent_by(DUT)
|
||||
|
||||
diag_get_rsp = dut_messages.next_coap_message(code='2.04')
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(
|
||||
network_layer.MacExtendedAddress)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(mle.Address16)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(mle.Mode)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(mle.Connectivity)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(mle.Route64)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(mle.LeaderData)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(mle.NetworkData)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(network_diag.Ipv6AddressList)
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(network_diag.ChannelPages)
|
||||
|
||||
# 3 - Leader sends DIAG_GET.req (MAC Counters TLV type included)
|
||||
self.nodes[LEADER].send_network_diag_get(dut_rloc,
|
||||
[TlvType.MAC_COUNTERS])
|
||||
self.simulator.go(2)
|
||||
|
||||
dut_messages = self.simulator.get_messages_sent_by(DUT)
|
||||
diag_get_rsp = dut_messages.next_coap_message(code='2.04')
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(network_diag.MacCounters)
|
||||
mac_counters = diag_get_rsp.get_coap_message_tlv(
|
||||
network_diag.MacCounters)
|
||||
|
||||
# 4 - Leader sends DIAG_GET.req (Timeout/Polling Period TLV type included)
|
||||
self.nodes[LEADER].send_network_diag_get(dut_rloc,
|
||||
[TlvType.POLLING_PERIOD])
|
||||
self.simulator.go(2)
|
||||
|
||||
dut_messages = self.simulator.get_messages_sent_by(DUT)
|
||||
diag_get_rsp = dut_messages.next_coap_message(code='2.04')
|
||||
diag_get_rsp.assertCoapMessageDoesNotContainTlv(mle.Timeout)
|
||||
|
||||
# 5 - Leader sends DIAG_GET.req (Battery Level and Supply Voltage TLV types included)
|
||||
self.nodes[LEADER].send_network_diag_get(
|
||||
dut_rloc, [TlvType.BATTERY_LEVEL, TlvType.SUPPLY_VOLTAGE])
|
||||
self.simulator.go(2)
|
||||
|
||||
dut_messages = self.simulator.get_messages_sent_by(DUT)
|
||||
diag_get_rsp = dut_messages.next_coap_message(code='2.04')
|
||||
diag_get_rsp.assertCoapMessageContainsOptionalTlv(
|
||||
network_diag.BatteryLevel)
|
||||
diag_get_rsp.assertCoapMessageContainsOptionalTlv(
|
||||
network_diag.SupplyVoltage)
|
||||
|
||||
# 6 - Leader sends DIAG_GET.req (Child Table TLV types included)
|
||||
self.nodes[LEADER].send_network_diag_get(dut_rloc,
|
||||
[TlvType.CHILD_TABLE])
|
||||
self.simulator.go(2)
|
||||
|
||||
dut_messages = self.simulator.get_messages_sent_by(DUT)
|
||||
diag_get_rsp = dut_messages.next_coap_message(code='2.04')
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(network_diag.ChildTable)
|
||||
child_table = diag_get_rsp.get_coap_message_tlv(network_diag.ChildTable)
|
||||
self.assertEqual(len(child_table.children), 4)
|
||||
# TODO(wgtdkp): more validations
|
||||
|
||||
# 7 - Leader sends DIAG_RST.ntf (MAC Counters TLV type included)
|
||||
self.nodes[LEADER].send_network_diag_reset(dut_rloc,
|
||||
[TlvType.MAC_COUNTERS])
|
||||
self.simulator.go(2)
|
||||
|
||||
dut_messages = self.simulator.get_messages_sent_by(DUT)
|
||||
|
||||
# Make sure the response is there.
|
||||
dut_messages.next_coap_message(code='2.04')
|
||||
|
||||
# 8 - Leader Sends DIAG_GET.req (MAC Counters TLV type included)
|
||||
self.nodes[LEADER].send_network_diag_get(dut_rloc,
|
||||
[TlvType.MAC_COUNTERS])
|
||||
self.simulator.go(2)
|
||||
|
||||
dut_messages = self.simulator.get_messages_sent_by(DUT)
|
||||
diag_get_rsp = dut_messages.next_coap_message(code='2.04')
|
||||
diag_get_rsp.assertCoapMessageContainsTlv(network_diag.MacCounters)
|
||||
reset_mac_counters = diag_get_rsp.get_coap_message_tlv(
|
||||
network_diag.MacCounters)
|
||||
|
||||
self.assertEqual(len(mac_counters.counters),
|
||||
len(reset_mac_counters.counters))
|
||||
for old_counter, new_counter in zip(mac_counters.counters,
|
||||
reset_mac_counters.counters):
|
||||
self.assertTrue(new_counter == 0 or new_counter < old_counter)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -78,6 +78,7 @@ EXTRA_DIST = \
|
||||
Cert_5_6_07_NetworkDataRequestREED.py \
|
||||
Cert_5_6_08_ContextManagement.py \
|
||||
Cert_5_6_09_NetworkDataForwarding.py \
|
||||
Cert_5_7_01_CoapDiagCommands_A.py \
|
||||
Cert_5_8_01_KeySynchronization.py \
|
||||
Cert_5_8_02_KeyIncrement.py \
|
||||
Cert_5_8_03_KeyIncrementRollOver.py \
|
||||
@@ -136,6 +137,7 @@ EXTRA_DIST = \
|
||||
mle.py \
|
||||
net_crypto.py \
|
||||
network_data.py \
|
||||
network_diag.py \
|
||||
network_layer.py \
|
||||
node.py \
|
||||
pcap.py \
|
||||
@@ -227,6 +229,7 @@ check_SCRIPTS = \
|
||||
Cert_5_6_07_NetworkDataRequestREED.py \
|
||||
Cert_5_6_08_ContextManagement.py \
|
||||
Cert_5_6_09_NetworkDataForwarding.py \
|
||||
Cert_5_7_01_CoapDiagCommands_A.py \
|
||||
Cert_5_8_01_KeySynchronization.py \
|
||||
Cert_5_8_02_KeyIncrement.py \
|
||||
Cert_5_8_03_KeyIncrementRollOver.py \
|
||||
@@ -295,6 +298,7 @@ XFAIL_NCP_TESTS = \
|
||||
test_reed_address_solicit_rejected.py \
|
||||
test_service.py \
|
||||
Cert_5_3_10_AddressQuery.py \
|
||||
Cert_5_7_01_CoapDiagCommands_A.py \
|
||||
Cert_8_1_01_Commissioning.py \
|
||||
Cert_8_1_02_Commissioning.py \
|
||||
Cert_8_2_01_JoinerRouter.py \
|
||||
|
||||
@@ -39,6 +39,7 @@ import message
|
||||
import mle
|
||||
import net_crypto
|
||||
import network_data
|
||||
import network_diag
|
||||
import network_layer
|
||||
import simulator
|
||||
import sniffer
|
||||
@@ -423,18 +424,66 @@ def create_default_mesh_cop_tlvs_factory():
|
||||
sub_tlvs_factories=create_default_mesh_cop_tlvs_factories())
|
||||
|
||||
|
||||
def create_default_network_diag_tlv_factories():
|
||||
return {
|
||||
network_diag.TlvType.EXT_ADDRESS:
|
||||
network_layer.MacExtendedAddressFactory(),
|
||||
network_diag.TlvType.ADDRESS16:
|
||||
mle.Address16Factory(),
|
||||
network_diag.TlvType.MODE:
|
||||
mle.ModeFactory(),
|
||||
network_diag.TlvType.POLLING_PERIOD:
|
||||
mle.TimeoutFactory(),
|
||||
network_diag.TlvType.CONNECTIVITY:
|
||||
mle.ConnectivityFactory(),
|
||||
network_diag.TlvType.ROUTE64:
|
||||
create_default_mle_tlv_route64_factory(),
|
||||
network_diag.TlvType.LEADER_DATA:
|
||||
mle.LeaderDataFactory(),
|
||||
network_diag.TlvType.NETWORK_DATA:
|
||||
create_default_mle_tlv_network_data_factory(),
|
||||
network_diag.TlvType.IPV6_ADDRESS_LIST:
|
||||
network_diag.Ipv6AddressListFactory(),
|
||||
network_diag.TlvType.MAC_COUNTERS:
|
||||
network_diag.MacCountersFactory(),
|
||||
network_diag.TlvType.BATTERY_LEVEL:
|
||||
network_diag.BatteryLevelFactory(),
|
||||
network_diag.TlvType.SUPPLY_VOLTAGE:
|
||||
network_diag.SupplyVoltageFactory(),
|
||||
network_diag.TlvType.CHILD_TABLE:
|
||||
network_diag.ChildTableFactory(),
|
||||
network_diag.TlvType.CHANNEL_PAGES:
|
||||
network_diag.ChannelPagesFactory(),
|
||||
network_diag.TlvType.TYPE_LIST:
|
||||
network_diag.TypeListFactory(),
|
||||
network_diag.TlvType.MAX_CHILD_TIMEOUT:
|
||||
network_diag.MaxChildTimeoutFactory()
|
||||
}
|
||||
|
||||
|
||||
def create_default_network_diag_tlvs_factory():
|
||||
return SubTlvsFactory(
|
||||
sub_tlvs_factories=create_default_network_diag_tlv_factories())
|
||||
|
||||
|
||||
def create_default_uri_path_based_payload_factories():
|
||||
network_layer_tlvs_factory = create_default_network_tlvs_factory()
|
||||
mesh_cop_tlvs_factory = create_default_mesh_cop_tlvs_factory()
|
||||
network_diag_tlvs_factory = create_default_network_diag_tlvs_factory()
|
||||
|
||||
return {
|
||||
"/a/as": network_layer_tlvs_factory,
|
||||
"/a/aq": network_layer_tlvs_factory,
|
||||
"/a/ar": network_layer_tlvs_factory,
|
||||
"/a/ae": network_layer_tlvs_factory,
|
||||
"/a/an": network_layer_tlvs_factory,
|
||||
"/a/sd": network_layer_tlvs_factory,
|
||||
"/c/lp": mesh_cop_tlvs_factory,
|
||||
"/c/cs": mesh_cop_tlvs_factory,
|
||||
'/a/as': network_layer_tlvs_factory,
|
||||
'/a/aq': network_layer_tlvs_factory,
|
||||
'/a/ar': network_layer_tlvs_factory,
|
||||
'/a/ae': network_layer_tlvs_factory,
|
||||
'/a/an': network_layer_tlvs_factory,
|
||||
'/a/sd': network_layer_tlvs_factory,
|
||||
'/c/lp': mesh_cop_tlvs_factory,
|
||||
'/c/cs': mesh_cop_tlvs_factory,
|
||||
'/d/da': network_diag_tlvs_factory,
|
||||
'/d/dg': network_diag_tlvs_factory,
|
||||
'/d/dq': network_diag_tlvs_factory,
|
||||
'/d/dr': network_diag_tlvs_factory,
|
||||
}
|
||||
|
||||
|
||||
@@ -483,7 +532,7 @@ def create_default_ipv6_icmp_body_factories():
|
||||
ipv6.ICMPv6EchoBodyFactory(),
|
||||
ipv6.ICMP_ECHO_RESPONSE:
|
||||
ipv6.ICMPv6EchoBodyFactory(),
|
||||
"default":
|
||||
'default':
|
||||
ipv6.BytesPayloadFactory(),
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,336 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Copyright (c) 2020, The OpenThread Authors.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
# 1. Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
# 2. Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in the
|
||||
# documentation and/or other materials provided with the distribution.
|
||||
# 3. Neither the name of the copyright holder nor the
|
||||
# names of its contributors may be used to endorse or promote products
|
||||
# derived from this software without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
#
|
||||
|
||||
import struct
|
||||
|
||||
from enum import IntEnum
|
||||
from typing import List
|
||||
|
||||
import common
|
||||
import ipaddress
|
||||
import mle
|
||||
|
||||
|
||||
class TlvType(IntEnum):
|
||||
EXT_ADDRESS = 0
|
||||
ADDRESS16 = 1
|
||||
MODE = 2
|
||||
POLLING_PERIOD = 3
|
||||
CONNECTIVITY = 4
|
||||
ROUTE64 = 5
|
||||
LEADER_DATA = 6
|
||||
NETWORK_DATA = 7
|
||||
IPV6_ADDRESS_LIST = 8
|
||||
MAC_COUNTERS = 9
|
||||
BATTERY_LEVEL = 14
|
||||
SUPPLY_VOLTAGE = 15
|
||||
CHILD_TABLE = 16
|
||||
CHANNEL_PAGES = 17
|
||||
TYPE_LIST = 18
|
||||
MAX_CHILD_TIMEOUT = 19
|
||||
|
||||
|
||||
class Ipv6AddressList:
|
||||
|
||||
def __init__(self, addresses: List[ipaddress.IPv6Address]):
|
||||
self._addresses = addresses
|
||||
|
||||
@property
|
||||
def addresses(self):
|
||||
return self._addresses
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
return self.addresses == other.addresses
|
||||
|
||||
def __repr__(self):
|
||||
return f'Ipv6AddressList({self.addresses})'
|
||||
|
||||
|
||||
class Ipv6AddressListFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
addresses = []
|
||||
while data.tell() < message_info.length:
|
||||
addresses.append(ipaddress.IPv6Address(data.read(16)))
|
||||
return Ipv6AddressList(addresses)
|
||||
|
||||
|
||||
class MacCounters:
|
||||
|
||||
def __init__(self, counters: List[int]):
|
||||
self._counters = counters
|
||||
|
||||
@property
|
||||
def if_in_unknown_protos(self):
|
||||
return self._counters[0]
|
||||
|
||||
@property
|
||||
def if_in_errors(self):
|
||||
return self._counters[1]
|
||||
|
||||
@property
|
||||
def if_out_errors(self):
|
||||
return self._counters[2]
|
||||
|
||||
@property
|
||||
def if_in_ucast_pkts(self):
|
||||
return self._counters[3]
|
||||
|
||||
@property
|
||||
def if_in_broadcast_pkts(self):
|
||||
return self._counters[4]
|
||||
|
||||
@property
|
||||
def if_in_discards(self):
|
||||
return self._counters[5]
|
||||
|
||||
@property
|
||||
def if_out_ucast_pkts(self):
|
||||
return self._counters[6]
|
||||
|
||||
@property
|
||||
def if_out_broadcast_pkts(self):
|
||||
return self._counters[7]
|
||||
|
||||
@property
|
||||
def if_out_discards(self):
|
||||
return self._counters[8]
|
||||
|
||||
@property
|
||||
def counters(self):
|
||||
return self._counters
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return self.counters == other.counters
|
||||
|
||||
def __repr__(self):
|
||||
return ('MacCounters(' +
|
||||
f'if_in_unknown_protos={self.if_in_unknown_protos}, ' +
|
||||
f'if_in_errors={self.if_in_errors}, ' +
|
||||
f'if_out_errors={self.if_out_errors}, ' +
|
||||
f'if_in_ucast_pkts={self.if_in_ucast_pkts}, ' +
|
||||
f'if_in_broadcast_pkts={self.if_in_broadcast_pkts}, ' +
|
||||
f'if_in_discards={self.if_in_discards}, ' +
|
||||
f'if_out_ucast_pkts={self.if_out_ucast_pkts}, ' +
|
||||
f'if_out_broadcast_pkts={self.if_out_broadcast_pkts}, ' +
|
||||
f'if_out_discards={self.if_out_discards})')
|
||||
|
||||
|
||||
class MacCountersFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
return MacCounters(struct.unpack('>9I', data.read(4 * 9)))
|
||||
|
||||
|
||||
class BatteryLevel:
|
||||
|
||||
def __init__(self, battery_level: int):
|
||||
self._battery_level = battery_level
|
||||
|
||||
@property
|
||||
def battery_level(self):
|
||||
return self._battery_level
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return self.battery_level == other.battery_level
|
||||
|
||||
def __repr__(self):
|
||||
return f'BatteryLevel(battery_level={self.battery_level})'
|
||||
|
||||
|
||||
class BatteryLevelFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
return BatteryLevel(struct.unpack('>B', data.read(1))[0])
|
||||
|
||||
|
||||
class SupplyVoltage:
|
||||
|
||||
def __init__(self, supply_voltage: int):
|
||||
self._supply_voltage = supply_voltage
|
||||
|
||||
@property
|
||||
def supply_voltage(self):
|
||||
return self._supply_voltage
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return self.supply_voltage == other.supply_voltage
|
||||
|
||||
def __repr__(self):
|
||||
return f'SupplyVoltage(supply_voltage={self.supply_voltage})'
|
||||
|
||||
|
||||
class SupplyVoltageFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
return SupplyVoltage(struct.unpack('>H', data.read(2))[0])
|
||||
|
||||
|
||||
class ChildTableEntry:
|
||||
|
||||
def __init__(self, timeout: int, child_id: int, mode: mle.Mode):
|
||||
self._timeout = timeout
|
||||
self._child_id = child_id
|
||||
self._mode = mode
|
||||
|
||||
@property
|
||||
def timeout(self):
|
||||
return self._timeout
|
||||
|
||||
@property
|
||||
def child_id(self):
|
||||
return self._child_id
|
||||
|
||||
@property
|
||||
def mode(self):
|
||||
return self._mode
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return (self.timeout == other.timeout and
|
||||
self.child_id == other.child_id and self.mode == other.mode)
|
||||
|
||||
def __repr__(self):
|
||||
return f'ChildTableEntry(timeout={self.timeout}, child_id={self.child_id}, mode={self.mode})'
|
||||
|
||||
|
||||
class ChildTable:
|
||||
|
||||
def __init__(self, children: List[ChildTableEntry]):
|
||||
self._children = sorted(children, key=lambda child: child.child_id)
|
||||
|
||||
@property
|
||||
def children(self):
|
||||
return self._children
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return self.children == other.children
|
||||
|
||||
def __repr__(self):
|
||||
return f'ChildTable({self.children})'
|
||||
|
||||
|
||||
class ChildTableFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
children = []
|
||||
while message_info.length > 0:
|
||||
timeout_and_id = struct.unpack('>H', data.read(2))[0]
|
||||
message_info.length -= 2
|
||||
|
||||
timeout = (timeout_and_id & 0xf800) >> 11
|
||||
child_id = timeout_and_id & 0x1fff
|
||||
|
||||
mode = mle.ModeFactory().parse(data, message_info)
|
||||
message_info.length -= 1
|
||||
|
||||
children.append(ChildTableEntry(timeout, child_id, mode))
|
||||
return ChildTable(children)
|
||||
|
||||
|
||||
class ChannelPages:
|
||||
|
||||
def __init__(self, channel_pages: bytes):
|
||||
self._channel_pages = channel_pages
|
||||
|
||||
@property
|
||||
def channel_pages(self):
|
||||
return self._channel_pages
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return self.channel_pages == other.channel_pages
|
||||
|
||||
def __repr__(self):
|
||||
return f'ChannelPages(channel_pages={self.channel_pages})'
|
||||
|
||||
|
||||
class ChannelPagesFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
return ChannelPages(data.getvalue())
|
||||
|
||||
|
||||
class TypeList:
|
||||
|
||||
def __init__(self, tlv_types: List[int]):
|
||||
self._tlv_types = tlv_types
|
||||
|
||||
@property
|
||||
def tlv_types(self):
|
||||
return self._tlv_types
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return self.tlv_types == other.tlv_types
|
||||
|
||||
def __repr__(self):
|
||||
return f'TypeList(tlv_types={self.tlv_types})'
|
||||
|
||||
|
||||
class TypeListFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
return TypeList([ord(t) for t in data.getvalue()])
|
||||
|
||||
|
||||
class MaxChildTimeout:
|
||||
|
||||
def __init__(self, max_child_timeout: int):
|
||||
self._max_child_timeout = max_child_timeout
|
||||
|
||||
@property
|
||||
def max_child_timeout(self):
|
||||
return self._max_child_timeout
|
||||
|
||||
def __eq__(self, other):
|
||||
common.expect_the_same_class(self, other)
|
||||
|
||||
return self.max_child_timeout == other.max_child_timeout
|
||||
|
||||
def __repr__(self):
|
||||
return f'MaxChildTimeout(max_child_timeout={self.max_child_timeout})'
|
||||
|
||||
|
||||
class MaxChildTimeoutFactory:
|
||||
|
||||
def parse(self, data, message_info):
|
||||
return MaxChildTimeout(struct.unpack('>I', data.read(4))[0])
|
||||
@@ -901,6 +901,30 @@ class Node:
|
||||
self.send_command('netdataregister')
|
||||
self._expect('Done')
|
||||
|
||||
def send_network_diag_get(self, addr, tlv_types):
|
||||
self.send_command('networkdiagnostic get %s %s' %
|
||||
(addr, ' '.join([str(t.value) for t in tlv_types])))
|
||||
|
||||
if isinstance(self.simulator, simulator.VirtualTime):
|
||||
self.simulator.go(8)
|
||||
timeout = 1
|
||||
else:
|
||||
timeout = 8
|
||||
|
||||
self._expect('Done', timeout=timeout)
|
||||
|
||||
def send_network_diag_reset(self, addr, tlv_types):
|
||||
self.send_command('networkdiagnostic reset %s %s' %
|
||||
(addr, ' '.join([str(t.value) for t in tlv_types])))
|
||||
|
||||
if isinstance(self.simulator, simulator.VirtualTime):
|
||||
self.simulator.go(8)
|
||||
timeout = 1
|
||||
else:
|
||||
timeout = 8
|
||||
|
||||
self._expect('Done', timeout=timeout)
|
||||
|
||||
def energy_scan(self, mask, count, period, scan_duration, ipaddr):
|
||||
cmd = 'commissioner energy %d %d %d %d %s' % (
|
||||
mask,
|
||||
|
||||
Reference in New Issue
Block a user