From cfffceb1c8df66328aa2866090d883fc0ccbec66 Mon Sep 17 00:00:00 2001 From: Marco Zuppone Date: Fri, 16 Dec 2022 12:54:15 +0000 Subject: [PATCH 1/3] Devtest (#4) * type decorations added * initial commit * get_action_command_message introduced to decode the action or command from the audit entry * decoded action/command message added to syslog_entry * changed --forever option, fixed some bugs * decoding login about action and fraud codes amended to adhere more strictly with manuals * decoding login about action and fraud codes amended to adhere more strictly with manuals and version increased Co-authored-by: Marco Simone Zuppone --- .gitignore | 5 +- .run/All to Syslog.run.xml | 24 +++++++ README.md | 6 +- payShieldToSyslog.py | 133 ++++++++++++++++++++++++++++++++++--- 4 files changed, 154 insertions(+), 14 deletions(-) create mode 100644 .run/All to Syslog.run.xml diff --git a/.gitignore b/.gitignore index 757fee3..1ad0387 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -/.idea \ No newline at end of file +/.idea +/build/ +/dist/ +payShieldToSyslog.spec \ No newline at end of file diff --git a/.run/All to Syslog.run.xml b/.run/All to Syslog.run.xml new file mode 100644 index 0000000..e0286c0 --- /dev/null +++ b/.run/All to Syslog.run.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/README.md b/README.md index afe7a59..fd41e64 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ It requires **Python 3**. It was tested on **Python 3.10** ## Usage - usage: payShieldToSyslog.py [-h] [--port PORT] [--header HEADER] [--forever] [--decode] [--times TIMES] + usage: payShieldToSyslog.py [-h] [--port PORT] [--header HEADER] [--allentries] [--decode] [--times TIMES] [--proto {tcp,udp,tls}] [--keyfile KEYFILE] [--crtfile CRTFILE] [--syslog SYSLOG] [--syslogport SYSLOGPORT] host @@ -29,7 +29,7 @@ It requires **Python 3**. It was tested on **Python 3.10** ### Mutually exclusive parameters -**--times** and **--forever** are mutually exclusive. +**--times** and **--allentries** are mutually exclusive. ### Optional parameters @@ -47,7 +47,7 @@ It's only considered if the protocol is **tls**. **--header** the header string to prefix to the host command, if not specified the default value is **HEAD**. -**--forever** the test will run forever or if a result code different form 00 is returned from the payShield. Use **CTRL-C** to terminate it. +**--allentries** when specified all log entries are retrieved. In case of errors it terminates. Use **CTRL-C** to terminate it prematurely. **--times** how many times execute the test. If it is not specified the default value is **1** time. diff --git a/payShieldToSyslog.py b/payShieldToSyslog.py index eb32233..2f27ad3 100644 --- a/payShieldToSyslog.py +++ b/payShieldToSyslog.py @@ -31,7 +31,7 @@ import logging import logging.handlers -VERSION = "0.2" +VERSION = "0.3.1" # Begin Class @@ -209,11 +209,20 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None): print("Date: ", date_readable) syslog_entry = syslog_entry + " " + date_readable command_action_code = bin_entry[10:12] - print("Action Code", command_action_code.decode()) + print("Action Code / Command Code", command_action_code.decode()) syslog_entry = syslog_entry + " " + command_action_code.decode() bit_mask_str = str(bin(int(binascii.hexlify(bin_entry[12:14]).decode(), base=16))[2:]) print("Bit Mask", bit_mask_str) command_code_type = bit_mask_str[0:2] + response_error_code=bin_entry[14:16].decode() + if command_code_type != '10': # It is not a fraud event + command_action_message = get_action_command_message(command_action_code.decode(), command_code_type) + else: + # In case of fraud event the command that caused the event is in the 'command action field' and the reaction + # to decode is contained in the response error code field + command_action_message = command_action_code.decode() + ' caused ' + \ + get_action_command_message(response_error_code, command_code_type) + syslog_entry = syslog_entry + ' ' + command_action_message if command_code_type == '00': print("\tCommand code type: Host Command") syslog_entry = syslog_entry + " " + "HOST" @@ -226,6 +235,7 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None): elif command_code_type == '11': print("\tCommand code type: User Action") syslog_entry = syslog_entry + " " + "USER" + print("\tCommand/Action description:", command_action_message) if bit_mask_str[2:3] == '0': print("\tNot Archived") syslog_entry = syslog_entry + " " + "NOTA" @@ -239,7 +249,7 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None): print("\tRetrieved") syslog_entry = syslog_entry + " " + "RETR" print("\tUnused:", bit_mask_str[4:]) - print("Response Error Code:", bin_entry[14:16].decode()) + print("Response Error Code:", response_error_code) audit_MAC = binascii.hexlify(bin_entry[16:16 + 8]).decode().upper() print("Audit Record MAC:", audit_MAC) syslog_entry = syslog_entry + " " + audit_MAC @@ -255,7 +265,7 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None): return syslog_entry -def payshield_error_codes(error_code: str) -> str: +def get_payshield_error_message(error_code: str) -> str: """This function maps the result code with the error message. I derived the list of errors and messages from the following manual: payShield 10K Core Host Commands v1 @@ -372,6 +382,108 @@ def payshield_error_codes(error_code: str) -> str: return PAYSHIELD_ERROR_CODE.get(error_code, "Unknown error") +def get_action_command_message(code: str, code_type: str) -> str: + """This function maps the action/command code with its description. + I derived the list of actions/commands messages from the following manual: + payShield 10K Installation and User Guide 1.7a + Date: November 2022 + Doc. Number: 007-001512-007 + + Parameters + ---------- + code: str + The action/command code returned from the payShield 10k + code_type: str + The type of code: action type or command type + + Returns + ---------- + a string containing a descriptive message of the action/command code + """ + + CONSOLE_COMMAND_ACTIONS = { + '00': 'User actions performed using payShield Manager', + '01': 'AUDITLOG', + '02': 'AUDITOPTIONS', + '03': 'CLEARAUDIT', + '04': 'CLEARERR', + '05': 'EJECT', + '06': 'ERRLOG', + '07': 'GETCMDS', + '08': 'GETTIME', + '09': 'SETTIME', + '0A': 'A', + '0B': 'B', + '0C': 'C', + '0D': 'D', + '0E': 'F', + '0F': 'K', + '10': 'N', + '11': 'R', + '12': 'T', + '13': 'V', + '14': 'Z', + '15': '$', + '16': 'CONFIGCMDS', + '17': 'CONFIGPB', + '18': 'PING', + '19': 'TRACERT', + '1A': 'NETSTAT', + '1B': 'AUDITPRINT', + '1C': 'SYSLOG', + '1D': 'UTILCFG', + '1E': 'UTILENABLE', + '1F': 'UTISTATS', + '20': 'HEALTHENABLE', + '21': 'HEALTHSTATS', + '22': 'SNMP', + '23': 'SNMPADD', + '24': 'SNMPDEL', + '25': 'RESET', + '26': 'ROUTE', + '27': 'TRAP', + '28': 'TRAPADD', + '29': 'TRAPDEL', + '2A': 'CONFIGACL' + } + FRAUD_EVENT = { + '01': 'Limit for number of PIN verifications per minute exceeded', + '02': 'Limit for number of PIN verifications per hour exceeded', + '03': 'Limit for total number of failed PIN verifications exceeded' + } + AUDITED_USER_ACTIONS = { + 'A0': 'Authorization Cancelled', + 'A1': 'Authorization ON', + 'AA': 'Authorization Activity ON', + 'AC': 'Authorization Activity Cancelled', + 'AT': 'Authorization Timeout', + 'CL': 'Audit log cleared', + 'DE': 'Diagnostic Event(Selftest)', + 'KE': 'User authentication', + 'LE': 'LMK erased', + 'LF': 'License file load failure', + 'LL': 'LMK loaded', + 'LS': 'License file successfully loaded', + 'OE': 'Old LMK erased', + 'OF': 'Change to Offline', + 'OL': 'Old LMK loaded', + 'ON': 'Change to Online', + 'PW': 'Cycle power supply', + 'SE': 'Change to Secure', + 'UT': 'Utilization Reset' + } + message = '' + if code_type == '11': + message = AUDITED_USER_ACTIONS.get(code, "Unknown user action") + elif code_type == '10': + message = FRAUD_EVENT.get(code, "Unknown fraud action") + elif code_type == '01': + message = CONSOLE_COMMAND_ACTIONS.get(code, code) + elif code_type == '00': + message = code + return message + + def check_returned_command_verb(result_returned: bytes, head_len: int, command_sent: str) -> Tuple[int, str, str]: """ Checks if the command returned by the payShield is congruent to the command sent @@ -430,7 +542,7 @@ def check_return_message(result_returned: bytes, head_len: int) -> Tuple[str, st return "ZZ", "Unknown message result code parsing error" # try to describe the error - return ret_code, payshield_error_codes(ret_code) + return ret_code, get_payshield_error_message(ret_code) def test_printable(input_str): @@ -568,7 +680,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i parser = argparse.ArgumentParser( description="Dumps the Audit Log and eventually sends the entries to a syslog facility for the sake of " "testing and demonstration.", - epilog="For any questions, feedback, suggestions, send money (yes...it's a dream I know) you can contact the " + epilog="For any questions, feedback, suggestions, donations (yes...I'm a dreamer, I know) you can contact the " "author at msz@msz.eu") group = parser.add_mutually_exclusive_group() parser.add_argument("host", help="payShield IP address or hostname") @@ -578,7 +690,8 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i parser.add_argument("--header", help="the header string to prepend to the host command. If not specified the default is HEAD.", default="HEAD", type=str) - group.add_argument("--forever", help="if this option is specified the program runs for ever.", + group.add_argument("--allentries", help="when specified all log entries are retrieved or until an error is " + "returned.", action="store_true") parser.add_argument("--decode", help="if specified the reply of the payShield is interpreted " "if a decoder function for that command has been implemented.", @@ -628,7 +741,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i logger.setLevel(logging.DEBUG) syslog.setLevel(logging.INFO) logger.addHandler(syslog) - if args.forever: + if args.allentries: i = 1 while True: print("Iteration: ", i) @@ -643,7 +756,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i if return_code is None: print("Connection error with the host has occurred") else: - print("Return code: ", return_code) + print("Return code: ", get_payshield_error_message(return_code)) exit() print("") else: @@ -660,7 +773,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i if return_code is None: print("Connection error with the host has occurred") else: - print("Return code: ", return_code) + print("Return code: ", get_payshield_error_message(return_code)) exit() print("") print("DONE") From 4cb62a005b144d9c2c874395f29744bdaf682a64 Mon Sep 17 00:00:00 2001 From: Marco Zuppone Date: Fri, 16 Dec 2022 13:12:59 +0000 Subject: [PATCH 2/3] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fd41e64..41b98f7 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ It requires **Python 3**. It was tested on **Python 3.10** ## Version -**0.2** +**0.3.1** ## Usage From e25b7d8b42b8857194d6f0b599112d37d4dbae17 Mon Sep 17 00:00:00 2001 From: Marco Zuppone Date: Fri, 16 Dec 2022 15:26:18 +0000 Subject: [PATCH 3/3] Exe lacking sys import (#5) * type decorations added * initial commit * get_action_command_message introduced to decode the action or command from the audit entry * decoded action/command message added to syslog_entry * changed --forever option, fixed some bugs * decoding login about action and fraud codes amended to adhere more strictly with manuals * decoding login about action and fraud codes amended to adhere more strictly with manuals and version increased * missing sys import corrected. was causing issues on the exe version only. Co-authored-by: Marco Simone Zuppone --- .gitignore | 5 ++++- README.md | 4 +++- payShieldToSyslog.py | 9 +++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 1ad0387..4df81e5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ /.idea /build/ /dist/ -payShieldToSyslog.spec \ No newline at end of file +payShieldToSyslog.spec +/version.txt +/log.ico +/build.bat \ No newline at end of file diff --git a/README.md b/README.md index 41b98f7..9046bfb 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,9 @@ It requires **Python 3**. It was tested on **Python 3.10** ## Version -**0.3.1** + +**0.3.2** + ## Usage diff --git a/payShieldToSyslog.py b/payShieldToSyslog.py index 2f27ad3..d3ce76d 100644 --- a/payShieldToSyslog.py +++ b/payShieldToSyslog.py @@ -30,8 +30,9 @@ from types import FunctionType import logging import logging.handlers +from sys import exit # It is needed by the executable version -VERSION = "0.3.1" +VERSION = "0.3.2" # Begin Class @@ -214,7 +215,7 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None): bit_mask_str = str(bin(int(binascii.hexlify(bin_entry[12:14]).decode(), base=16))[2:]) print("Bit Mask", bit_mask_str) command_code_type = bit_mask_str[0:2] - response_error_code=bin_entry[14:16].decode() + response_error_code = bin_entry[14:16].decode() if command_code_type != '10': # It is not a fraud event command_action_message = get_action_command_message(command_action_code.decode(), command_code_type) else: @@ -474,9 +475,9 @@ def get_action_command_message(code: str, code_type: str) -> str: } message = '' if code_type == '11': - message = AUDITED_USER_ACTIONS.get(code, "Unknown user action") + message = AUDITED_USER_ACTIONS.get(code, "Unknown user action " + code) elif code_type == '10': - message = FRAUD_EVENT.get(code, "Unknown fraud action") + message = FRAUD_EVENT.get(code, "Unknown fraud action " + code) elif code_type == '01': message = CONSOLE_COMMAND_ACTIONS.get(code, code) elif code_type == '00':