Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CLI filter parsing, add tests #1805

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 51 additions & 24 deletions can/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,26 @@
import re
import sys
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Sequence, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Sequence,
Tuple,
Union,
)

import can
from can import Bus, BusState, Logger, SizedRotatingLogger
from can.typechecking import TAdditionalCliArgs
from can.util import cast_from_string

from . import Bus, BusState, Logger, SizedRotatingLogger
from .typechecking import CanFilter, CanFilters

if TYPE_CHECKING:
from can.io import BaseRotatingLogger
from can.io.generic import MessageWriter
from can.typechecking import CanFilter


def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -60,10 +69,7 @@ def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None:


def _append_filter_argument(
parser: Union[
argparse.ArgumentParser,
argparse._ArgumentGroup,
],
parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup],
*args: str,
**kwargs: Any,
) -> None:
Expand All @@ -78,16 +84,17 @@ def _append_filter_argument(
"\n <can_id>~<can_mask> (matches when <received_can_id> & mask !="
" can_id & mask)"
"\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:"
"\n python -m can.viewer -f 100:7FC 200:7F0"
"\n python -m can.viewer --filter 100:7FC 200:7F0"
"\nNote that the ID and mask are always interpreted as hex values",
metavar="{<can_id>:<can_mask>,<can_id>~<can_mask>}",
nargs=argparse.ONE_OR_MORE,
default="",
action=_CanFilterAction,
dest="can_filters",
**kwargs,
)


def _create_bus(parsed_args: Any, **kwargs: Any) -> can.BusABC:
def _create_bus(parsed_args: argparse.Namespace, **kwargs: Any) -> can.BusABC:
logging_level_names = ["critical", "error", "warning", "info", "debug", "subdebug"]
can.set_logging_level(logging_level_names[min(5, parsed_args.verbosity)])

Expand All @@ -100,16 +107,27 @@ def _create_bus(parsed_args: Any, **kwargs: Any) -> can.BusABC:
config["fd"] = True
if parsed_args.data_bitrate:
config["data_bitrate"] = parsed_args.data_bitrate
if getattr(parsed_args, "can_filters", None):
config["can_filters"] = parsed_args.can_filters

return Bus(parsed_args.channel, **config)


def _parse_filters(parsed_args: Any) -> CanFilters:
can_filters: List[CanFilter] = []
class _CanFilterAction(argparse.Action):
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None],
option_string: Optional[str] = None,
) -> None:
if not isinstance(values, list):
raise argparse.ArgumentError(None, "Invalid filter argument")

print(f"Adding filter(s): {values}")
can_filters: List[CanFilter] = []

if parsed_args.filter:
print(f"Adding filter(s): {parsed_args.filter}")
for filt in parsed_args.filter:
for filt in values:
if ":" in filt:
parts = filt.split(":")
can_id = int(parts[0], base=16)
Expand All @@ -122,12 +140,10 @@ def _parse_filters(parsed_args: Any) -> CanFilters:
raise argparse.ArgumentError(None, "Invalid filter argument")
can_filters.append({"can_id": can_id, "can_mask": can_mask})

return can_filters
setattr(namespace, self.dest, can_filters)


def _parse_additional_config(
unknown_args: Sequence[str],
) -> Dict[str, Union[str, int, float, bool]]:
def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs:
for arg in unknown_args:
if not re.match(r"^--[a-zA-Z\-]*?=\S*?$", arg):
raise ValueError(f"Parsing argument {arg} failed")
Expand All @@ -142,12 +158,18 @@ def _split_arg(_arg: str) -> Tuple[str, str]:
return args


def main() -> None:
def _parse_logger_args(
args: List[str],
) -> Tuple[argparse.Namespace, TAdditionalCliArgs]:
"""Parse command line arguments for logger script."""

parser = argparse.ArgumentParser(
description="Log CAN traffic, printing messages to stdout or to a "
"given file.",
)

# Generate the standard arguments:
# Channel, bitrate, data_bitrate, interface, app_name, CAN-FD support
_create_base_argument_parser(parser)

parser.add_argument(
Expand Down Expand Up @@ -200,13 +222,18 @@ def main() -> None:
)

# print help message when no arguments were given
if len(sys.argv) < 2:
if not args:
parser.print_help(sys.stderr)
raise SystemExit(errno.EINVAL)

results, unknown_args = parser.parse_known_args()
results, unknown_args = parser.parse_known_args(args)
additional_config = _parse_additional_config([*results.extra_args, *unknown_args])
bus = _create_bus(results, can_filters=_parse_filters(results), **additional_config)
return results, additional_config


def main() -> None:
results, additional_config = _parse_logger_args(sys.argv[1:])
bus = _create_bus(results, **additional_config)

if results.active:
bus.state = BusState.ACTIVE
Expand Down
15 changes: 15 additions & 0 deletions can/typechecking.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@
"""

import gzip
import struct
import sys
import typing

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias


if typing.TYPE_CHECKING:
import os

Expand Down Expand Up @@ -40,6 +48,13 @@ class CanFilterExtended(typing.TypedDict):

BusConfig = typing.NewType("BusConfig", typing.Dict[str, typing.Any])

# Used by CLI scripts
TAdditionalCliArgs: TypeAlias = typing.Dict[str, typing.Union[str, int, float, bool]]
TDataStructs: TypeAlias = typing.Dict[
typing.Union[int, typing.Tuple[int, ...]],
typing.Union[struct.Struct, typing.Tuple, None],
]


class AutoDetectedConfig(typing.TypedDict):
interface: str
Expand Down
25 changes: 9 additions & 16 deletions can/viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@
import struct
import sys
import time
from typing import Dict, List, Tuple, Union
from typing import Dict, List, Tuple

from can import __version__

from .logger import (
from can.logger import (
_append_filter_argument,
_create_base_argument_parser,
_create_bus,
_parse_additional_config,
_parse_filters,
)
from can.typechecking import TAdditionalCliArgs, TDataStructs

logger = logging.getLogger("can.viewer")

Expand Down Expand Up @@ -391,7 +390,9 @@ def _fill_text(self, text, width, indent):
return super()._fill_text(text, width, indent)


def parse_args(args: List[str]) -> Tuple:
def _parse_viewer_args(
args: List[str],
) -> Tuple[argparse.Namespace, TDataStructs, TAdditionalCliArgs]:
# Parse command line arguments
parser = argparse.ArgumentParser(
"python -m can.viewer",
Expand Down Expand Up @@ -489,8 +490,6 @@ def parse_args(args: List[str]) -> Tuple:

parsed_args, unknown_args = parser.parse_known_args(args)

can_filters = _parse_filters(parsed_args)

# Dictionary used to convert between Python values and C structs represented as Python strings.
# If the value is 'None' then the message does not contain any data package.
#
Expand All @@ -511,9 +510,7 @@ def parse_args(args: List[str]) -> Tuple:
# similarly the values
# are divided by the value in order to convert from real units to raw integer values.

data_structs: Dict[
Union[int, Tuple[int, ...]], Union[struct.Struct, Tuple, None]
] = {}
data_structs: TDataStructs = {}
if parsed_args.decode:
if os.path.isfile(parsed_args.decode[0]):
with open(parsed_args.decode[0], encoding="utf-8") as f:
Expand Down Expand Up @@ -544,16 +541,12 @@ def parse_args(args: List[str]) -> Tuple:
additional_config = _parse_additional_config(
[*parsed_args.extra_args, *unknown_args]
)
return parsed_args, can_filters, data_structs, additional_config
return parsed_args, data_structs, additional_config


def main() -> None:
parsed_args, can_filters, data_structs, additional_config = parse_args(sys.argv[1:])

if can_filters:
additional_config.update({"can_filters": can_filters})
parsed_args, data_structs, additional_config = _parse_viewer_args(sys.argv[1:])
bus = _create_bus(parsed_args, **additional_config)

curses.wrapper(CanViewer, bus, data_structs) # type: ignore[attr-defined,unused-ignore]


Expand Down
2 changes: 1 addition & 1 deletion doc/other-tools.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Other CAN bus tools
Other CAN Bus Tools
===================

In order to keep the project maintainable, the scope of the package is limited to providing common
Expand Down
4 changes: 2 additions & 2 deletions doc/scripts.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Scripts
=======
Command Line Tools
==================

The following modules are callable from ``python-can``.

Expand Down
35 changes: 33 additions & 2 deletions test/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import can
import can.logger

from .config import *


class TestLoggerScriptModule(unittest.TestCase):
def setUp(self) -> None:
Expand Down Expand Up @@ -108,6 +106,39 @@ def test_log_virtual_sizedlogger(self):
self.assertSuccessfullCleanup()
self.mock_logger_sized.assert_called_once()

def test_parse_logger_args(self):
args = self.baseargs + [
"--bitrate",
"250000",
"--fd",
"--data_bitrate",
"2000000",
"--receive-own-messages=True",
]
results, additional_config = can.logger._parse_logger_args(args[1:])
assert results.interface == "virtual"
assert results.bitrate == 250_000
assert results.fd is True
assert results.data_bitrate == 2_000_000
assert additional_config["receive_own_messages"] is True

def test_parse_can_filters(self):
expected_can_filters = [{"can_id": 0x100, "can_mask": 0x7FC}]
results, additional_config = can.logger._parse_logger_args(
["--filter", "100:7FC", "--bitrate", "250000"]
)
assert results.can_filters == expected_can_filters

def test_parse_can_filters_list(self):
expected_can_filters = [
{"can_id": 0x100, "can_mask": 0x7FC},
{"can_id": 0x200, "can_mask": 0x7F0},
]
results, additional_config = can.logger._parse_logger_args(
["--filter", "100:7FC", "200:7F0", "--bitrate", "250000"]
)
assert results.can_filters == expected_can_filters

def test_parse_additional_config(self):
unknown_args = [
"--app-name=CANalyzer",
Expand Down
Loading