[channel-manager] add local csl channel selection on SSED (#9641)

This commit enables channel manager on SSED, together with channel monitor,
auto-selecting a better CSL channel for the link between child and its parent.

It also fixes tracking of CcaSuccessRate on CslChannel and adds toranj tests
for auto-channel selection and thread_cert test for autocsl-channel selection.
This commit is contained in:
Martin Zimmermann
2024-03-25 22:06:29 +01:00
committed by GitHub
parent 09aa9630aa
commit 4db6520d17
17 changed files with 818 additions and 76 deletions
+88 -1
View File
@@ -807,6 +807,18 @@ class NodeImpl:
results = [line for line in output if self._match_pattern(line, pattern)]
return results
def _expect_key_value_pairs(self, pattern, separator=': '):
"""Expect 'key: value' in multiple lines.
Returns:
Dictionary of the key:value pairs.
"""
result = {}
for line in self._expect_results(pattern):
key, val = line.split(separator)
result.update({key: val})
return result
@staticmethod
def _match_pattern(line, pattern):
if isinstance(pattern, str):
@@ -1799,7 +1811,7 @@ class NodeImpl:
def get_csl_info(self):
self.send_command('csl')
self._expect_done()
return self._expect_key_value_pairs(r'\S+')
def set_csl_channel(self, csl_channel):
self.send_command('csl channel %d' % csl_channel)
@@ -3598,6 +3610,81 @@ class NodeImpl:
line = self._expect_command_output()[0]
return [int(item) for item in line.split()]
def get_channel_monitor_info(self) -> Dict:
"""
Returns:
Dict of channel monitor info, e.g.
{'enabled': '1',
'interval': '41000',
'threshold': '-75',
'window': '960',
'count': '985',
'occupancies': {
'11': '0.00%',
'12': '3.50%',
'13': '9.89%',
'14': '15.36%',
'15': '20.02%',
'16': '21.95%',
'17': '32.71%',
'18': '35.76%',
'19': '37.97%',
'20': '43.68%',
'21': '48.95%',
'22': '54.05%',
'23': '58.65%',
'24': '68.26%',
'25': '66.73%',
'26': '73.12%'
}
}
"""
config = {}
self.send_command('channel monitor')
for line in self._expect_results(r'\S+'):
if re.match(r'.*:\s.*', line):
key, val = line.split(':')
config.update({key: val.strip()})
elif re.match(r'.*:', line): # occupancy
occ_key, val = line.split(':')
val = {}
config.update({occ_key: val})
elif 'busy' in line:
# channel occupancies
key = line.split()[1]
val = line.split()[3]
config[occ_key].update({key: val})
return config
def set_channel_manager_auto_enable(self, enable: bool):
self.send_command(f'channel manager auto {int(enable)}')
self._expect_done()
def set_channel_manager_autocsl_enable(self, enable: bool):
self.send_command(f'channel manager autocsl {int(enable)}')
self._expect_done()
def set_channel_manager_supported(self, channel_mask: int):
self.send_command(f'channel manager supported {int(channel_mask)}')
self._expect_done()
def set_channel_manager_favored(self, channel_mask: int):
self.send_command(f'channel manager favored {int(channel_mask)}')
self._expect_done()
def set_channel_manager_interval(self, interval: int):
self.send_command(f'channel manager interval {interval}')
self._expect_done()
def set_channel_manager_cca_threshold(self, hex_value: str):
self.send_command(f'channel manager threshold {hex_value}')
self._expect_done()
def get_channel_manager_config(self):
self.send_command('channel manager')
return self._expect_key_value_pairs(r'\S+')
class Node(NodeImpl, OtCli):
pass