From 08eba81f519e637e0c20d58560ca9d6637e0adfe Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Mon, 8 Jul 2024 15:42:45 +0200 Subject: [PATCH] Tests: establish test/util.py and test/test_nmt.py (#503) Give NMT tests their own file, and establish a util.py for the test suite for common stuff like the location of sample EDS files. Consistently use SAMPLE_EDS instead of EDS_PATH. --- test/__init__.py | 0 test/test_eds.py | 6 ++-- test/test_local.py | 84 +++++--------------------------------------- test/test_network.py | 8 ++--- test/test_nmt.py | 64 +++++++++++++++++++++++++++++++++ test/test_pdo.py | 9 +++-- test/test_sdo.py | 16 +++------ test/util.py | 5 +++ 8 files changed, 92 insertions(+), 100 deletions(-) create mode 100644 test/__init__.py create mode 100644 test/test_nmt.py create mode 100644 test/util.py diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/test_eds.py b/test/test_eds.py index 49d09ab7..d953b994 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -1,12 +1,10 @@ import os import unittest + import canopen from canopen.objectdictionary.eds import _signed_int_from_hex from canopen.utils import pretty_index - - -SAMPLE_EDS = os.path.join(os.path.dirname(__file__), 'sample.eds') -DATATYPES_EDS = os.path.join(os.path.dirname(__file__), 'datatypes.eds') +from .util import SAMPLE_EDS, DATATYPES_EDS class TestEDS(unittest.TestCase): diff --git a/test/test_local.py b/test/test_local.py index 387c9f88..9c5fc0c1 100644 --- a/test/test_local.py +++ b/test/test_local.py @@ -1,12 +1,8 @@ -import os -import unittest -import canopen -import logging import time +import unittest -# logging.basicConfig(level=logging.DEBUG) - -EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') +import canopen +from .util import SAMPLE_EDS class TestSDO(unittest.TestCase): @@ -18,15 +14,15 @@ class TestSDO(unittest.TestCase): def setUpClass(cls): cls.network1 = canopen.Network() cls.network1.connect("test", interface="virtual") - cls.remote_node = cls.network1.add_node(2, EDS_PATH) + cls.remote_node = cls.network1.add_node(2, SAMPLE_EDS) cls.network2 = canopen.Network() cls.network2.connect("test", interface="virtual") - cls.local_node = cls.network2.create_node(2, EDS_PATH) + cls.local_node = cls.network2.create_node(2, SAMPLE_EDS) - cls.remote_node2 = cls.network1.add_node(3, EDS_PATH) + cls.remote_node2 = cls.network1.add_node(3, SAMPLE_EDS) - cls.local_node2 = cls.network2.create_node(3, EDS_PATH) + cls.local_node2 = cls.network2.create_node(3, SAMPLE_EDS) @classmethod def tearDownClass(cls): @@ -172,68 +168,6 @@ def test_callbacks(self): self.assertEqual(self._kwargs["data"], b"\x03\x04") -class TestNMT(unittest.TestCase): - """ - Test NMT slave. - """ - - @classmethod - def setUpClass(cls): - cls.network1 = canopen.Network() - cls.network1.connect("test", interface="virtual") - cls.remote_node = cls.network1.add_node(2, EDS_PATH) - - cls.network2 = canopen.Network() - cls.network2.connect("test", interface="virtual") - cls.local_node = cls.network2.create_node(2, EDS_PATH) - - cls.remote_node2 = cls.network1.add_node(3, EDS_PATH) - - cls.local_node2 = cls.network2.create_node(3, EDS_PATH) - - @classmethod - def tearDownClass(cls): - cls.network1.disconnect() - cls.network2.disconnect() - - def test_start_two_remote_nodes(self): - self.remote_node.nmt.state = 'OPERATIONAL' - # Line below is just so that we are sure the client have received the command - # before we do the check - time.sleep(0.1) - slave_state = self.local_node.nmt.state - self.assertEqual(slave_state, 'OPERATIONAL') - - self.remote_node2.nmt.state = 'OPERATIONAL' - # Line below is just so that we are sure the client have received the command - # before we do the check - time.sleep(0.1) - slave_state = self.local_node2.nmt.state - self.assertEqual(slave_state, 'OPERATIONAL') - - def test_stop_two_remote_nodes_using_broadcast(self): - # This is a NMT broadcast "Stop remote node" - # ie. set the node in STOPPED state - self.network1.send_message(0, [2, 0]) - - # Line below is just so that we are sure the slaves have received the command - # before we do the check - time.sleep(0.1) - slave_state = self.local_node.nmt.state - self.assertEqual(slave_state, 'STOPPED') - slave_state = self.local_node2.nmt.state - self.assertEqual(slave_state, 'STOPPED') - - def test_heartbeat(self): - # self.assertEqual(self.remote_node.nmt.state, 'INITIALISING') - # self.assertEqual(self.local_node.nmt.state, 'INITIALISING') - self.local_node.nmt.state = 'OPERATIONAL' - self.local_node.sdo[0x1017].raw = 100 - time.sleep(0.2) - self.assertEqual(self.remote_node.nmt.state, 'OPERATIONAL') - - self.local_node.nmt.stop_heartbeat() - class TestPDO(unittest.TestCase): """ Test PDO slave. @@ -243,11 +177,11 @@ class TestPDO(unittest.TestCase): def setUpClass(cls): cls.network1 = canopen.Network() cls.network1.connect("test", interface="virtual") - cls.remote_node = cls.network1.add_node(2, EDS_PATH) + cls.remote_node = cls.network1.add_node(2, SAMPLE_EDS) cls.network2 = canopen.Network() cls.network2.connect("test", interface="virtual") - cls.local_node = cls.network2.create_node(2, EDS_PATH) + cls.local_node = cls.network2.create_node(2, SAMPLE_EDS) @classmethod def tearDownClass(cls): diff --git a/test/test_network.py b/test/test_network.py index 03cbe61b..ed440326 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -1,18 +1,16 @@ import time -import os import unittest -import canopen +import canopen import can - -EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') +from .util import SAMPLE_EDS class TestNetwork(unittest.TestCase): def setUp(self): network = canopen.Network() - network.add_node(2, EDS_PATH) + network.add_node(2, SAMPLE_EDS) network.add_node(3, network[2].object_dictionary) self.network = network diff --git a/test/test_nmt.py b/test/test_nmt.py new file mode 100644 index 00000000..50db1187 --- /dev/null +++ b/test/test_nmt.py @@ -0,0 +1,64 @@ +import time +import unittest + +import canopen +from .util import SAMPLE_EDS + + +class TestNmtSlave(unittest.TestCase): + def setUp(self): + self.network1 = canopen.Network() + self.network1.connect("test", interface="virtual") + self.remote_node = self.network1.add_node(2, SAMPLE_EDS) + + self.network2 = canopen.Network() + self.network2.connect("test", interface="virtual") + self.local_node = self.network2.create_node(2, SAMPLE_EDS) + self.remote_node2 = self.network1.add_node(3, SAMPLE_EDS) + self.local_node2 = self.network2.create_node(3, SAMPLE_EDS) + + def tearDown(self): + self.network1.disconnect() + self.network2.disconnect() + + def test_start_two_remote_nodes(self): + self.remote_node.nmt.state = "OPERATIONAL" + # Line below is just so that we are sure the client have received the command + # before we do the check + time.sleep(0.1) + slave_state = self.local_node.nmt.state + self.assertEqual(slave_state, "OPERATIONAL") + + self.remote_node2.nmt.state = "OPERATIONAL" + # Line below is just so that we are sure the client have received the command + # before we do the check + time.sleep(0.1) + slave_state = self.local_node2.nmt.state + self.assertEqual(slave_state, "OPERATIONAL") + + def test_stop_two_remote_nodes_using_broadcast(self): + # This is a NMT broadcast "Stop remote node" + # ie. set the node in STOPPED state + self.network1.send_message(0, [2, 0]) + + # Line below is just so that we are sure the slaves have received the command + # before we do the check + time.sleep(0.1) + slave_state = self.local_node.nmt.state + self.assertEqual(slave_state, "STOPPED") + slave_state = self.local_node2.nmt.state + self.assertEqual(slave_state, "STOPPED") + + def test_heartbeat(self): + self.assertEqual(self.remote_node.nmt.state, "INITIALISING") + self.assertEqual(self.local_node.nmt.state, "INITIALISING") + self.local_node.nmt.state = "OPERATIONAL" + self.local_node.sdo[0x1017].raw = 100 + time.sleep(0.2) + self.assertEqual(self.remote_node.nmt.state, "OPERATIONAL") + + self.local_node.nmt.stop_heartbeat() + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_pdo.py b/test/test_pdo.py index 32963cf3..7e9947f1 100644 --- a/test/test_pdo.py +++ b/test/test_pdo.py @@ -1,14 +1,13 @@ -import os.path import unittest -import canopen -EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') +import canopen +from .util import SAMPLE_EDS class TestPDO(unittest.TestCase): def test_bit_mapping(self): - node = canopen.Node(1, EDS_PATH) + node = canopen.Node(1, SAMPLE_EDS) map = node.pdo.tx[1] map.add_variable('INTEGER16 value') # 0x2001 map.add_variable('UNSIGNED8 value', length=4) # 0x2002 @@ -56,7 +55,7 @@ def test_bit_mapping(self): self.assertEqual(node.pdo[0x1600][0x2002].raw, 0xf) def test_save_pdo(self): - node = canopen.Node(1, EDS_PATH) + node = canopen.Node(1, SAMPLE_EDS) node.tpdo.save() node.rpdo.save() diff --git a/test/test_sdo.py b/test/test_sdo.py index 1ec35dab..f399a33f 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -1,12 +1,10 @@ -import os import unittest -# import binascii + import canopen -from canopen.objectdictionary import ODVariable import canopen.objectdictionary.datatypes as dt +from canopen.objectdictionary import ODVariable +from .util import SAMPLE_EDS, DATATYPES_EDS -EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') -DATAEDS_PATH = os.path.join(os.path.dirname(__file__), 'datatypes.eds') TX = 1 RX = 2 @@ -26,17 +24,15 @@ def _send_message(self, can_id, data, remote=False): """ next_data = self.data.pop(0) self.assertEqual(next_data[0], TX, "No transmission was expected") - # print(f"> {binascii.hexlify(data)} ({binascii.hexlify(next_data[1])})") self.assertSequenceEqual(data, next_data[1]) self.assertEqual(can_id, 0x602) while self.data and self.data[0][0] == RX: - # print(f"< {binascii.hexlify(self.data[0][1])}") self.network.notify(0x582, self.data.pop(0)[1], 0.0) def setUp(self): network = canopen.Network() network.send_message = self._send_message - node = network.add_node(2, EDS_PATH) + node = network.add_node(2, SAMPLE_EDS) node.sdo.RESPONSE_TIMEOUT = 0.01 self.network = network @@ -178,17 +174,15 @@ def _send_message(self, can_id, data, remote=False): """ next_data = self.data.pop(0) self.assertEqual(next_data[0], TX, "No transmission was expected") - # print("> %s (%s)" % (binascii.hexlify(data), binascii.hexlify(next_data[1]))) self.assertSequenceEqual(data, next_data[1]) self.assertEqual(can_id, 0x602) while self.data and self.data[0][0] == RX: - # print("< %s" % binascii.hexlify(self.data[0][1])) self.network.notify(0x582, self.data.pop(0)[1], 0.0) def setUp(self): network = canopen.Network() network.send_message = self._send_message - node = network.add_node(2, DATAEDS_PATH) + node = network.add_node(2, DATATYPES_EDS) node.sdo.RESPONSE_TIMEOUT = 0.01 self.node = node self.network = network diff --git a/test/util.py b/test/util.py new file mode 100644 index 00000000..a4395921 --- /dev/null +++ b/test/util.py @@ -0,0 +1,5 @@ +import os + + +DATATYPES_EDS = os.path.join(os.path.dirname(__file__), "datatypes.eds") +SAMPLE_EDS = os.path.join(os.path.dirname(__file__), "sample.eds")