Skip to content

Commit

Permalink
encode_parquet() function fixed to handle lists of dictionaries
Browse files Browse the repository at this point in the history
  • Loading branch information
f-galland committed Feb 6, 2024
1 parent 116b22b commit 288c40a
Showing 1 changed file with 29 additions and 36 deletions.
65 changes: 29 additions & 36 deletions integrations/stdin_to_securitylake.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
import argparse
import logging
import time
import json
from datetime import datetime
from pyarrow import json
import pyarrow.parquet as pq
from pyarrow import json, parquet, Table

def encode_parquet(json_list):
for json in json_list:
### read_json is meant for files, need to change it to read from a string
### https://arrow.apache.org/docs/python/json.html
table = json.read_json(json)
pq.write_table(table, 'parquet/output.parquet')
chunk_ending = { "chunk_ending": True }

def encode_parquet(list):
table = Table.from_pylist(list)
pq.write_table(table, '/tmp/{}.parquet'.format(clockstr))

def push_to_s3(parquet):
## Fill with AWS S3 code
Expand All @@ -24,63 +23,57 @@ def read_chunk(fileobject,length):
output=[]
for i in range(0,length):
line = fileobject.readline()
if line is '':
output.append(line)
if line == '':
output.append(chunk_ending)
break
output.append(line)
output.append(json.loads(line))
return output

def get_elapsedtime(reference_timestamp):
def get_elapsedseconds(reference_timestamp):
current_time = datetime.now(tz='UTC')
return (current_time - reference_timestamp).total_seconds()

if __name__ == "__main__":

clock = datetime.now(tz='UTC')
clockstr = clock.strftime('%F_%H:%M:%S')

def parse_arguments():
parser = argparse.ArgumentParser(description='STDIN to Security Lake pipeline')

parser.add_argument('-n','--linebuffer', action='store', default=10 help='Lines to buffer')
parser.add_argument('-m','--maxlength', action='store', default=20 help='Lines to buffer')
parser.add_argument('-s','--sleeptime', action='store', default=5 help='Lines to buffer')
parser.add_argument('-i','--pushinterval', action='store', default=299 help='Lines to buffer')

parser.add_argument('-n','--linebuffer', action='store', default=10 help='stdin line buffer length')
parser.add_argument('-m','--maxlength', action='store', default=20 help='Event number threshold for submission to Security Lake')
parser.add_argument('-s','--sleeptime', action='store', default=5 help='Input buffer polling interval')
parser.add_argument('-i','--pushinterval', action='store', default=299 help='Time interval for pushing data to Security Lake')
debugging = parser.add_argument_group('debugging')
debugging.add_argument('-o','--output', type=str, default="/tmp/{}_stdintosecuritylake.txt".format(clockstr), help='File path of the destination file to write to')
debugging.add_argument('-d','--debug', action='store_true', help='Activate debugging')

args = parser.parse_args()

logging.basicConfig(format='%(asctime)s %(message)s',filename=args.output, encoding='utf-8', level=logging.DEBUG)
logging.debug("Running main()")
logging.debug("Current time is " + str(clockstr) )

if __name__ == "__main__":
clock = datetime.now(tz='UTC')
clockstr = clock.strftime('%F_%H.%M.%S')
parse_arguments()
logging.basicConfig(format='%(asctime)s %(message)s',filename=args.output, encoding='utf-8', level=logging.DEBUG)
logging.info('BUFFERING STDIN')

try:
logging.info('BUFFERING STDIN')

with os.fdopen(sys.stdin.fileno(), 'rt', buffering=0) as stdin:

output_buffer = []

starttimestamp = datetime.now(tz='UTC')

try:
while True:
output_buffer.append(read_chunk(stdin,args.linebuffer))
if output_buffer[len(output_buffer)-1] is '':
if output_buffer[len(output_buffer)-1] == chunk_ending :
time.sleep(args.sleeptime)
if len(output_buffer) > args.maxlength or get_elapsedtime(starttimestamp) > args.pushinterval:
encode_parquet(output_buffer)
logging.debug(output_buffer)
if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval:
push_to_s3(encode_parquet(output_buffer))
logging.debug(json.dumps(output_buffer))
starttimestamp = datetime.now(tz='UTC')
output_buffer = []

except KeyboardInterrupt:
logging.info("Keyboard Interrupt issued")
exit(0)


logging.info('FINISHED RETRIEVING STDIN')

except Exception as e:
logging.error("Error running script")
exit(1)

0 comments on commit 288c40a

Please sign in to comment.