From 4ad01c2bd9eba6f9c99ba03b8b65f31dbae7ae62 Mon Sep 17 00:00:00 2001 From: Fede Tux Date: Tue, 6 Feb 2024 13:59:12 -0300 Subject: [PATCH] Avoid storing the block ending in the output buffer --- integrations/stdin_to_securitylake.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/integrations/stdin_to_securitylake.py b/integrations/stdin_to_securitylake.py index e11c23378b15b..034b729c1208d 100755 --- a/integrations/stdin_to_securitylake.py +++ b/integrations/stdin_to_securitylake.py @@ -9,7 +9,10 @@ from datetime import datetime from pyarrow import parquet, Table -chunk_ending = { "chunk_ending": True } +block_ending = { "block_ending": True } + +def map_to_ocsf(): + ## Code that translates fields to OCSF def encode_parquet(list): table = Table.from_pylist(list) @@ -19,12 +22,12 @@ def push_to_s3(parquet): ## Fill with AWS S3 code pass -def read_chunk(fileobject,length): +def read_block(fileobject,length): output=[] for i in range(0,length): line = fileobject.readline() if line == '': - output.append(chunk_ending) + output.append(block_ending) break output.append(json.loads(line)) return output @@ -59,14 +62,16 @@ def parse_arguments(): try: while True: - output_buffer.append(read_chunk(stdin,args.linebuffer)) - if output_buffer[len(output_buffer)-1] == chunk_ending : + current_block = read_block(stdin,args.linebuffer) + if current_block[-1] == block_ending : + output_buffer += current_block[0:current_block.index(block_ending)] time.sleep(args.sleeptime) if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval: push_to_s3(encode_parquet(output_buffer)) logging.debug(json.dumps(output_buffer)) starttimestamp = datetime.now(tz='UTC') output_buffer = [] + output_buffer.append(current_block) except KeyboardInterrupt: logging.info("Keyboard Interrupt issued")