mirror of
https://github.com/espressif/openthread.git
synced 2026-06-06 05:24:51 +00:00
[thci] find border agents with non link-local addresses (#7853)
Thread BR may advertise IPv6 GUA rather than LLA. For example, Avahi stops publishing LLA when any GUA is found on the network interface. This commit enhances mdns_query to find border agents with non-link-local addresses.
This commit is contained in:
@@ -40,7 +40,7 @@ def on_service_state_change(zeroconf, service_type, name, state_change):
|
||||
class BorderAgent(object):
|
||||
alias = None
|
||||
server_name = None
|
||||
link_local_addr = None
|
||||
addr = None
|
||||
port = None
|
||||
thread_status = None
|
||||
|
||||
@@ -48,19 +48,30 @@ class BorderAgent(object):
|
||||
self.alias = alias
|
||||
|
||||
def __repr__(self):
|
||||
return str([self.alias, self.link_local_addr, self.port, self.thread_status])
|
||||
return str([self.alias, self.addr, self.port, self.thread_status])
|
||||
|
||||
|
||||
def get_ipaddr_priority(addr: ipaddress.IPv6Address):
|
||||
# calculate the priority of IPv6 addresses in order: Global > non Global > Link local
|
||||
if addr.is_link_local:
|
||||
return 0
|
||||
|
||||
if not addr.is_global:
|
||||
return 1
|
||||
|
||||
return 2
|
||||
|
||||
|
||||
def parse_cache(cache):
|
||||
border_agents = []
|
||||
|
||||
# Find all border routers
|
||||
for ptr in cache['_meshcop._udp.local.']:
|
||||
for ptr in cache.get('_meshcop._udp.local.', []):
|
||||
border_agents.append(BorderAgent(ptr.alias))
|
||||
|
||||
# Find server name, port and Thread Interface status for each border router
|
||||
for ba in border_agents:
|
||||
for record in cache[ba.alias.lower()]:
|
||||
for record in cache.get(ba.alias.lower(), []):
|
||||
if isinstance(record, DNSService):
|
||||
ba.server_name = record.server
|
||||
ba.port = record.port
|
||||
@@ -69,14 +80,16 @@ def parse_cache(cache):
|
||||
sb = text.split(b'sb=')[1][0:4]
|
||||
ba.thread_status = (sb[3] & 0x18) >> 3
|
||||
|
||||
# Find link local address for each border router
|
||||
# Find IPv6 address for each border router
|
||||
for ba in border_agents:
|
||||
for record in cache[ba.server_name.lower()]:
|
||||
for record in cache.get(ba.server_name.lower(), []):
|
||||
if isinstance(record, DNSAddress):
|
||||
addr = ipaddress.ip_address(record.address)
|
||||
if isinstance(addr, ipaddress.IPv6Address) and addr.is_link_local:
|
||||
ba.link_local_addr = str(addr)
|
||||
break
|
||||
if not isinstance(addr, ipaddress.IPv6Address) or addr.is_multicast or addr.is_loopback:
|
||||
continue
|
||||
|
||||
if not ba.addr or get_ipaddr_priority(addr) > get_ipaddr_priority(ipaddress.IPv6Address(ba.addr)):
|
||||
ba.addr = str(addr)
|
||||
|
||||
return border_agents
|
||||
|
||||
|
||||
@@ -643,10 +643,12 @@ class OpenThread_BR(OpenThreadTHCI, IThci):
|
||||
output = self.bash(cmd)
|
||||
for line in output:
|
||||
print(line)
|
||||
alias, link_local_addr, port, thread_status = eval(line)
|
||||
if thread_status == 2 and link_local_addr:
|
||||
if (dst and link_local_addr in dst) or (link_local_addr not in addrs_blacklist):
|
||||
return '%s%%%s' % (link_local_addr, self.backboneNetif), port
|
||||
alias, addr, port, thread_status = eval(line)
|
||||
if thread_status == 2 and addr:
|
||||
if (dst and addr in dst) or (addr not in addrs_blacklist):
|
||||
if ipaddress.IPv6Address(addr.decode()).is_link_local:
|
||||
addr = '%s%%%s' % (addr, self.backboneNetif)
|
||||
return addr, port
|
||||
|
||||
raise Exception('No active Border Agents found')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user