Skip to content

Commit

Permalink
Merge branch 'test-sdo-variables' into sdorecord-skip-count
Browse files Browse the repository at this point in the history
  • Loading branch information
acolomb committed Aug 17, 2024
2 parents 27b3724 + 30c4071 commit 36150d7
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 62 deletions.
60 changes: 53 additions & 7 deletions test/sample.eds
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,7 @@ DataType=0x0007
AccessType=ro
PDOMapping=0

[1018sub3]
ParameterName=Revision number
ObjectType=0x7
DataType=0x0007
AccessType=ro
PDOMapping=0
; [1018sub3] left out for testing

[1018sub4]
ParameterName=Serial number
Expand All @@ -123,11 +118,62 @@ SupportedObjects=3
[1003]
ParameterName=Pre-defined error field
ObjectType=0x8
CompactSubObj=255
SubNumber=9

[1003sub0]
ParameterName=Number of errors
ObjectType=0x7
DataType=0x0005
AccessType=rw
DefaultValue=3
PDOMapping=0

[1003sub1]
ParameterName=Pre-defined error field_1
ObjectType=0x7
DataType=0x0007
AccessType=ro
DefaultValue=0
PDOMapping=0

; [1003sub2] left out for testing

[1003sub3]
ParameterName=Pre-defined error field_3
ObjectType=0x7
DataType=0x0007
AccessType=ro
DefaultValue=0
PDOMapping=0

[1003sub4]
ParameterName=Pre-defined error field_4
ObjectType=0x7
DataType=0x0007
AccessType=ro
DefaultValue=0
PDOMapping=0

[1003sub5]
ParameterName=Pre-defined error field_5
ObjectType=0x7
DataType=0x0007
AccessType=ro
DefaultValue=0
PDOMapping=0

; [1003sub6] left out for testing

[1003sub7]
ParameterName=Pre-defined error field_7
ObjectType=0x7
DataType=0x0007
AccessType=ro
DefaultValue=0
PDOMapping=0

; [1003sub8] left out for testing

[1008]
ParameterName=Manufacturer device name
ObjectType=0x7
Expand Down
6 changes: 5 additions & 1 deletion test/test_eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def test_relative_variable(self):
def test_record(self):
record = self.od['Identity object']
self.assertIsInstance(record, canopen.objectdictionary.ODRecord)
self.assertEqual(len(record), 5)
self.assertEqual(len(record), 4)
self.assertEqual(record.index, 0x1018)
self.assertEqual(record.name, 'Identity object')
var = record['Vendor-ID']
Expand Down Expand Up @@ -357,3 +357,7 @@ def verify_od(self, source, doctype):
f" mismatch on {pretty_index(evar.index, evar.subindex)}")

self.assertEqual(self.od.comments, exported_od.comments)


if __name__ == "__main__":
unittest.main()
4 changes: 4 additions & 0 deletions test/test_emcy.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,7 @@ def check(*args, res):
check(res=b'\x00\x00\x00\x00\x00\x00\x00\x00')
check(3, res=b'\x00\x00\x03\x00\x00\x00\x00\x00')
check(3, b"\xaa\xbb", res=b'\x00\x00\x03\xaa\xbb\x00\x00\x00')


if __name__ == "__main__":
unittest.main()
4 changes: 2 additions & 2 deletions test/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_segmented_download(self):
def test_slave_send_heartbeat(self):
# Setting the heartbeat time should trigger heartbeating
# to start
self.remote_node.sdo["Producer heartbeat time"].raw = 1000
self.remote_node.sdo["Producer heartbeat time"].raw = 100
state = self.remote_node.nmt.wait_for_heartbeat()
self.local_node.nmt.stop_heartbeat()
# The NMT master will change the state INITIALISING (0)
Expand All @@ -98,7 +98,7 @@ def test_slave_send_heartbeat(self):

def test_nmt_state_initializing_to_preoper(self):
# Initialize the heartbeat timer
self.local_node.sdo["Producer heartbeat time"].raw = 1000
self.local_node.sdo["Producer heartbeat time"].raw = 100
self.local_node.nmt.stop_heartbeat()
# This transition shall start the heartbeating
self.local_node.nmt.state = 'INITIALISING'
Expand Down
55 changes: 27 additions & 28 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import unittest
import threading
import time
import unittest

import canopen
import can
Expand Down Expand Up @@ -231,59 +232,57 @@ def test_network_send_periodic(self):
DATA1 = bytes([1, 2, 3])
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
PERIOD = 0.01
TIMEOUT = PERIOD * 10
self.network.connect(interface="virtual", receive_own_messages=True)
self.network.connect(interface="virtual")
self.addCleanup(self.network.disconnect)

acc = []
condition = threading.Condition()

def hook(_, data, ts):
with condition:
item = data, ts
acc.append(item)
condition.notify_all()
bus = can.Bus(interface="virtual")
self.addCleanup(bus.shutdown)

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)
acc = []

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

def periodicity():
def wait_for_periodicity():
# Check if periodicity is established; flakiness has been observed
# on macOS.
if len(acc) >= 2:
delta = acc[-1][1] - acc[-2][1]
return round(delta, ndigits=1) == PERIOD
return False
end_time = time.time() + TIMEOUT
while time.time() < end_time:
if msg := bus.recv(PERIOD):
acc.append(msg)
if len(acc) >= 2:
first, last = acc[-2:]
delta = last.timestamp - first.timestamp
if round(delta, ndigits=2) == PERIOD:
return
self.fail("Timed out")

# Wait for frames to arrive; then check the result.
with condition:
condition.wait_for(periodicity, TIMEOUT)
self.assertTrue(all(v[0] == DATA1 for v in acc))
wait_for_periodicity()
self.assertTrue(all([v.data == DATA1 for v in acc]))

# Update task data, which may implicitly restart the timer.
# Wait for frames to arrive; then check the result.
task.update(DATA2)
with condition:
acc.clear()
condition.wait_for(periodicity, TIMEOUT)
acc.clear()
wait_for_periodicity()
# Find the first message with new data, and verify that all subsequent
# messages also carry the new payload.
data = [v[0] for v in acc]
data = [v.data for v in acc]
self.assertIn(DATA2, data)
idx = data.index(DATA2)
self.assertTrue(all(v[0] == DATA2 for v in acc[idx:]))
self.assertTrue(all([v.data == DATA2 for v in acc[idx:]]))

# Stop the task.
task.stop()
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
bus = self.network.bus
msg = bus.recv(TIMEOUT)
msg = bus.recv(PERIOD)
if msg is not None:
self.assertIsNone(bus.recv(TIMEOUT))
self.assertIsNone(bus.recv(PERIOD))


class TestScanner(unittest.TestCase):
Expand Down
56 changes: 32 additions & 24 deletions test/test_nmt.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import time
import unittest

Expand Down Expand Up @@ -41,28 +42,28 @@ def test_state_set_invalid(self):

class TestNmtMaster(unittest.TestCase):
NODE_ID = 2
COB_ID = 0x700 + NODE_ID
PERIOD = 0.01
TIMEOUT = PERIOD * 2
TIMEOUT = PERIOD * 10

def setUp(self):
bus = can.ThreadSafeBus(
interface="virtual",
channel="test",
receive_own_messages=True,
)
net = canopen.Network(bus)
net = canopen.Network()
net.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
net.connect()
net.connect(interface="virtual")
with self.assertLogs():
node = net.add_node(self.NODE_ID, SAMPLE_EDS)

self.bus = bus
self.bus = can.Bus(interface="virtual")
self.net = net
self.node = node

def tearDown(self):
self.net.disconnect()
self.bus.shutdown()

def dispatch_heartbeat(self, code):
cob_id = 0x700 + self.NODE_ID
hb = can.Message(arbitration_id=cob_id, data=[code])
self.bus.send(hb)

def test_nmt_master_no_heartbeat(self):
with self.assertRaisesRegex(NmtError, "heartbeat"):
Expand All @@ -74,47 +75,54 @@ def test_nmt_master_on_heartbeat(self):
# Skip the special INITIALISING case.
for code in [st for st in NMT_STATES if st != 0]:
with self.subTest(code=code):
task = self.net.send_periodic(self.COB_ID, [code], self.PERIOD)
try:
actual = self.node.nmt.wait_for_heartbeat(self.TIMEOUT)
finally:
task.stop()
t = threading.Timer(0.01, self.dispatch_heartbeat, args=(code,))
t.start()
self.addCleanup(t.join)
actual = self.node.nmt.wait_for_heartbeat(0.1)
expected = NMT_STATES[code]
self.assertEqual(actual, expected)

def test_nmt_master_on_heartbeat_initialising(self):
task = self.net.send_periodic(self.COB_ID, [0], self.PERIOD)
self.addCleanup(task.stop)
def test_nmt_master_wait_for_bootup(self):
t = threading.Timer(0.01, self.dispatch_heartbeat, args=(0x00,))
t.start()
self.addCleanup(t.join)
self.node.nmt.wait_for_bootup(self.TIMEOUT)
self.assertEqual(self.node.nmt.state, "PRE-OPERATIONAL")

def test_nmt_master_on_heartbeat_initialising(self):
t = threading.Timer(0.01, self.dispatch_heartbeat, args=(0x00,))
t.start()
self.addCleanup(t.join)
state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT)
self.assertEqual(state, "PRE-OPERATIONAL")

def test_nmt_master_on_heartbeat_unknown_state(self):
task = self.net.send_periodic(self.COB_ID, [0xcb], self.PERIOD)
self.addCleanup(task.stop)
t = threading.Timer(0.01, self.dispatch_heartbeat, args=(0xcb,))
t.start()
self.addCleanup(t.join)
state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT)
# Expect the high bit to be masked out, and a formatted string to
# be returned.
self.assertEqual(state, "UNKNOWN STATE '75'")

def test_nmt_master_add_heartbeat_callback(self):
from threading import Event
event = Event()
event = threading.Event()
state = None
def hook(st):
nonlocal state
state = st
event.set()
self.node.nmt.add_heartbeat_callback(hook)
self.net.send_message(self.COB_ID, bytes([127]))

self.dispatch_heartbeat(0x7f)
self.assertTrue(event.wait(self.TIMEOUT))
self.assertEqual(state, 127)

def test_nmt_master_node_guarding(self):
self.node.nmt.start_node_guarding(self.PERIOD)
msg = self.bus.recv(self.TIMEOUT)
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, self.COB_ID)
self.assertEqual(msg.arbitration_id, 0x700 + self.NODE_ID)
self.assertEqual(msg.dlc, 0)

self.node.nmt.stop_node_guarding()
Expand Down
4 changes: 4 additions & 0 deletions test/test_od.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,7 @@ def test_subindexes(self):
self.assertEqual(array[1].name, "Test Variable")
self.assertEqual(array[2].name, "Test Variable 2")
self.assertEqual(array[3].name, "Test Variable_3")


if __name__ == "__main__":
unittest.main()
37 changes: 37 additions & 0 deletions test/test_sdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,43 @@
RX = 2


class TestSDOVariables(unittest.TestCase):
"""Some basic assumptions on the behavior of SDO variable objects.
Mostly what is stated in the API docs.
"""

def setUp(self):
node = canopen.LocalNode(1, SAMPLE_EDS)
self.sdo_node = node.sdo

def test_record_iter_length(self):
"""Assume the "highest subindex supported" entry is not counted.
Sub-objects without an OD entry should be skipped as well."""
record = self.sdo_node[0x1018]
subs = sum(1 for _ in iter(record))
self.assertEqual(len(record), 3)
self.assertEqual(subs, 3)

def test_array_iter_length(self):
"""Assume the "highest subindex supported" entry is not counted."""
array = self.sdo_node[0x1003]
subs = sum(1 for _ in iter(array))
self.assertEqual(len(array), 3)
self.assertEqual(subs, 3)
# Simulate more entries getting added dynamically
array[0].set_data(b'\x08')
subs = sum(1 for _ in iter(array))
self.assertEqual(subs, 8)

def test_array_members_dynamic(self):
"""Check if sub-objects missing from OD entry are generated dynamically."""
array = self.sdo_node[0x1003]
for var in array.values():
self.assertIsInstance(var, canopen.sdo.SdoVariable)


class TestSDO(unittest.TestCase):
"""
Test SDO traffic by example. Most are taken from
Expand Down

0 comments on commit 36150d7

Please sign in to comment.