Map alerts to OCSF as they are read
f-galland committed Feb 7, 2024
1 parent c81239b commit 210541d
Showing 1 changed file with 13 additions and 11 deletions.
integrations/stdin_to_securitylake.py (24 changes: 13 additions & 11 deletions)
@@ -13,12 +13,10 @@

 s3 = fs.S3FileSystem()
 
-def map_to_ocsf(alert_dictionary,ocsf_mapping_filename):
-    ocsf_alert = {}
-    with open(ocsf_mapping_filename) as jsonfile:
-        mappings = json.loads(jsonfile.read())
+def map_to_ocsf(alert_dictionary, mappings, ocsf_output):
+    ocsf_output.clear()
     ### Put constants into the output alert
-    ocsf_alert |= mappings['constants']
+    ocsf_output |= mappings['constants']

     for key in mappings['mappings']:
         dotted_destination_field = mappings['mappings'].get(key)
@@ -27,7 +25,7 @@ def map_to_ocsf(alert_dictionary,ocsf_mapping_filename):
         if len(depth_levels) > 1:
             for field in depth_levels[1:]:
                 current_level = current_level[field]
-        ocsf_alert[key] = current_level
+        ocsf_output[key] = current_level
 
 def encode_parquet(list,bucket_name,folder):
     ### We can write directly to S3 from pyarrow:
@@ -38,17 +36,19 @@ def encode_parquet(list,bucket_name,folder):
     ### https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html
 
     table = Table.from_pylist(list)
-    parquet.write_to_dataset(table, root_path='s3://{}/{}'.format(bucket_name,folder))
+    parquet.write_to_dataset(table, root_path='s3://{}/{}'.format(bucket_name, folder))
 
-def read_block(fileobject,length):
+def map_block(fileobject, length, mappings):
     output=[]
-    for line in range(0,length):
+    for line in range(0, length):
         line = fileobject.readline()
         if line == '':
             output.append(block_ending)
             break
-        output.append(json.loads(line))
+        alert = json.loads(line)
+        ocsf_mapped_alert = {}
+        map_to_ocsf(alert, mappings, ocsf_mapped_alert)
+        output.append(ocsf_mapped_alert)
     return output
 
 def get_elapsedseconds(reference_timestamp):
@@ -77,6 +77,8 @@ def parse_arguments():
     logging.info('BUFFERING STDIN')
 
     try:
+        with open(ocsf_mapping_filename) as jsonfile:
+            mappings = json.loads(jsonfile.read())
 
         with os.fdopen(sys.stdin.fileno(), 'rt', buffering=0) as stdin:
             output_buffer = []
@@ -90,7 +92,7 @@
                 ### * https://arrow.apache.org/docs/python/ipc.html#reading-from-stream-and-file-format-for-pandas
                 ### * https://stackoverflow.com/questions/52945609/pandas-dataframe-to-parquet-buffer-in-memory
 
-                current_block = read_block(stdin,args.linebuffer)
+                current_block = map_block(stdin, args.linebuffer, mappings)
                 if current_block[-1] == block_ending :
                     output_buffer += current_block[0:current_block.index(block_ending)]
                     time.sleep(args.sleeptime)
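For context, here is a minimal, self-contained sketch of the mapping flow this commit introduces: constants are copied into the output alert, then each dotted source path from the mapping file is resolved against the nested alert dictionary. The example_mappings and example_alert values below are illustrative placeholders, not the repository's actual OCSF mapping file or a real Wazuh alert.

import json

# Hypothetical mapping definition; assumed shape: a 'constants' section plus
# a 'mappings' section of output field -> dotted source path.
example_mappings = {
    "constants": {"category_uid": 2, "class_uid": 2004},
    "mappings": {"message": "rule.description", "severity_id": "rule.level"}
}

# Hypothetical alert already parsed from a JSON line on stdin.
example_alert = {"rule": {"description": "Test alert", "level": 7}}

def map_to_ocsf(alert_dictionary, mappings, ocsf_output):
    # Copy constant fields first (dict |= requires Python 3.9+, matching the
    # operator used in the commit), then walk each dotted path into the alert.
    ocsf_output |= mappings['constants']
    for key, dotted_destination_field in mappings['mappings'].items():
        depth_levels = dotted_destination_field.split('.')
        current_level = alert_dictionary[depth_levels[0]]
        for field in depth_levels[1:]:
            current_level = current_level[field]
        ocsf_output[key] = current_level

ocsf_mapped_alert = {}
map_to_ocsf(example_alert, example_mappings, ocsf_mapped_alert)
# Prints the OCSF-shaped alert: the constants plus the two mapped fields.
print(json.dumps(ocsf_mapped_alert, indent=2))

Passing the already-parsed mappings into map_block, instead of reopening the mapping file for every alert, is the main structural change in this commit; the sketch assumes the same calling convention.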
