Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into dev
Browse files Browse the repository at this point in the history
# Conflicts:
#	.gitignore
  • Loading branch information
Marco Simone Zuppone committed Dec 16, 2022
2 parents 4ccd6bc + e25b7d8 commit bbfd997
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 18 deletions.
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
/.idea
/build/
/dist/
payShieldToSyslog.spec
/.idea
24 changes: 24 additions & 0 deletions .run/All to Syslog.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="All to Syslog" type="PythonConfigurationType" factoryName="Python">
<module name="payShield2Syslog" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/payShieldToSyslog.py" />
<option name="PARAMETERS" value="192.168.0.36 --decode --allentries --proto udp --syslog 192.168.0.33" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ It requires **Python 3**. It was tested on **Python 3.10**

## Version

**0.2**

**0.3.2**


## Usage

usage: payShieldToSyslog.py [-h] [--port PORT] [--header HEADER] [--forever] [--decode] [--times TIMES]
usage: payShieldToSyslog.py [-h] [--port PORT] [--header HEADER] [--allentries] [--decode] [--times TIMES]
[--proto {tcp,udp,tls}] [--keyfile KEYFILE] [--crtfile CRTFILE] [--syslog SYSLOG]
[--syslogport SYSLOGPORT] host

Expand All @@ -29,7 +31,7 @@ It requires **Python 3**. It was tested on **Python 3.10**

### Mutually exclusive parameters

**--times** and **--forever** are mutually exclusive.
**--times** and **--allentries** are mutually exclusive.

### Optional parameters

Expand All @@ -47,7 +49,7 @@ It's only considered if the protocol is **tls**.

**--header** the header string to prefix to the host command, if not specified the default value is **HEAD**.

**--forever** the test will run forever or if a result code different form 00 is returned from the payShield. Use **CTRL-C** to terminate it.
**--allentries** when specified all log entries are retrieved. In case of errors it terminates. Use **CTRL-C** to terminate it prematurely.

**--times** how many times execute the test. If it is not specified the default value is **1** time.

Expand Down
134 changes: 124 additions & 10 deletions payShieldToSyslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
from types import FunctionType
import logging
import logging.handlers
from sys import exit # It is needed by the executable version

VERSION = "0.2"
VERSION = "0.3.2"


# Begin Class
Expand Down Expand Up @@ -209,11 +210,20 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None):
print("Date: ", date_readable)
syslog_entry = syslog_entry + " " + date_readable
command_action_code = bin_entry[10:12]
print("Action Code", command_action_code.decode())
print("Action Code / Command Code", command_action_code.decode())
syslog_entry = syslog_entry + " " + command_action_code.decode()
bit_mask_str = str(bin(int(binascii.hexlify(bin_entry[12:14]).decode(), base=16))[2:])
print("Bit Mask", bit_mask_str)
command_code_type = bit_mask_str[0:2]
response_error_code = bin_entry[14:16].decode()
if command_code_type != '10': # It is not a fraud event
command_action_message = get_action_command_message(command_action_code.decode(), command_code_type)
else:
# In case of fraud event the command that caused the event is in the 'command action field' and the reaction
# to decode is contained in the response error code field
command_action_message = command_action_code.decode() + ' caused ' + \
get_action_command_message(response_error_code, command_code_type)
syslog_entry = syslog_entry + ' ' + command_action_message
if command_code_type == '00':
print("\tCommand code type: Host Command")
syslog_entry = syslog_entry + " " + "HOST"
Expand All @@ -226,6 +236,7 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None):
elif command_code_type == '11':
print("\tCommand code type: User Action")
syslog_entry = syslog_entry + " " + "USER"
print("\tCommand/Action description:", command_action_message)
if bit_mask_str[2:3] == '0':
print("\tNot Archived")
syslog_entry = syslog_entry + " " + "NOTA"
Expand All @@ -239,7 +250,7 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None):
print("\tRetrieved")
syslog_entry = syslog_entry + " " + "RETR"
print("\tUnused:", bit_mask_str[4:])
print("Response Error Code:", bin_entry[14:16].decode())
print("Response Error Code:", response_error_code)
audit_MAC = binascii.hexlify(bin_entry[16:16 + 8]).decode().upper()
print("Audit Record MAC:", audit_MAC)
syslog_entry = syslog_entry + " " + audit_MAC
Expand All @@ -255,7 +266,7 @@ def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None):
return syslog_entry


def payshield_error_codes(error_code: str) -> str:
def get_payshield_error_message(error_code: str) -> str:
"""This function maps the result code with the error message.
I derived the list of errors and messages from the following manual:
payShield 10K Core Host Commands v1
Expand Down Expand Up @@ -372,6 +383,108 @@ def payshield_error_codes(error_code: str) -> str:
return PAYSHIELD_ERROR_CODE.get(error_code, "Unknown error")


def get_action_command_message(code: str, code_type: str) -> str:
"""This function maps the action/command code with its description.
I derived the list of actions/commands messages from the following manual:
payShield 10K Installation and User Guide 1.7a
Date: November 2022
Doc. Number: 007-001512-007
Parameters
----------
code: str
The action/command code returned from the payShield 10k
code_type: str
The type of code: action type or command type
Returns
----------
a string containing a descriptive message of the action/command code
"""

CONSOLE_COMMAND_ACTIONS = {
'00': 'User actions performed using payShield Manager',
'01': 'AUDITLOG',
'02': 'AUDITOPTIONS',
'03': 'CLEARAUDIT',
'04': 'CLEARERR',
'05': 'EJECT',
'06': 'ERRLOG',
'07': 'GETCMDS',
'08': 'GETTIME',
'09': 'SETTIME',
'0A': 'A',
'0B': 'B',
'0C': 'C',
'0D': 'D',
'0E': 'F',
'0F': 'K',
'10': 'N',
'11': 'R',
'12': 'T',
'13': 'V',
'14': 'Z',
'15': '$',
'16': 'CONFIGCMDS',
'17': 'CONFIGPB',
'18': 'PING',
'19': 'TRACERT',
'1A': 'NETSTAT',
'1B': 'AUDITPRINT',
'1C': 'SYSLOG',
'1D': 'UTILCFG',
'1E': 'UTILENABLE',
'1F': 'UTISTATS',
'20': 'HEALTHENABLE',
'21': 'HEALTHSTATS',
'22': 'SNMP',
'23': 'SNMPADD',
'24': 'SNMPDEL',
'25': 'RESET',
'26': 'ROUTE',
'27': 'TRAP',
'28': 'TRAPADD',
'29': 'TRAPDEL',
'2A': 'CONFIGACL'
}
FRAUD_EVENT = {
'01': 'Limit for number of PIN verifications per minute exceeded',
'02': 'Limit for number of PIN verifications per hour exceeded',
'03': 'Limit for total number of failed PIN verifications exceeded'
}
AUDITED_USER_ACTIONS = {
'A0': 'Authorization Cancelled',
'A1': 'Authorization ON',
'AA': 'Authorization Activity ON',
'AC': 'Authorization Activity Cancelled',
'AT': 'Authorization Timeout',
'CL': 'Audit log cleared',
'DE': 'Diagnostic Event(Selftest)',
'KE': 'User authentication',
'LE': 'LMK erased',
'LF': 'License file load failure',
'LL': 'LMK loaded',
'LS': 'License file successfully loaded',
'OE': 'Old LMK erased',
'OF': 'Change to Offline',
'OL': 'Old LMK loaded',
'ON': 'Change to Online',
'PW': 'Cycle power supply',
'SE': 'Change to Secure',
'UT': 'Utilization Reset'
}
message = ''
if code_type == '11':
message = AUDITED_USER_ACTIONS.get(code, "Unknown user action " + code)
elif code_type == '10':
message = FRAUD_EVENT.get(code, "Unknown fraud action " + code)
elif code_type == '01':
message = CONSOLE_COMMAND_ACTIONS.get(code, code)
elif code_type == '00':
message = code
return message


def check_returned_command_verb(result_returned: bytes, head_len: int, command_sent: str) -> Tuple[int, str, str]:
"""
Checks if the command returned by the payShield is congruent to the command sent
Expand Down Expand Up @@ -430,7 +543,7 @@ def check_return_message(result_returned: bytes, head_len: int) -> Tuple[str, st
return "ZZ", "Unknown message result code parsing error"

# try to describe the error
return ret_code, payshield_error_codes(ret_code)
return ret_code, get_payshield_error_message(ret_code)


def test_printable(input_str):
Expand Down Expand Up @@ -568,7 +681,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i
parser = argparse.ArgumentParser(
description="Dumps the Audit Log and eventually sends the entries to a syslog facility for the sake of "
"testing and demonstration.",
epilog="For any questions, feedback, suggestions, send money (yes...it's a dream I know) you can contact the "
epilog="For any questions, feedback, suggestions, donations (yes...I'm a dreamer, I know) you can contact the "
"author at [email protected]")
group = parser.add_mutually_exclusive_group()
parser.add_argument("host", help="payShield IP address or hostname")
Expand All @@ -578,7 +691,8 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i
parser.add_argument("--header",
help="the header string to prepend to the host command. If not specified the default is HEAD.",
default="HEAD", type=str)
group.add_argument("--forever", help="if this option is specified the program runs for ever.",
group.add_argument("--allentries", help="when specified all log entries are retrieved or until an error is "
"returned.",
action="store_true")
parser.add_argument("--decode", help="if specified the reply of the payShield is interpreted "
"if a decoder function for that command has been implemented.",
Expand Down Expand Up @@ -628,7 +742,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i
logger.setLevel(logging.DEBUG)
syslog.setLevel(logging.INFO)
logger.addHandler(syslog)
if args.forever:
if args.allentries:
i = 1
while True:
print("Iteration: ", i)
Expand All @@ -643,7 +757,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i
if return_code is None:
print("Connection error with the host has occurred")
else:
print("Return code: ", return_code)
print("Return code: ", get_payshield_error_message(return_code))
exit()
print("")
else:
Expand All @@ -660,7 +774,7 @@ def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, i
if return_code is None:
print("Connection error with the host has occurred")
else:
print("Return code: ", return_code)
print("Return code: ", get_payshield_error_message(return_code))
exit()
print("")
print("DONE")

0 comments on commit bbfd997

Please sign in to comment.