diff --git a/test/test_network.py b/test/test_network.py index ed440326..d488a2bc 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -1,5 +1,5 @@ -import time import unittest +from threading import Event import canopen import can @@ -10,7 +10,8 @@ class TestNetwork(unittest.TestCase): def setUp(self): network = canopen.Network() - network.add_node(2, SAMPLE_EDS) + with self.assertLogs(): + network.add_node(2, SAMPLE_EDS) network.add_node(3, network[2].object_dictionary) self.network = network @@ -31,7 +32,10 @@ def test_notify(self): def test_send(self): bus = can.interface.Bus(interface="virtual", channel=1) + self.addCleanup(bus.shutdown) + self.network.connect(interface="virtual", channel=1) + self.addCleanup(self.network.disconnect) # Send standard ID self.network.send_message(0x123, [1, 2, 3, 4, 5, 6, 7, 8]) @@ -48,33 +52,43 @@ def test_send(self): self.assertEqual(msg.arbitration_id, 0x12345) self.assertTrue(msg.is_extended_id) - bus.shutdown() - self.network.disconnect() - - def test_send_perodic(self): - bus = can.interface.Bus(interface="virtual", channel=1) - self.network.connect(interface="virtual", channel=1) - - task = self.network.send_periodic(0x123, [1, 2, 3], 0.01) - time.sleep(0.1) - # FIXME: This test is a little fragile, as the number of elements - # depends on the timing of the machine. - print(f"Queue size: {bus.queue.qsize()}") - self.assertTrue(9 <= bus.queue.qsize() <= 13) - msg = bus.recv(0) - self.assertIsNotNone(msg) - self.assertSequenceEqual(msg.data, [1, 2, 3]) - # Update data - task.update([4, 5, 6]) - time.sleep(0.02) - while msg is not None and msg.data == b'\x01\x02\x03': - msg = bus.recv(0) - self.assertIsNotNone(msg) - self.assertSequenceEqual(msg.data, [4, 5, 6]) + def test_send_periodic(self): + DATA1 = bytes([1, 2, 3]) + DATA2 = bytes([4, 5, 6]) + COB_ID = 0x123 + PERIOD = 0.1 + self.network.connect( + interface="virtual", + channel=1, + receive_own_messages=True + ) + self.addCleanup(self.network.disconnect) + + acc = [] + event = Event() + + def hook(_, data, ts): + acc.append((data, ts)) + event.set() + + self.network.subscribe(COB_ID, hook) + self.addCleanup(self.network.unsubscribe, COB_ID) + + task = self.network.send_periodic(COB_ID, DATA1, PERIOD) + self.addCleanup(task.stop) + + event.wait(PERIOD*2) + + # Update task data. + task.update(DATA2) + event.clear() + event.wait(PERIOD*2) task.stop() - bus.shutdown() - self.network.disconnect() + data = [v[0] for v in acc] + self.assertEqual(data, [DATA1, DATA2]) + ts = [v[1] for v in acc] + self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1) class TestScanner(unittest.TestCase):