Skip to content

Commit

Permalink
refactor filter parsing, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zariiii9003 committed Jun 24, 2024
1 parent 2c90f9f commit 85910bc
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 81 deletions.
75 changes: 51 additions & 24 deletions can/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,26 @@
import re
import sys
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Sequence, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Sequence,
Tuple,
Union,
)

import can
from can import Bus, BusState, Logger, SizedRotatingLogger
from can.typechecking import TAdditionalCliArgs
from can.util import cast_from_string

from . import Bus, BusState, Logger, SizedRotatingLogger
from .typechecking import CanFilter, CanFilters

if TYPE_CHECKING:
from can.io import BaseRotatingLogger
from can.io.generic import MessageWriter
from can.typechecking import CanFilter


def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -60,10 +69,7 @@ def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None:


def _append_filter_argument(
parser: Union[
argparse.ArgumentParser,
argparse._ArgumentGroup,
],
parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup],
*args: str,
**kwargs: Any,
) -> None:
Expand All @@ -78,16 +84,17 @@ def _append_filter_argument(
"\n <can_id>~<can_mask> (matches when <received_can_id> & mask !="
" can_id & mask)"
"\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:"
"\n python -m can.viewer -f 100:7FC 200:7F0"
"\n python -m can.viewer --filter 100:7FC 200:7F0"
"\nNote that the ID and mask are always interpreted as hex values",
metavar="{<can_id>:<can_mask>,<can_id>~<can_mask>}",
nargs=argparse.ONE_OR_MORE,
default="",
action=_CanFilterAction,
dest="can_filters",
**kwargs,
)


def _create_bus(parsed_args: Any, **kwargs: Any) -> can.BusABC:
def _create_bus(parsed_args: argparse.Namespace, **kwargs: Any) -> can.BusABC:
logging_level_names = ["critical", "error", "warning", "info", "debug", "subdebug"]
can.set_logging_level(logging_level_names[min(5, parsed_args.verbosity)])

Expand All @@ -100,16 +107,27 @@ def _create_bus(parsed_args: Any, **kwargs: Any) -> can.BusABC:
config["fd"] = True
if parsed_args.data_bitrate:
config["data_bitrate"] = parsed_args.data_bitrate
if getattr(parsed_args, "can_filters", None):
config["can_filters"] = parsed_args.can_filters

return Bus(parsed_args.channel, **config)


def _parse_filters(parsed_args: Any) -> CanFilters:
can_filters: List[CanFilter] = []
class _CanFilterAction(argparse.Action):
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None],
option_string: Optional[str] = None,
) -> None:
if not isinstance(values, list):
raise argparse.ArgumentError(None, "Invalid filter argument")

print(f"Adding filter(s): {values}")
can_filters: List[CanFilter] = []

if parsed_args.filter:
print(f"Adding filter(s): {parsed_args.filter}")
for filt in parsed_args.filter:
for filt in values:
if ":" in filt:
parts = filt.split(":")
can_id = int(parts[0], base=16)
Expand All @@ -122,12 +140,10 @@ def _parse_filters(parsed_args: Any) -> CanFilters:
raise argparse.ArgumentError(None, "Invalid filter argument")
can_filters.append({"can_id": can_id, "can_mask": can_mask})

return can_filters
setattr(namespace, self.dest, can_filters)


def _parse_additional_config(
unknown_args: Sequence[str],
) -> Dict[str, Union[str, int, float, bool]]:
def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs:
for arg in unknown_args:
if not re.match(r"^--[a-zA-Z\-]*?=\S*?$", arg):
raise ValueError(f"Parsing argument {arg} failed")
Expand All @@ -142,12 +158,18 @@ def _split_arg(_arg: str) -> Tuple[str, str]:
return args


def main() -> None:
def _parse_logger_args(
args: List[str],
) -> Tuple[argparse.Namespace, TAdditionalCliArgs]:
"""Parse command line arguments for logger script."""

parser = argparse.ArgumentParser(
description="Log CAN traffic, printing messages to stdout or to a "
"given file.",
)

# Generate the standard arguments:
# Channel, bitrate, data_bitrate, interface, app_name, CAN-FD support
_create_base_argument_parser(parser)

parser.add_argument(
Expand Down Expand Up @@ -200,13 +222,18 @@ def main() -> None:
)

# print help message when no arguments were given
if len(sys.argv) < 2:
if not args:
parser.print_help(sys.stderr)
raise SystemExit(errno.EINVAL)

results, unknown_args = parser.parse_known_args()
results, unknown_args = parser.parse_known_args(args)
additional_config = _parse_additional_config([*results.extra_args, *unknown_args])
bus = _create_bus(results, can_filters=_parse_filters(results), **additional_config)
return results, additional_config


def main() -> None:
results, additional_config = _parse_logger_args(sys.argv[1:])
bus = _create_bus(results, **additional_config)

if results.active:
bus.state = BusState.ACTIVE
Expand Down
15 changes: 15 additions & 0 deletions can/typechecking.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@
"""

import gzip
import struct
import sys
import typing

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias


if typing.TYPE_CHECKING:
import os

Expand Down Expand Up @@ -40,6 +48,13 @@ class CanFilterExtended(typing.TypedDict):

BusConfig = typing.NewType("BusConfig", typing.Dict[str, typing.Any])

# Used by CLI scripts
TAdditionalCliArgs: TypeAlias = typing.Dict[str, typing.Union[str, int, float, bool]]
TDataStructs: TypeAlias = typing.Dict[
typing.Union[int, typing.Tuple[int, ...]],
typing.Union[struct.Struct, typing.Tuple, None],
]


class AutoDetectedConfig(typing.TypedDict):
interface: str
Expand Down
25 changes: 9 additions & 16 deletions can/viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@
import struct
import sys
import time
from typing import Dict, List, Tuple, Union
from typing import Dict, List, Tuple

from can import __version__

from .logger import (
from can.logger import (
_append_filter_argument,
_create_base_argument_parser,
_create_bus,
_parse_additional_config,
_parse_filters,
)
from can.typechecking import TAdditionalCliArgs, TDataStructs

logger = logging.getLogger("can.viewer")

Expand Down Expand Up @@ -391,7 +390,9 @@ def _fill_text(self, text, width, indent):
return super()._fill_text(text, width, indent)


def parse_args(args: List[str]) -> Tuple:
def _parse_viewer_args(
args: List[str],
) -> Tuple[argparse.Namespace, TDataStructs, TAdditionalCliArgs]:
# Parse command line arguments
parser = argparse.ArgumentParser(
"python -m can.viewer",
Expand Down Expand Up @@ -489,8 +490,6 @@ def parse_args(args: List[str]) -> Tuple:

parsed_args, unknown_args = parser.parse_known_args(args)

can_filters = _parse_filters(parsed_args)

# Dictionary used to convert between Python values and C structs represented as Python strings.
# If the value is 'None' then the message does not contain any data package.
#
Expand All @@ -511,9 +510,7 @@ def parse_args(args: List[str]) -> Tuple:
# similarly the values
# are divided by the value in order to convert from real units to raw integer values.

data_structs: Dict[
Union[int, Tuple[int, ...]], Union[struct.Struct, Tuple, None]
] = {}
data_structs: TDataStructs = {}
if parsed_args.decode:
if os.path.isfile(parsed_args.decode[0]):
with open(parsed_args.decode[0], encoding="utf-8") as f:
Expand Down Expand Up @@ -544,16 +541,12 @@ def parse_args(args: List[str]) -> Tuple:
additional_config = _parse_additional_config(
[*parsed_args.extra_args, *unknown_args]
)
return parsed_args, can_filters, data_structs, additional_config
return parsed_args, data_structs, additional_config


def main() -> None:
parsed_args, can_filters, data_structs, additional_config = parse_args(sys.argv[1:])

if can_filters:
additional_config.update({"can_filters": can_filters})
parsed_args, data_structs, additional_config = _parse_viewer_args(sys.argv[1:])
bus = _create_bus(parsed_args, **additional_config)

curses.wrapper(CanViewer, bus, data_structs) # type: ignore[attr-defined,unused-ignore]


Expand Down
35 changes: 33 additions & 2 deletions test/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import can
import can.logger

from .config import *


class TestLoggerScriptModule(unittest.TestCase):
def setUp(self) -> None:
Expand Down Expand Up @@ -108,6 +106,39 @@ def test_log_virtual_sizedlogger(self):
self.assertSuccessfullCleanup()
self.mock_logger_sized.assert_called_once()

def test_parse_logger_args(self):
args = self.baseargs + [
"--bitrate",
"250000",
"--fd",
"--data_bitrate",
"2000000",
"--receive-own-messages=True",
]
results, additional_config = can.logger._parse_logger_args(args[1:])
assert results.interface == "virtual"
assert results.bitrate == 250_000
assert results.fd is True
assert results.data_bitrate == 2_000_000
assert additional_config["receive_own_messages"] is True

def test_parse_can_filters(self):
expected_can_filters = [{"can_id": 0x100, "can_mask": 0x7FC}]
results, additional_config = can.logger._parse_logger_args(
["--filter", "100:7FC", "--bitrate", "250000"]
)
assert results.can_filters == expected_can_filters

def test_parse_can_filters_list(self):
expected_can_filters = [
{"can_id": 0x100, "can_mask": 0x7FC},
{"can_id": 0x200, "can_mask": 0x7F0},
]
results, additional_config = can.logger._parse_logger_args(
["--filter", "100:7FC", "200:7F0", "--bitrate", "250000"]
)
assert results.can_filters == expected_can_filters

def test_parse_additional_config(self):
unknown_args = [
"--app-name=CANalyzer",
Expand Down
Loading

0 comments on commit 85910bc

Please sign in to comment.