Skip to content

Commit

Permalink
add flake8
Browse files Browse the repository at this point in the history
  • Loading branch information
genesiscrew committed May 10, 2024
1 parent a339789 commit 3f629f1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
3 changes: 2 additions & 1 deletion requirements_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pytest>=7.3.1
pytest-asyncio>=0.21.0
pytest-timeout>=2.1.0
pytest-mock>=3.10.0
pytest-cov>=4.1.0
pytest-cov>=4.1.0
flake8==5.0.4
51 changes: 38 additions & 13 deletions tests/test_uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def create_serial_conn(loop, protocol_factory, url, *args, **kwargs):

return fut

mocker.patch("serial_asyncio.create_serial_connection", new=create_serial_conn)
mocker.patch(
"serial_asyncio.create_serial_connection", new=create_serial_conn
)

return device, serial_interface

Expand All @@ -67,7 +69,9 @@ def test_uart_rx_basic(connected_uart):
test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10)
test_frame = test_command.to_frame()
test_frame = ll_checksum(test_frame)
test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize()
test_frame_bytes = Frame(
test_frame.ll_header, test_frame.hl_packet
).serialize()

uart.data_received(test_frame_bytes)

Expand All @@ -87,7 +91,9 @@ def test_uart_rx_byte_by_byte(connected_uart):
test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10)
test_frame = test_command.to_frame()
test_frame = ll_checksum(test_frame)
test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize()
test_frame_bytes = Frame(
test_frame.ll_header, test_frame.hl_packet
).serialize()

for byte in test_frame_bytes:
uart.data_received(bytes([byte]))
Expand All @@ -101,7 +107,9 @@ def test_uart_rx_byte_by_byte_garbage(connected_uart):
test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10)
test_frame = test_command.to_frame()
test_frame = ll_checksum(test_frame)
test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize()
test_frame_bytes = Frame(
test_frame.ll_header, test_frame.hl_packet
).serialize()

data = b""
data += bytes.fromhex("58 4a 72 35 51 da 60 ed 1f")
Expand All @@ -125,7 +133,9 @@ def test_uart_rx_big_garbage(connected_uart):
test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10)
test_frame = test_command.to_frame()
test_frame = ll_checksum(test_frame)
test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize()
test_frame_bytes = Frame(
test_frame.ll_header, test_frame.hl_packet
).serialize()

data = b""
data += bytes.fromhex("58 4a 72 35 51 da 60 ed 1f")
Expand All @@ -148,7 +158,9 @@ def test_uart_rx_corrupted_fcs(connected_uart):
test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10)
test_frame = test_command.to_frame()
test_frame = ll_checksum(test_frame)
test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize()
test_frame_bytes = Frame(
test_frame.ll_header, test_frame.hl_packet
).serialize()

# Almost, but not quite
uart.data_received(test_frame_bytes[:-1])
Expand All @@ -163,12 +175,18 @@ def test_uart_rx_sof_stress(connected_uart):
test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10)
test_frame = test_command.to_frame()
test_frame = ll_checksum(test_frame)
test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize()
test_frame_bytes = Frame(
test_frame.ll_header, test_frame.hl_packet
).serialize()

# We include an almost-valid frame and many stray SoF markers
uart.data_received(b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes[:-1] + b"\x00")
uart.data_received(
b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes[:-1] + b"\x00"
)
uart.data_received(b"\xFE\xFE\x00\xFE\x01")
uart.data_received(b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes + b"\x00\x00")
uart.data_received(
b"\xFE" + b"\xFE" + b"\xFE" + test_frame_bytes + b"\x00\x00"
)

# We should see the valid frame exactly once
znp.frame_received.assert_called_once_with(test_frame)
Expand All @@ -181,14 +199,18 @@ def test_uart_frame_received_error(connected_uart, mocker):
test_command = c.NcpConfig.GetModuleVersion.Req(TSN=10)
test_frame = test_command.to_frame()
test_frame = ll_checksum(test_frame)
test_frame_bytes = Frame(test_frame.ll_header, test_frame.hl_packet).serialize()
test_frame_bytes = Frame(
test_frame.ll_header, test_frame.hl_packet
).serialize()

# Errors thrown by znp.frame_received should not impact how many frames are handled
# Errors thrown by znp.frame_received should
# not impact how many frames are handled
uart.data_received(test_frame_bytes * 3)

# We should have received all three frames
assert znp.frame_received.call_count == 3


@pytest.mark.asyncio
async def test_connection_lost(dummy_serial_conn, mocker, event_loop):
device, _ = dummy_serial_conn
Expand All @@ -207,11 +229,14 @@ async def test_connection_lost(dummy_serial_conn, mocker, event_loop):
# Losing a connection propagates up to the ZNP object
assert (await conn_lost_fut) == exception


@pytest.mark.asyncio
async def test_connection_made(dummy_serial_conn, mocker):
device, _ = dummy_serial_conn
znp = mocker.Mock()

await znp_uart.connect(conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp)
await znp_uart.connect(
conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp
)

znp.connection_made.assert_called_once_with()
znp.connection_made.assert_called_once_with()

0 comments on commit 3f629f1

Please sign in to comment.