Modify script to use converter class
f-galland committed Feb 15, 2024
1 parent 34f295b commit fd63e9e
Showing 2 changed files with 42 additions and 43 deletions.
40 changes: 20 additions & 20 deletions integrations/amazon-security-lake/ocsf/converter.py
@@ -27,30 +27,30 @@ def convert(event: dict) -> dict:
"category_uid": 2,
"class_name": "Detection Finding",
"class_uid": 2004,
"count": event["_source"]["rule"]["firedtimes"],
"message": event["_source"]["rule"]["description"],
"count": event["rule"]["firedtimes"],
"message": event["rule"]["description"],
"finding_info": {
"analytic": {
"category": join(event["_source"]["rule"]["groups"]),
"name": event["_source"]["decoder"]["name"],
"category": join(event["rule"]["groups"]),
"name": event["decoder"]["name"],
"type_id": 1,
"uid": event["_source"]["rule"]["id"],
"uid": event["rule"]["id"],
},
"attacks": {
"tactic": {
"name": join(event["_source"]["rule"]["mitre"]["tactic"]),
"name": join(event["rule"]["mitre"]["tactic"]),
},
"technique": {
"name": join(event["_source"]["rule"]["mitre"]["technique"]),
"uid": join(event["_source"]["rule"]["mitre"]["id"]),
"name": join(event["rule"]["mitre"]["technique"]),
"uid": join(event["rule"]["mitre"]["id"]),
},
"version": "v13.1"
},
"title": event["_source"]["rule"]["description"],
"title": event["rule"]["description"],
"types": [
event["_source"]["input"]["type"]
event["input"]["type"]
],
"uid": event["_source"]['id']
"uid": event['id']
},
"metadata": {
"log_name": "Security events",
@@ -62,25 +62,25 @@ def convert(event: dict) -> dict:
},
"version": "1.1.0",
},
"raw_data": event["_source"]["full_log"],
"raw_data": event["full_log"],
"resources": [
{
"name": event["_source"]["agent"]["name"],
"uid": event["_source"]["agent"]["id"]
"name": event["agent"]["name"],
"uid": event["agent"]["id"]
},
],
"risk_score": event["_source"]["rule"]["level"],
"severity_id": normalize(event["_source"]["rule"]["level"]),
"risk_score": event["rule"]["level"],
"severity_id": normalize(event["rule"]["level"]),
"status_id": 99,
"time": event["_source"]["timestamp"],
"time": event["timestamp"],
"type_uid": 200401,
"unmapped": {
"data_sources": [
event["_index"],
event["_source"]["location"],
event["_source"]["manager"]["name"]
event["location"],
event["manager"]["name"]
],
"nist": event["_source"]["rule"]["nist_800_53"], # Array
"nist": event["rule"]["nist_800_53"], # Array
}
}

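With this change, convert() in ocsf/converter.py no longer unwraps the OpenSearch search hit itself: fields such as event["rule"], event["agent"] and event["timestamp"] are read from the top level of the alert document instead of through event["_source"]. Below is a minimal sketch of the new call pattern; the import path comes from this diff, while the sample file name and the printed fields are only illustrative.

# Illustrative sketch only; not part of the repository.
# Assumes an alert document shaped like the body Wazuh writes to alerts.json
# (rule, agent, decoder, timestamp, ...), i.e. what previously sat under "_source".
import json

from ocsf import converter  # the module this commit switches the pipeline to

with open("sample_alert.json") as f:  # hypothetical sample file
    alert = json.load(f)

# Before this commit the converter dereferenced event["_source"][...];
# now the alert body is passed straight through and read at the top level.
finding = converter.convert(alert)
print(finding["class_uid"], finding["severity_id"])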
45 changes: 22 additions & 23 deletions integrations/amazon-security-lake/stdin_to_securitylake.py
@@ -6,10 +6,10 @@
import logging
import time
import json
- from datetime import datetime
+ import datetime
from pyarrow import parquet, Table, fs

- import ocsf
+ from ocsf import converter

block_ending = { "block_ending": True }

@@ -43,52 +43,49 @@ def encode_parquet(list,bucket_name,folder):
table = Table.from_pylist(list)
parquet.write_to_dataset(table, root_path='s3://{}/{}'.format(bucket_name, folder))

- def map_block(fileobject, length, mappings):
+ def map_block(fileobject, length):
output=[]
for line in range(0, length):
line = fileobject.readline()
if line == '':
output.append(block_ending)
break
alert = json.loads(line)
- ocsf_mapped_alert = ocsf.convert(alert)
+ ocsf_mapped_alert = converter.convert(alert)
- #map_to_ocsf(alert, mappings, ocsf_mapped_alert):
- output.append(ocsf_mapped_alert)
+ output.append(ocsf_mapped_alert)
return output

def get_elapsedseconds(reference_timestamp):
- current_time = datetime.now(tz='UTC')
+ current_time = datetime.datetime.now(datetime.timezone.utc)
return (current_time - reference_timestamp).total_seconds()

- def parse_arguments():

+ if __name__ == "__main__":
+ clock = datetime.datetime.now(datetime.timezone.utc)
+ clockstr = clock.strftime('%F_%H.%M.%S')
parser = argparse.ArgumentParser(description='STDIN to Security Lake pipeline')
parser.add_argument('-b','--bucketname', action='store', help='Name of the output S3 bucket')
parser.add_argument('-f','--foldername', action='store', help='Name of the output S3 bucket\'s folder')
- parser.add_argument('-i','--pushinterval', action='store', default=299, help='Time interval for pushing data to Security Lake')
+ parser.add_argument('-i','--pushinterval', action='store', default=299, help='Time interval in seconds for pushing data to Security Lake')
parser.add_argument('-m','--maxlength', action='store', default=20, help='Event number threshold for submission to Security Lake')
parser.add_argument('-n','--linebuffer', action='store', default=10, help='stdin line buffer length')
parser.add_argument('-s','--sleeptime', action='store', default=5, help='Input buffer polling interval')
parser.add_argument('-v','--ocsfschema', action='store', default='1.1.0', help='Version of the OCSF schema to use')
parser.add_argument('-x','--mapping', action='store', default='ocsf-mapping.json', help='Location of the Wazuh Alert to OCSF mapping (json formatted)')
- debugging = parser.add_argument_group('debugging')
- debugging.add_argument('-o','--output', type=str, default="/tmp/{}_stdintosecuritylake.txt".format(clockstr), help='File path of the destination file to write to')
- debugging.add_argument('-d','--debug', action='store_true', help='Activate debugging')
+ parser.add_argument('-o','--output', type=str, default="/tmp/stdintosecuritylake.txt", help='File path of the destination file to write to')
+ parser.add_argument('-d','--debug', action='store_true', help='Activate debugging')
args = parser.parse_args()

- if __name__ == "__main__":
- clock = datetime.now(tz='UTC')
- clockstr = clock.strftime('%F_%H.%M.%S')
- parse_arguments()
- logging.basicConfig(format='%(asctime)s %(message)s',filename=args.output, encoding='utf-8', level=logging.DEBUG)
+ logging.basicConfig(format='%(asctime)s %(message)s', filename=args.output, encoding='utf-8', level=logging.DEBUG)
logging.info('BUFFERING STDIN')

try:
- with open(ocsf_mapping_filename) as jsonfile:
- mappings = json.loads(jsonfile.read())
+ #with open(ocsf_mapping_filename) as jsonfile:
+ # mappings = json.loads(jsonfile.read())

- with os.fdopen(sys.stdin.fileno(), 'rt', buffering=0) as stdin:
+ with os.fdopen(sys.stdin.fileno(), 'rt') as stdin:
output_buffer = []
- starttimestamp = datetime.now(tz='UTC')
+ starttimestamp = datetime.datetime.now(datetime.timezone.utc)

try:
while True:
@@ -98,14 +95,14 @@ def parse_arguments():
### * https://arrow.apache.org/docs/python/ipc.html#reading-from-stream-and-file-format-for-pandas
### * https://stackoverflow.com/questions/52945609/pandas-dataframe-to-parquet-buffer-in-memory

- current_block = map_block(stdin, args.linebuffer, mappings,args.ocsfschema)
+ current_block = map_block(stdin, args.linebuffer )
if current_block[-1] == block_ending :
output_buffer += current_block[0:current_block.index(block_ending)]
time.sleep(args.sleeptime)
if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval:
encode_parquet(output_buffer,args.bucketname,args.foldername)
logging.debug(json.dumps(output_buffer))
- starttimestamp = datetime.now(tz='UTC')
+ starttimestamp = datetime.datetime.now(datetime.timezone.utc)
output_buffer = []
output_buffer.append(current_block)

@@ -117,4 +114,6 @@

except Exception as e:
logging.error("Error running script")
+ logging.error(e)
+ raise
exit(1)
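
One detail worth noting in this file: the timestamp calls move from datetime.now(tz='UTC') to datetime.datetime.now(datetime.timezone.utc). The old form raises TypeError, because the tz argument must be a tzinfo instance rather than a string. The following is a small self-contained sketch of the corrected elapsed-time check; the 299-second figure is the script's default --pushinterval, and the surrounding buffering logic is only illustrative.

# Illustrative sketch only; mirrors the corrected datetime calls in this commit.
import datetime

def get_elapsedseconds(reference_timestamp):
    # datetime.now(tz='UTC') raises TypeError: tz must be a tzinfo object, not a string.
    current_time = datetime.datetime.now(datetime.timezone.utc)
    return (current_time - reference_timestamp).total_seconds()

starttimestamp = datetime.datetime.now(datetime.timezone.utc)
# ... read and convert a batch of alerts here ...
if get_elapsedseconds(starttimestamp) > 299:  # default --pushinterval
    print("flush the buffered findings to Security Lake and reset the clock")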
