diff --git a/.github/workflows/simulation-1.1.yml b/.github/workflows/simulation-1.1.yml index 9ec81eaec..2d8504fd1 100644 --- a/.github/workflows/simulation-1.1.yml +++ b/.github/workflows/simulation-1.1.yml @@ -52,6 +52,7 @@ jobs: REFERENCE_DEVICE: 1 THREAD_VERSION: 1.1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 MULTIPLY: 3 steps: - name: Harden Runner @@ -106,6 +107,7 @@ jobs: REFERENCE_DEVICE: 1 THREAD_VERSION: 1.1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 steps: - name: Harden Runner uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 @@ -160,6 +162,7 @@ jobs: THREAD_VERSION: 1.1 USE_MTD: 1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 MESSAGE_USE_HEAP: ${{ matrix.message_use_heap }} steps: - name: Harden Runner @@ -209,6 +212,7 @@ jobs: REFERENCE_DEVICE: 1 THREAD_VERSION: 1.1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 steps: - name: Harden Runner uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 @@ -355,6 +359,7 @@ jobs: COVERAGE: 1 THREAD_VERSION: 1.1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 CXXFLAGS: "-DOPENTHREAD_CONFIG_LOG_PREPEND_UPTIME=0" steps: - name: Harden Runner diff --git a/.github/workflows/simulation-1.4.yml b/.github/workflows/simulation-1.4.yml index a8dc55aea..cdfd80765 100644 --- a/.github/workflows/simulation-1.4.yml +++ b/.github/workflows/simulation-1.4.yml @@ -55,6 +55,7 @@ jobs: COVERAGE: 1 THREAD_VERSION: 1.4 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 INTER_OP: 1 INTER_OP_BBR: 1 CC: ${{ matrix.compiler.c }} @@ -124,6 +125,7 @@ jobs: env: REFERENCE_DEVICE: 1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 COVERAGE: 1 PACKET_VERIFICATION: 1 THREAD_VERSION: 1.4 @@ -201,6 +203,7 @@ jobs: env: REFERENCE_DEVICE: 1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 PACKET_VERIFICATION: 1 THREAD_VERSION: 1.4 INTER_OP_BBR: 1 @@ -257,6 +260,7 @@ jobs: COVERAGE: 1 THREAD_VERSION: 1.4 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 1 steps: - name: Harden Runner uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 @@ -356,6 +360,7 @@ jobs: OT_NODE_TYPE: rcp USE_MTD: 1 VIRTUAL_TIME: 1 + OT_VT_USE_UNIX_SOCKET: 0 INTER_OP: 1 steps: - name: Harden Runner diff --git a/examples/platforms/simulation/virtual_time/platform-sim.c b/examples/platforms/simulation/virtual_time/platform-sim.c index ba475f077..192552684 100644 --- a/examples/platforms/simulation/virtual_time/platform-sim.c +++ b/examples/platforms/simulation/virtual_time/platform-sim.c @@ -44,6 +44,7 @@ #include #include #include +#include #include #include @@ -57,6 +58,8 @@ uint32_t gNodeId = 1; extern bool gPlatformPseudoResetWasRequested; static volatile bool gTerminate = false; +static bool sUseUnixSocket = false; + int gArgumentsCount = 0; char **gArguments = NULL; @@ -74,20 +77,28 @@ static void handleSignal(int aSignal) void otSimSendEvent(const struct Event *aEvent) { - ssize_t rval; - struct sockaddr_in sockaddr; + ssize_t rval; - memset(&sockaddr, 0, sizeof(sockaddr)); - sockaddr.sin_family = AF_INET; - inet_pton(AF_INET, "127.0.0.1", &sockaddr.sin_addr); - sockaddr.sin_port = htons(sPortBase + sPortOffset); + if (sUseUnixSocket) + { + rval = send(sSockFd, aEvent, offsetof(struct Event, mData) + aEvent->mDataLength, 0); + } + else + { + struct sockaddr_in sockaddr; - rval = sendto(sSockFd, aEvent, offsetof(struct Event, mData) + aEvent->mDataLength, 0, (struct sockaddr *)&sockaddr, - sizeof(sockaddr)); + memset(&sockaddr, 0, sizeof(sockaddr)); + sockaddr.sin_family = AF_INET; + inet_pton(AF_INET, "127.0.0.1", &sockaddr.sin_addr); + sockaddr.sin_port = htons(sPortBase + sPortOffset); + + rval = sendto(sSockFd, aEvent, offsetof(struct Event, mData) + aEvent->mDataLength, 0, + (struct sockaddr *)&sockaddr, sizeof(sockaddr)); + } if (rval < 0) { - perror("sendto"); + perror("Send simulation event"); DieNow(OT_EXIT_ERROR_ERRNO); } } @@ -95,11 +106,12 @@ void otSimSendEvent(const struct Event *aEvent) static void receiveEvent(otInstance *aInstance) { struct Event event; - ssize_t rval = recvfrom(sSockFd, (char *)&event, sizeof(event), 0, NULL, NULL); + ssize_t rval = sUseUnixSocket ? recv(sSockFd, (char *)&event, sizeof(event), 0) + : recvfrom(sSockFd, (char *)&event, sizeof(event), 0, NULL, NULL); if (rval < 0 || (uint16_t)rval < offsetof(struct Event, mData)) { - perror("recvfrom"); + perror("Receive simulation event"); DieNow(OT_EXIT_ERROR_ERRNO); } @@ -166,19 +178,18 @@ otError otPlatUartFlush(void) { return OT_ERROR_NONE; } static void socket_init(void) { - struct sockaddr_in sockaddr; - memset(&sockaddr, 0, sizeof(sockaddr)); - sockaddr.sin_family = AF_INET; - + { + char *env = getenv("OT_VT_USE_UNIX_SOCKET"); + if (env != NULL && !strcmp(env, "1")) + { + sUseUnixSocket = true; + } + } parseFromEnvAsUint16("PORT_BASE", &sPortBase); - parseFromEnvAsUint16("PORT_OFFSET", &sPortOffset); sPortOffset *= (MAX_NETWORK_SIZE + 1); - sockaddr.sin_port = htons((uint16_t)(sPortBase + sPortOffset + gNodeId)); - sockaddr.sin_addr.s_addr = INADDR_ANY; - - sSockFd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + sSockFd = sUseUnixSocket ? socket(AF_UNIX, SOCK_SEQPACKET, 0) : socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (sSockFd == -1) { @@ -186,10 +197,50 @@ static void socket_init(void) DieNow(OT_EXIT_ERROR_ERRNO); } - if (bind(sSockFd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) == -1) + if (sUseUnixSocket) { - perror("bind"); - DieNow(OT_EXIT_ERROR_ERRNO); + uint16_t port = sPortBase + sPortOffset + gNodeId; + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + sprintf(addr.sun_path, "vt.%u.sock", port); + + if (unlink(addr.sun_path) == -1 && errno != ENOENT) + { + perror("unlink"); + DieNow(OT_EXIT_ERROR_ERRNO); + } + if (bind(sSockFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) + { + perror("bind"); + DieNow(OT_EXIT_ERROR_ERRNO); + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + sprintf(addr.sun_path, "vt.%u.sock", sPortBase + sPortOffset); + + if (connect(sSockFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) + { + perror("connect"); + DieNow(OT_EXIT_ERROR_ERRNO); + } + } + else + { + struct sockaddr_in sockaddr; + memset(&sockaddr, 0, sizeof(sockaddr)); + sockaddr.sin_family = AF_INET; + + sockaddr.sin_port = htons((uint16_t)(sPortBase + sPortOffset + gNodeId)); + sockaddr.sin_addr.s_addr = INADDR_ANY; + + if (bind(sSockFd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) == -1) + { + perror("bind"); + DieNow(OT_EXIT_ERROR_ERRNO); + } } } diff --git a/script/test b/script/test index 9ee6862c6..f5de3b3a1 100755 --- a/script/test +++ b/script/test @@ -89,6 +89,9 @@ readonly TREL LOCAL_OTBR_DIR=${LOCAL_OTBR_DIR:-""} readonly LOCAL_OTBR_DIR +: "${OT_VT_USE_UNIX_SOCKET:=0}" +export OT_VT_USE_UNIX_SOCKET + build_simulation() { local version="$1" diff --git a/tests/scripts/thread-cert/node.py b/tests/scripts/thread-cert/node.py index e2a2049aa..66bfdd359 100755 --- a/tests/scripts/thread-cert/node.py +++ b/tests/scripts/thread-cert/node.py @@ -922,11 +922,11 @@ class NodeImpl: assert len(payload) == payload_len return (direction, type, payload) - def send_command(self, cmd, go=True, expect_command_echo=True): + def send_command(self, cmd, go=True, expect_command_echo=True, maybeoff=False): print("%d: %s" % (self.nodeid, cmd)) self.pexpect.send(cmd + '\n') if go: - self.simulator.go(0, nodeid=self.nodeid) + self.simulator.go(0, nodeid=self.nodeid, maybeoff=maybeoff) sys.stdout.flush() if expect_command_echo: @@ -2659,7 +2659,7 @@ class NodeImpl: self._reset('factoryreset') def _reset(self, cmd): - self.send_command(cmd, expect_command_echo=False) + self.send_command(cmd, expect_command_echo=False, maybeoff=True) time.sleep(self.RESET_DELAY) # Send a "version" command and drain the CLI output after reset self.send_command('version', expect_command_echo=False) diff --git a/tests/scripts/thread-cert/simulator.py b/tests/scripts/thread-cert/simulator.py index 250501077..a8811240d 100755 --- a/tests/scripts/thread-cert/simulator.py +++ b/tests/scripts/thread-cert/simulator.py @@ -31,9 +31,11 @@ import binascii import bisect import os import socket +import selectors import struct import traceback import time +import types import io import config @@ -69,7 +71,7 @@ class BaseSimulator(object): def get_messages_sent_by(self, nodeid): raise NotImplementedError - def go(self, duration, nodeid=None): + def go(self, duration, **kwargs): raise NotImplementedError def stop(self): @@ -108,7 +110,7 @@ class RealTime(BaseSimulator): def now(self): return time.time() - def go(self, duration, nodeid=None): + def go(self, duration, **kwargs): time.sleep(duration) def stop(self): @@ -146,17 +148,37 @@ class VirtualTime(BaseSimulator): NCP_SIM = os.getenv('NODE_TYPE', 'sim') == 'ncp-sim' + USE_UNIX_SOCKET = os.getenv('OT_VT_USE_UNIX_SOCKET', '0') == '1' + _message_factory = None def __init__(self, use_message_factory=True): - super(VirtualTime, self).__init__() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2 * 1024 * 1024) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2 * 1024 * 1024) + super().__init__() - ip = '127.0.0.1' self.port = self.BASE_PORT + (self.PORT_OFFSET * (self.MAX_NODES + 1)) - self.sock.bind((ip, self.port)) + + if self.USE_UNIX_SOCKET: + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + addr = f'vt.{self.port}.sock' + try: + os.unlink(addr) + except OSError: + if os.path.exists(addr): + raise + + self.sock.bind(addr) + self.sock.listen() + self.sock.setblocking(False) + + else: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2 * 1024 * 1024) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2 * 1024 * 1024) + ip = '127.0.0.1' + self.sock.bind((ip, self.port)) + + self.sel = selectors.DefaultSelector() + self.sel.register(self.sock, selectors.EVENT_READ, data=None) self.devices = {} self.event_queue = [] @@ -165,12 +187,13 @@ class VirtualTime(BaseSimulator): self.current_time = 0 self.current_event = None self.awake_devices = set() + self._maybeoff_ports = () self._nodes_by_ack_seq = {} self._node_ack_seq = {} self._pcap = pcap.PcapCodec(os.getenv('TEST_NAME', 'current')) # the addr for spinel-cli sending OT_SIM_EVENT_POSTCMD - self._spinel_cli_addr = (ip, self.BASE_PORT + self.port) + self._spinel_cli_addr = f'vt{self.BASE_PORT + self.port}.sock' self.current_nodeid = None self._pause_time = 0 @@ -192,14 +215,12 @@ class VirtualTime(BaseSimulator): def is_running(self): return self.sock is not None - def _add_message(self, nodeid, message_obj): - addr = ('127.0.0.1', self.port + nodeid) - + def _add_message(self, port, message_obj): # Ignore any exceptions try: if self._message_factory is not None: messages = self._message_factory.create(io.BytesIO(message_obj)) - self.devices[addr]['msgs'] += messages + self.devices[port]['msgs'] += messages except message.DropPacketException: print('Drop current packet because it cannot be handled in test scripts') @@ -224,31 +245,34 @@ class VirtualTime(BaseSimulator): Returns: MessagesSet: a set with received messages. """ - addr = ('127.0.0.1', self.port + nodeid) + port = self.port + nodeid - messages = self.devices[addr]['msgs'] - self.devices[addr]['msgs'] = [] + messages = self.devices[port]['msgs'] + self.devices[port]['msgs'] = [] ret = message.MessagesSet(messages, self.commissioning_messages[nodeid]) self.commissioning_messages[nodeid] = [] return ret - def _is_radio(self, addr): - return addr[1] < self.BASE_PORT * 2 + def _is_radio(self, port): + return port < self.BASE_PORT * 2 - def _to_core_addr(self, addr): - assert self._is_radio(addr) - return (addr[0], addr[1] + self.BASE_PORT) + def _to_core_port(self, port): + assert self._is_radio(port) + return port + self.BASE_PORT - def _to_radio_addr(self, addr): - assert not self._is_radio(addr) - return (addr[0], addr[1] - self.BASE_PORT) + def _to_radio_port(self, port): + assert not self._is_radio(port) + return port - self.BASE_PORT - def _core_addr_from(self, nodeid): - if self._nodes[nodeid].is_posix: - return ('127.0.0.1', self.BASE_PORT + self.port + nodeid) + def _core_port_from(self, nodeid): + if nodeid in self._nodes and self._nodes[nodeid].is_posix: + return self.BASE_PORT + self.port + nodeid else: - return ('127.0.0.1', self.port + nodeid) + return self.port + nodeid + + def _radio_port_from(self, nodeid): + return self.port + nodeid def _next_event_time(self): if len(self.event_queue) == 0: @@ -256,15 +280,64 @@ class VirtualTime(BaseSimulator): else: return self.event_queue[0][0] - def receive_events(self): + def receive_events(self, timeout): + events = self.sel.select(timeout=timeout) + for key, mask in events: + if not self.USE_UNIX_SOCKET: + msg, addr = key.fileobj.recvfrom(self.MAX_MESSAGE) + port = addr[1] + if port not in self.devices: + self.devices[port] = {} + self.devices[port]['alarm'] = None + self.devices[port]['msgs'] = [] + self.devices[port]['time'] = self.current_time + self.awake_devices.discard(port) + + yield (msg, port) + + elif key.data is None: + sock, addr = key.fileobj.accept() + sock.setblocking(False) + + port = int(addr.split('.')[1]) + data = types.SimpleNamespace(port=port, inb=b"", outb=b"") + self.sel.register(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data) + + if addr == self._spinel_cli_addr: + continue + + self.devices[port] = {} + self.devices[port]['sock'] = sock + self.devices[port]['alarm'] = None + self.devices[port]['msgs'] = [] + self.devices[port]['time'] = self.current_time + self.awake_devices.discard(port) + + elif mask & selectors.EVENT_READ: + try: + msg = key.fileobj.recv(self.MAX_MESSAGE) + except ConnectionResetError: + msg = b'' + port = key.data.port + if msg: + yield (msg, port) + else: + del self.devices[port]['sock'] + self.sel.unregister(key.fileobj) + key.fileobj.close() + + def process_events(self): """ Receive events until all devices are asleep. """ while True: + timeout = 0 if (self.current_event or len(self.awake_devices) or (self._next_event_time() > self._pause_time and self.current_nodeid)): - self.sock.settimeout(self.BLOCK_TIMEOUT) - try: - msg, addr = self.sock.recvfrom(self.MAX_MESSAGE) - except socket.error: + timeout = self.BLOCK_TIMEOUT + + try: + events = self.receive_events(timeout) + except: + if timeout: # print debug information on failure print('Current nodeid:') print(self.current_nodeid) @@ -278,145 +351,143 @@ class VirtualTime(BaseSimulator): for event in self.event_queue: print(event) raise - else: - self.sock.settimeout(0) - try: - msg, addr = self.sock.recvfrom(self.MAX_MESSAGE) - except socket.error: + else: break - if addr != self._spinel_cli_addr and addr not in self.devices: - self.devices[addr] = {} - self.devices[addr]['alarm'] = None - self.devices[addr]['msgs'] = [] - self.devices[addr]['time'] = self.current_time - self.awake_devices.discard(addr) - # print "New device:", addr, self.devices + has_events = False + for (msg, port) in events: + has_events = True + self.process_event(msg, port) - delay, type, datalen = struct.unpack('=QBH', msg[:11]) - data = msg[11:] + if timeout or has_events: + continue + break - event_time = self.current_time + delay + def process_event(self, msg, port): + delay, type, datalen = struct.unpack('=QBH', msg[:11]) + data = msg[11:] - if data: - dbg_print( - "New event: ", - event_time, - addr, - type, - datalen, - binascii.hexlify(data), - ) - else: - dbg_print("New event: ", event_time, addr, type, datalen) + event_time = self.current_time + delay - if type == self.OT_SIM_EVENT_ALARM_FIRED: - # remove any existing alarm event for device - if self.devices[addr]['alarm']: - self.event_queue.remove(self.devices[addr]['alarm']) - # print "-- Remove\t", self.devices[addr]['alarm'] + if data: + dbg_print( + "New event: ", + event_time, + port, + type, + datalen, + binascii.hexlify(data), + ) + else: + dbg_print("New event: ", event_time, port, type, datalen) - # add alarm event to event queue - event = (event_time, self.event_sequence, addr, type, datalen) - self.event_sequence += 1 - # print "-- Enqueue\t", event, delay, self.current_time - bisect.insort(self.event_queue, event) - self.devices[addr]['alarm'] = event + if type == self.OT_SIM_EVENT_ALARM_FIRED: + # remove any existing alarm event for device + if self.devices[port]['alarm']: + self.event_queue.remove(self.devices[port]['alarm']) + # print "-- Remove\t", self.devices[port]['alarm'] - self.awake_devices.discard(addr) + # add alarm event to event queue + event = (event_time, self.event_sequence, port, type, datalen) + self.event_sequence += 1 + # print "-- Enqueue\t", event, delay, self.current_time + bisect.insort(self.event_queue, event) + self.devices[port]['alarm'] = event - if (self.current_event and self.current_event[self.EVENT_ADDR] == addr): - # print "Done\t", self.current_event - self.current_event = None + self.awake_devices.discard(port) - elif type == self.OT_SIM_EVENT_RADIO_RECEIVED: - assert self._is_radio(addr) - # add radio receive events event queue - frame_info = wpan.dissect(data) + if (self.current_event and self.current_event[self.EVENT_ADDR] == port): + # print "Done\t", self.current_event + self.current_event = None - recv_devices = None - if frame_info.frame_type == wpan.FrameType.ACK: - recv_devices = self._nodes_by_ack_seq.get(frame_info.seq_no) + elif type == self.OT_SIM_EVENT_RADIO_RECEIVED: + assert self._is_radio(port) + # add radio receive events event queue + frame_info = wpan.dissect(data) - recv_devices = recv_devices or self.devices.keys() + recv_devices = None + if frame_info.frame_type == wpan.FrameType.ACK: + recv_devices = self._nodes_by_ack_seq.get(frame_info.seq_no) - for device in recv_devices: - if device != addr and self._is_radio(device): - event = ( - event_time, - self.event_sequence, - device, - type, - datalen, - data, - ) - self.event_sequence += 1 - # print "-- Enqueue\t", event - bisect.insort(self.event_queue, event) + recv_devices = recv_devices or self.devices.keys() - self._pcap.append(data, (event_time // 1000000, event_time % 1000000)) - self._add_message(addr[1] - self.port, data) + for device in recv_devices: + if device != port and self._is_radio(device): + event = ( + event_time, + self.event_sequence, + device, + type, + datalen, + data, + ) + self.event_sequence += 1 + # print "-- Enqueue\t", event + bisect.insort(self.event_queue, event) - # add radio transmit done events to event queue - event = ( - event_time, - self.event_sequence, - addr, - type, - datalen, - data, - ) - self.event_sequence += 1 - bisect.insort(self.event_queue, event) + self._pcap.append(data, (event_time // 1000000, event_time % 1000000)) + self._add_message(port, data) - if frame_info.frame_type != wpan.FrameType.ACK and not frame_info.is_broadcast: - self._on_ack_seq_change(addr, frame_info.seq_no) + # add radio transmit done events to event queue + event = ( + event_time, + self.event_sequence, + port, + type, + datalen, + data, + ) + self.event_sequence += 1 + bisect.insort(self.event_queue, event) - self.awake_devices.add(addr) + if frame_info.frame_type != wpan.FrameType.ACK and not frame_info.is_broadcast: + self._on_ack_seq_change(port, frame_info.seq_no) - elif type == self.OT_SIM_EVENT_RADIO_SPINEL_WRITE: - assert not self._is_radio(addr) - radio_addr = self._to_radio_addr(addr) - if radio_addr not in self.devices: - self.awake_devices.add(radio_addr) + self.awake_devices.add(port) - event = ( - event_time, - self.event_sequence, - radio_addr, - self.OT_SIM_EVENT_UART_WRITE, - datalen, - data, - ) - self.event_sequence += 1 - bisect.insort(self.event_queue, event) + elif type == self.OT_SIM_EVENT_RADIO_SPINEL_WRITE: + assert not self._is_radio(port) + radio_port = self._to_radio_port(port) + if radio_port not in self.devices: + self.awake_devices.add(radio_port) - self.awake_devices.add(addr) + event = ( + event_time, + self.event_sequence, + radio_port, + self.OT_SIM_EVENT_UART_WRITE, + datalen, + data, + ) + self.event_sequence += 1 + bisect.insort(self.event_queue, event) - elif type == self.OT_SIM_EVENT_UART_WRITE: - assert self._is_radio(addr) - core_addr = self._to_core_addr(addr) - if core_addr not in self.devices: - self.awake_devices.add(core_addr) + self.awake_devices.add(port) - event = ( - event_time, - self.event_sequence, - core_addr, - self.OT_SIM_EVENT_RADIO_SPINEL_WRITE, - datalen, - data, - ) - self.event_sequence += 1 - bisect.insort(self.event_queue, event) + elif type == self.OT_SIM_EVENT_UART_WRITE: + assert self._is_radio(port) + core_port = self._to_core_port(port) + if core_port not in self.devices: + self.awake_devices.add(core_port) - self.awake_devices.add(addr) + event = ( + event_time, + self.event_sequence, + core_port, + self.OT_SIM_EVENT_RADIO_SPINEL_WRITE, + datalen, + data, + ) + self.event_sequence += 1 + bisect.insort(self.event_queue, event) - elif type == self.OT_SIM_EVENT_POSTCMD: - assert self.current_time == self._pause_time - nodeid = struct.unpack('=B', data)[0] - if self.current_nodeid == nodeid: - self.current_nodeid = None + self.awake_devices.add(port) + + elif type == self.OT_SIM_EVENT_POSTCMD: + assert self.current_time == self._pause_time + nodeid = struct.unpack('=B', data)[0] + if self.current_nodeid == nodeid: + self.current_nodeid = None def _on_ack_seq_change(self, device: tuple, seq_no: int): old_seq = self._node_ack_seq.pop(device, None) @@ -426,15 +497,18 @@ class VirtualTime(BaseSimulator): self._node_ack_seq[device] = seq_no self._nodes_by_ack_seq.setdefault(seq_no, set()).add(device) - def _send_message(self, message, addr): - while True: - try: - sent = self.sock.sendto(message, addr) - except socket.error: - traceback.print_exc() - time.sleep(0) + def _send_message(self, message, port): + if not self.USE_UNIX_SOCKET: + sent = self.sock.sendto(message, ('127.0.0.1', port)) + else: + sock = self.devices[port].get('sock', None) + if sock: + sent = sock.send(message) else: - break + assert port in self._maybeoff_ports, f'The node {port} is unexpectedly off' + dbg_print('skip sending message to off node', port) + return + assert sent == len(message) def process_next_event(self): @@ -445,14 +519,14 @@ class VirtualTime(BaseSimulator): event = self.event_queue.pop(0) if len(event) == 5: - event_time, sequence, addr, type, datalen = event - dbg_print("Pop event: ", event_time, addr, type, datalen) + event_time, sequence, port, type, datalen = event + dbg_print("Pop event: ", event_time, port, type, datalen) else: - event_time, sequence, addr, type, datalen, data = event + event_time, sequence, port, type, datalen, data = event dbg_print( "Pop event: ", event_time, - addr, + port, type, datalen, binascii.hexlify(data), @@ -463,54 +537,58 @@ class VirtualTime(BaseSimulator): assert event_time >= self.current_time self.current_time = event_time - elapsed = event_time - self.devices[addr]['time'] - self.devices[addr]['time'] = event_time + elapsed = event_time - self.devices[port]['time'] + self.devices[port]['time'] = event_time message = struct.pack('=QBH', elapsed, type, datalen) if type == self.OT_SIM_EVENT_ALARM_FIRED: - self.devices[addr]['alarm'] = None - self._send_message(message, addr) + self.devices[port]['alarm'] = None + self._send_message(message, port) elif type == self.OT_SIM_EVENT_RADIO_RECEIVED: message += data - self._send_message(message, addr) + self._send_message(message, port) elif type == self.OT_SIM_EVENT_RADIO_SPINEL_WRITE: message += data - self._send_message(message, addr) + self._send_message(message, port) elif type == self.OT_SIM_EVENT_UART_WRITE: message += data - self._send_message(message, addr) + self._send_message(message, port) def sync_devices(self): self.current_time = self._pause_time - for addr in self.devices: - elapsed = self.current_time - self.devices[addr]['time'] + for port in self.devices: + elapsed = self.current_time - self.devices[port]['time'] if elapsed == 0: continue - dbg_print('syncing', addr, elapsed) - self.devices[addr]['time'] = self.current_time + dbg_print('syncing', port, elapsed) + self.devices[port]['time'] = self.current_time message = struct.pack('=QBH', elapsed, self.OT_SIM_EVENT_ALARM_FIRED, 0) - self._send_message(message, addr) - self.awake_devices.add(addr) - self.receive_events() + self._send_message(message, port) + self.awake_devices.add(port) + self.process_events() self.awake_devices.clear() def now(self): return self.current_time / 1000000 - def go(self, duration, nodeid=None): + def go(self, duration, nodeid=None, maybeoff=False): assert self.current_time == self._pause_time duration = int(duration * 1000000) dbg_print('running for %d us' % duration) self._pause_time += duration + self._maybeoff_ports = () if nodeid: if self.NCP_SIM: self.current_nodeid = nodeid - self.awake_devices.add(self._core_addr_from(nodeid)) - self.receive_events() + core_port = self._core_port_from(nodeid) + self.awake_devices.add(core_port) + if maybeoff: + self._maybeoff_ports = (core_port,) + self.process_events() while self._next_event_time() <= self._pause_time: self.process_next_event() - self.receive_events() + self.process_events() if duration > 0: self.sync_devices() dbg_print('current time %d us' % self.current_time)