diff --git a/requirements_test.txt b/requirements_test.txt index 82c2e0e..85c2944 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -2,4 +2,5 @@ pytest>=7.3.1 pytest-asyncio>=0.21.0 pytest-timeout>=2.1.0 pytest-mock>=3.10.0 -pytest-cov>=4.1.0 \ No newline at end of file +pytest-cov>=4.1.0 +flake8==5.0.4 \ No newline at end of file diff --git a/tests/test_uart.py b/tests/test_uart.py index 7dea5c2..2bc35b5 100644 --- a/tests/test_uart.py +++ b/tests/test_uart.py @@ -56,7 +56,9 @@ def create_serial_conn(loop, protocol_factory, url, *args, **kwargs): return fut - mocker.patch("serial_asyncio.create_serial_connection", new=create_serial_conn) + mocker.patch( + "serial_asyncio.create_serial_connection", new=create_serial_conn + ) return device, serial_interface @@ -67,7 +69,9 @@ def test_uart_rx_basic(connected_uart): test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10) test_frame = test_command.to_frame() test_frame = ll_checksum(test_frame) - test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize() + test_frame_bytes = Frame( + test_frame.ll_header, test_frame.hl_packet + ).serialize() uart.data_received(test_frame_bytes) @@ -87,7 +91,9 @@ def test_uart_rx_byte_by_byte(connected_uart): test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10) test_frame = test_command.to_frame() test_frame = ll_checksum(test_frame) - test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize() + test_frame_bytes = Frame( + test_frame.ll_header, test_frame.hl_packet + ).serialize() for byte in test_frame_bytes: uart.data_received(bytes([byte])) @@ -101,7 +107,9 @@ def test_uart_rx_byte_by_byte_garbage(connected_uart): test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10) test_frame = test_command.to_frame() test_frame = ll_checksum(test_frame) - test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize() + test_frame_bytes = Frame( + test_frame.ll_header, test_frame.hl_packet + ).serialize() data = b"" data += bytes.fromhex("58 4a 72 35 51 da 60 ed 1f") @@ -125,7 +133,9 @@ def test_uart_rx_big_garbage(connected_uart): test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10) test_frame = test_command.to_frame() test_frame = ll_checksum(test_frame) - test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize() + test_frame_bytes = Frame( + test_frame.ll_header, test_frame.hl_packet + ).serialize() data = b"" data += bytes.fromhex("58 4a 72 35 51 da 60 ed 1f") @@ -148,7 +158,9 @@ def test_uart_rx_corrupted_fcs(connected_uart): test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10) test_frame = test_command.to_frame() test_frame = ll_checksum(test_frame) - test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize() + test_frame_bytes = Frame( + test_frame.ll_header, test_frame.hl_packet + ).serialize() # Almost, but not quite uart.data_received(test_frame_bytes[:-1]) @@ -163,12 +175,18 @@ def test_uart_rx_sof_stress(connected_uart): test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10) test_frame = test_command.to_frame() test_frame = ll_checksum(test_frame) - test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize() + test_frame_bytes = Frame( + test_frame.ll_header, test_frame.hl_packet + ).serialize() # We include an almost-valid frame and many stray SoF markers - uart.data_received(b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes[:-1] + b"\x00") + uart.data_received( + b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes[:-1] + b"\x00" + ) uart.data_received(b"\xFE\xFE\x00\xFE\x01") - uart.data_received(b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes + b"\x00\x00") + uart.data_received( + b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes + b"\x00\x00" + ) # We should see the valid frame exactly once znp.frame_received.assert_called_once_with(test_frame) @@ -181,14 +199,18 @@ def test_uart_frame_received_error(connected_uart, mocker): test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10) test_frame = test_command.to_frame() test_frame = ll_checksum(test_frame) - test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize() + test_frame_bytes = Frame( + test_frame.ll_header, test_frame.hl_packet + ).serialize() - # Errors thrown by znp.frame_received should not impact how many frames are handled + # Errors thrown by znp.frame_received should + # not impact how many frames are handled uart.data_received(test_frame_bytes * 3) # We should have received all three frames assert znp.frame_received.call_count == 3 + @pytest.mark.asyncio async def test_connection_lost(dummy_serial_conn, mocker, event_loop): device, _ = dummy_serial_conn @@ -207,11 +229,14 @@ async def test_connection_lost(dummy_serial_conn, mocker, event_loop): # Losing a connection propagates up to the ZNP object assert (await conn_lost_fut) == exception + @pytest.mark.asyncio async def test_connection_made(dummy_serial_conn, mocker): device, _ = dummy_serial_conn znp = mocker.Mock() - await znp_uart.connect(conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp) + await znp_uart.connect( + conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp + ) - znp.connection_made.assert_called_once_with() \ No newline at end of file + znp.connection_made.assert_called_once_with()