diff --git a/test/test_network.py b/test/test_network.py index 3a59a597..c5a1dcbb 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -1,6 +1,7 @@ import logging -import unittest import threading +import time +import unittest import canopen import can @@ -231,59 +232,58 @@ def test_network_send_periodic(self): DATA1 = bytes([1, 2, 3]) DATA2 = bytes([4, 5, 6]) COB_ID = 0x123 - PERIOD = 0.1 + PERIOD = 0.01 TIMEOUT = PERIOD * 10 - self.network.connect(interface="virtual", receive_own_messages=True) + self.network.connect(interface="virtual") self.addCleanup(self.network.disconnect) - acc = [] - condition = threading.Condition() - - def hook(_, data, ts): - with condition: - item = data, ts - acc.append(item) - condition.notify_all() + bus = can.Bus(interface="virtual") + self.addCleanup(bus.shutdown) - self.network.subscribe(COB_ID, hook) - self.addCleanup(self.network.unsubscribe, COB_ID) + acc = [] task = self.network.send_periodic(COB_ID, DATA1, PERIOD) self.addCleanup(task.stop) - def periodicity(): + def wait_for_periodicity(): # Check if periodicity is established; flakiness has been observed # on macOS. - if len(acc) >= 2: - delta = acc[-1][1] - acc[-2][1] - return round(delta, ndigits=1) == PERIOD - return False + end_time = time.time() + TIMEOUT + while True: + if time.time() >= end_time: + break + if msg := bus.recv(PERIOD): + acc.append(msg) + if len(acc) >= 2: + first, last = acc[-2:] + delta = last.timestamp - first.timestamp + return round(delta, ndigits=2) == PERIOD + self.fail("Timed out") # Wait for frames to arrive; then check the result. - with condition: - condition.wait_for(periodicity, TIMEOUT) - self.assertTrue(all(v[0] == DATA1 for v in acc)) + wait_for_periodicity() + self.assertTrue(all([v.data == DATA1 for v in acc])) # Update task data, which may implicitly restart the timer. # Wait for frames to arrive; then check the result. task.update(DATA2) - with condition: - acc.clear() - condition.wait_for(periodicity, TIMEOUT) + acc.clear() + wait_for_periodicity() # Find the first message with new data, and verify that all subsequent # messages also carry the new payload. - data = [v[0] for v in acc] + data = [v.data for v in acc] + self.assertIn(DATA2, data) idx = data.index(DATA2) - self.assertTrue(all(v[0] == DATA2 for v in acc[idx:])) + self.assertTrue(all([v.data == DATA2 for v in acc[idx:]])) # Stop the task. task.stop() # A message may have been in flight when we stopped the timer, # so allow a single failure. bus = self.network.bus - msg = bus.recv(TIMEOUT) + msg = bus.recv(PERIOD) if msg is not None: - self.assertIsNone(bus.recv(TIMEOUT)) + self.assertIsNone(bus.recv(PERIOD)) class TestScanner(unittest.TestCase):