Skip to content

Commit

Permalink
Tests: harden TestNetwork (#505)
Browse files Browse the repository at this point in the history
- make tests deterministic
- use cleanup hooks
- check the periodicity of heartbeats, by number of triggers
- roughly check the interval
  • Loading branch information
erlend-aasland committed Jul 9, 2024
1 parent 5207b32 commit 3aa509d
Showing 1 changed file with 41 additions and 27 deletions.
68 changes: 41 additions & 27 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
import unittest
from threading import Event

import canopen
import can
Expand All @@ -10,7 +10,8 @@ class TestNetwork(unittest.TestCase):

def setUp(self):
network = canopen.Network()
network.add_node(2, SAMPLE_EDS)
with self.assertLogs():
network.add_node(2, SAMPLE_EDS)
network.add_node(3, network[2].object_dictionary)
self.network = network

Expand All @@ -31,7 +32,10 @@ def test_notify(self):

def test_send(self):
bus = can.interface.Bus(interface="virtual", channel=1)
self.addCleanup(bus.shutdown)

self.network.connect(interface="virtual", channel=1)
self.addCleanup(self.network.disconnect)

# Send standard ID
self.network.send_message(0x123, [1, 2, 3, 4, 5, 6, 7, 8])
Expand All @@ -48,33 +52,43 @@ def test_send(self):
self.assertEqual(msg.arbitration_id, 0x12345)
self.assertTrue(msg.is_extended_id)

bus.shutdown()
self.network.disconnect()

def test_send_perodic(self):
bus = can.interface.Bus(interface="virtual", channel=1)
self.network.connect(interface="virtual", channel=1)

task = self.network.send_periodic(0x123, [1, 2, 3], 0.01)
time.sleep(0.1)
# FIXME: This test is a little fragile, as the number of elements
# depends on the timing of the machine.
print(f"Queue size: {bus.queue.qsize()}")
self.assertTrue(9 <= bus.queue.qsize() <= 13)
msg = bus.recv(0)
self.assertIsNotNone(msg)
self.assertSequenceEqual(msg.data, [1, 2, 3])
# Update data
task.update([4, 5, 6])
time.sleep(0.02)
while msg is not None and msg.data == b'\x01\x02\x03':
msg = bus.recv(0)
self.assertIsNotNone(msg)
self.assertSequenceEqual(msg.data, [4, 5, 6])
def test_send_periodic(self):
DATA1 = bytes([1, 2, 3])
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
self.network.connect(
interface="virtual",
channel=1,
receive_own_messages=True
)
self.addCleanup(self.network.disconnect)

acc = []
event = Event()

def hook(_, data, ts):
acc.append((data, ts))
event.set()

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

event.wait(PERIOD*2)

# Update task data.
task.update(DATA2)
event.clear()
event.wait(PERIOD*2)
task.stop()

bus.shutdown()
self.network.disconnect()
data = [v[0] for v in acc]
self.assertEqual(data, [DATA1, DATA2])
ts = [v[1] for v in acc]
self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1)


class TestScanner(unittest.TestCase):
Expand Down

0 comments on commit 3aa509d

Please sign in to comment.