Skip to content

Commit

Permalink
Tests: establish test/util.py and test/test_nmt.py (#503)
Browse files Browse the repository at this point in the history
Give NMT tests their own file, and establish a util.py for the test
suite for common stuff like the location of sample EDS files.

Consistently use SAMPLE_EDS instead of EDS_PATH.
  • Loading branch information
erlend-aasland committed Jul 8, 2024
1 parent c413eaf commit 08eba81
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 100 deletions.
Empty file added test/__init__.py
Empty file.
6 changes: 2 additions & 4 deletions test/test_eds.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import os
import unittest

import canopen
from canopen.objectdictionary.eds import _signed_int_from_hex
from canopen.utils import pretty_index


SAMPLE_EDS = os.path.join(os.path.dirname(__file__), 'sample.eds')
DATATYPES_EDS = os.path.join(os.path.dirname(__file__), 'datatypes.eds')
from .util import SAMPLE_EDS, DATATYPES_EDS


class TestEDS(unittest.TestCase):
Expand Down
84 changes: 9 additions & 75 deletions test/test_local.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import os
import unittest
import canopen
import logging
import time
import unittest

# logging.basicConfig(level=logging.DEBUG)

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
import canopen
from .util import SAMPLE_EDS


class TestSDO(unittest.TestCase):
Expand All @@ -18,15 +14,15 @@ class TestSDO(unittest.TestCase):
def setUpClass(cls):
cls.network1 = canopen.Network()
cls.network1.connect("test", interface="virtual")
cls.remote_node = cls.network1.add_node(2, EDS_PATH)
cls.remote_node = cls.network1.add_node(2, SAMPLE_EDS)

cls.network2 = canopen.Network()
cls.network2.connect("test", interface="virtual")
cls.local_node = cls.network2.create_node(2, EDS_PATH)
cls.local_node = cls.network2.create_node(2, SAMPLE_EDS)

cls.remote_node2 = cls.network1.add_node(3, EDS_PATH)
cls.remote_node2 = cls.network1.add_node(3, SAMPLE_EDS)

cls.local_node2 = cls.network2.create_node(3, EDS_PATH)
cls.local_node2 = cls.network2.create_node(3, SAMPLE_EDS)

@classmethod
def tearDownClass(cls):
Expand Down Expand Up @@ -172,68 +168,6 @@ def test_callbacks(self):
self.assertEqual(self._kwargs["data"], b"\x03\x04")


class TestNMT(unittest.TestCase):
"""
Test NMT slave.
"""

@classmethod
def setUpClass(cls):
cls.network1 = canopen.Network()
cls.network1.connect("test", interface="virtual")
cls.remote_node = cls.network1.add_node(2, EDS_PATH)

cls.network2 = canopen.Network()
cls.network2.connect("test", interface="virtual")
cls.local_node = cls.network2.create_node(2, EDS_PATH)

cls.remote_node2 = cls.network1.add_node(3, EDS_PATH)

cls.local_node2 = cls.network2.create_node(3, EDS_PATH)

@classmethod
def tearDownClass(cls):
cls.network1.disconnect()
cls.network2.disconnect()

def test_start_two_remote_nodes(self):
self.remote_node.nmt.state = 'OPERATIONAL'
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, 'OPERATIONAL')

self.remote_node2.nmt.state = 'OPERATIONAL'
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, 'OPERATIONAL')

def test_stop_two_remote_nodes_using_broadcast(self):
# This is a NMT broadcast "Stop remote node"
# ie. set the node in STOPPED state
self.network1.send_message(0, [2, 0])

# Line below is just so that we are sure the slaves have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, 'STOPPED')
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, 'STOPPED')

def test_heartbeat(self):
# self.assertEqual(self.remote_node.nmt.state, 'INITIALISING')
# self.assertEqual(self.local_node.nmt.state, 'INITIALISING')
self.local_node.nmt.state = 'OPERATIONAL'
self.local_node.sdo[0x1017].raw = 100
time.sleep(0.2)
self.assertEqual(self.remote_node.nmt.state, 'OPERATIONAL')

self.local_node.nmt.stop_heartbeat()

class TestPDO(unittest.TestCase):
"""
Test PDO slave.
Expand All @@ -243,11 +177,11 @@ class TestPDO(unittest.TestCase):
def setUpClass(cls):
cls.network1 = canopen.Network()
cls.network1.connect("test", interface="virtual")
cls.remote_node = cls.network1.add_node(2, EDS_PATH)
cls.remote_node = cls.network1.add_node(2, SAMPLE_EDS)

cls.network2 = canopen.Network()
cls.network2.connect("test", interface="virtual")
cls.local_node = cls.network2.create_node(2, EDS_PATH)
cls.local_node = cls.network2.create_node(2, SAMPLE_EDS)

@classmethod
def tearDownClass(cls):
Expand Down
8 changes: 3 additions & 5 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import time
import os
import unittest
import canopen

import canopen
import can

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
from .util import SAMPLE_EDS


class TestNetwork(unittest.TestCase):

def setUp(self):
network = canopen.Network()
network.add_node(2, EDS_PATH)
network.add_node(2, SAMPLE_EDS)
network.add_node(3, network[2].object_dictionary)
self.network = network

Expand Down
64 changes: 64 additions & 0 deletions test/test_nmt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import time
import unittest

import canopen
from .util import SAMPLE_EDS


class TestNmtSlave(unittest.TestCase):
def setUp(self):
self.network1 = canopen.Network()
self.network1.connect("test", interface="virtual")
self.remote_node = self.network1.add_node(2, SAMPLE_EDS)

self.network2 = canopen.Network()
self.network2.connect("test", interface="virtual")
self.local_node = self.network2.create_node(2, SAMPLE_EDS)
self.remote_node2 = self.network1.add_node(3, SAMPLE_EDS)
self.local_node2 = self.network2.create_node(3, SAMPLE_EDS)

def tearDown(self):
self.network1.disconnect()
self.network2.disconnect()

def test_start_two_remote_nodes(self):
self.remote_node.nmt.state = "OPERATIONAL"
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, "OPERATIONAL")

self.remote_node2.nmt.state = "OPERATIONAL"
# Line below is just so that we are sure the client have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, "OPERATIONAL")

def test_stop_two_remote_nodes_using_broadcast(self):
# This is a NMT broadcast "Stop remote node"
# ie. set the node in STOPPED state
self.network1.send_message(0, [2, 0])

# Line below is just so that we are sure the slaves have received the command
# before we do the check
time.sleep(0.1)
slave_state = self.local_node.nmt.state
self.assertEqual(slave_state, "STOPPED")
slave_state = self.local_node2.nmt.state
self.assertEqual(slave_state, "STOPPED")

def test_heartbeat(self):
self.assertEqual(self.remote_node.nmt.state, "INITIALISING")
self.assertEqual(self.local_node.nmt.state, "INITIALISING")
self.local_node.nmt.state = "OPERATIONAL"
self.local_node.sdo[0x1017].raw = 100
time.sleep(0.2)
self.assertEqual(self.remote_node.nmt.state, "OPERATIONAL")

self.local_node.nmt.stop_heartbeat()


if __name__ == "__main__":
unittest.main()
9 changes: 4 additions & 5 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import os.path
import unittest
import canopen

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
import canopen
from .util import SAMPLE_EDS


class TestPDO(unittest.TestCase):

def test_bit_mapping(self):
node = canopen.Node(1, EDS_PATH)
node = canopen.Node(1, SAMPLE_EDS)
map = node.pdo.tx[1]
map.add_variable('INTEGER16 value') # 0x2001
map.add_variable('UNSIGNED8 value', length=4) # 0x2002
Expand Down Expand Up @@ -56,7 +55,7 @@ def test_bit_mapping(self):
self.assertEqual(node.pdo[0x1600][0x2002].raw, 0xf)

def test_save_pdo(self):
node = canopen.Node(1, EDS_PATH)
node = canopen.Node(1, SAMPLE_EDS)
node.tpdo.save()
node.rpdo.save()

Expand Down
16 changes: 5 additions & 11 deletions test/test_sdo.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import os
import unittest
# import binascii

import canopen
from canopen.objectdictionary import ODVariable
import canopen.objectdictionary.datatypes as dt
from canopen.objectdictionary import ODVariable
from .util import SAMPLE_EDS, DATATYPES_EDS

EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds')
DATAEDS_PATH = os.path.join(os.path.dirname(__file__), 'datatypes.eds')

TX = 1
RX = 2
Expand All @@ -26,17 +24,15 @@ def _send_message(self, can_id, data, remote=False):
"""
next_data = self.data.pop(0)
self.assertEqual(next_data[0], TX, "No transmission was expected")
# print(f"> {binascii.hexlify(data)} ({binascii.hexlify(next_data[1])})")
self.assertSequenceEqual(data, next_data[1])
self.assertEqual(can_id, 0x602)
while self.data and self.data[0][0] == RX:
# print(f"< {binascii.hexlify(self.data[0][1])}")
self.network.notify(0x582, self.data.pop(0)[1], 0.0)

def setUp(self):
network = canopen.Network()
network.send_message = self._send_message
node = network.add_node(2, EDS_PATH)
node = network.add_node(2, SAMPLE_EDS)
node.sdo.RESPONSE_TIMEOUT = 0.01
self.network = network

Expand Down Expand Up @@ -178,17 +174,15 @@ def _send_message(self, can_id, data, remote=False):
"""
next_data = self.data.pop(0)
self.assertEqual(next_data[0], TX, "No transmission was expected")
# print("> %s (%s)" % (binascii.hexlify(data), binascii.hexlify(next_data[1])))
self.assertSequenceEqual(data, next_data[1])
self.assertEqual(can_id, 0x602)
while self.data and self.data[0][0] == RX:
# print("< %s" % binascii.hexlify(self.data[0][1]))
self.network.notify(0x582, self.data.pop(0)[1], 0.0)

def setUp(self):
network = canopen.Network()
network.send_message = self._send_message
node = network.add_node(2, DATAEDS_PATH)
node = network.add_node(2, DATATYPES_EDS)
node.sdo.RESPONSE_TIMEOUT = 0.01
self.node = node
self.network = network
Expand Down
5 changes: 5 additions & 0 deletions test/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import os


DATATYPES_EDS = os.path.join(os.path.dirname(__file__), "datatypes.eds")
SAMPLE_EDS = os.path.join(os.path.dirname(__file__), "sample.eds")

0 comments on commit 08eba81

Please sign in to comment.