Avoid storing the block ending in the output buffer
f-galland committed Feb 6, 2024
1 parent 6ac3c99 commit 4ad01c2
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions integrations/stdin_to_securitylake.py
@@ -9,7 +9,10 @@
 from datetime import datetime
 from pyarrow import parquet, Table
 
-chunk_ending = { "chunk_ending": True }
+block_ending = { "block_ending": True }
 
+def map_to_ocsf():
+    ## Code that translates fields to OCSF
+
 def encode_parquet(list):
     table = Table.from_pylist(list)
@@ -19,12 +22,12 @@ def push_to_s3(parquet):
     ## Fill with AWS S3 code
     pass
 
-def read_chunk(fileobject,length):
+def read_block(fileobject,length):
     output=[]
     for i in range(0,length):
         line = fileobject.readline()
         if line == '':
-            output.append(chunk_ending)
+            output.append(block_ending)
             break
         output.append(json.loads(line))
     return output
@@ -59,14 +62,16 @@ def parse_arguments():
 
     try:
         while True:
-            output_buffer.append(read_chunk(stdin,args.linebuffer))
-            if output_buffer[len(output_buffer)-1] == chunk_ending :
+            current_block = read_block(stdin,args.linebuffer)
+            if current_block[-1] == block_ending :
+                output_buffer += current_block[0:current_block.index(block_ending)]
                 time.sleep(args.sleeptime)
             if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval:
                 push_to_s3(encode_parquet(output_buffer))
                 logging.debug(json.dumps(output_buffer))
                 starttimestamp = datetime.now(tz='UTC')
                 output_buffer = []
+            output_buffer.append(current_block)
 
     except KeyboardInterrupt:
         logging.info("Keyboard Interrupt issued")
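For context, here is a minimal, self-contained sketch (not part of the commit) of the buffering pattern this change moves toward: the EOF sentinel that read_block appends is stripped out before the events reach the buffer that later gets encoded to Parquet. io.StringIO stands in for stdin, the sample events and block size are invented for illustration, and the loop simply breaks at end of input instead of sleeping and polling stdin the way the integration does.

# Minimal sketch (assumptions: io.StringIO replaces stdin, sample events and
# block size are invented, and the loop breaks on EOF instead of sleeping).
import io
import json

block_ending = { "block_ending": True }

def read_block(fileobject, length):
    output = []
    for _ in range(length):
        line = fileobject.readline()
        if line == '':
            # End of input: mark the block with the sentinel and stop reading.
            output.append(block_ending)
            break
        output.append(json.loads(line))
    return output

# Three fake events followed by EOF, so the second block ends with the sentinel.
fake_stdin = io.StringIO('{"id": 1}\n{"id": 2}\n{"id": 3}\n')

output_buffer = []
while True:
    current_block = read_block(fake_stdin, 2)
    if current_block and current_block[-1] == block_ending:
        # Keep only the real events; the sentinel itself never enters the buffer.
        output_buffer += current_block[0:current_block.index(block_ending)]
        break
    output_buffer += current_block

print(output_buffer)  # [{'id': 1}, {'id': 2}, {'id': 3}] -- no sentinel dict

Keeping the sentinel out of output_buffer matters because everything in that list is eventually handed to Table.from_pylist and pushed to S3, so a bookkeeping marker like { "block_ending": True } would otherwise end up in the Parquet output.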
