Skip to content

Commit

Permalink
Write parquet directly to bucket
Browse files (browse the repository at this point in the history)
  • Loading branch information
f-galland committed Feb 6, 2024
1 parent 17e5dfb commit 0b5adc9
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions integrations/stdin_to_securitylake.py
Original file line number | Diff line number | Diff line change
@@ -16,15 +16,16 @@
def map_to_ocsf():
## Code that translates fields to OCSF

def encode_parquet(list):
def encode_parquet(list,bucket_name,folder):
### We can write directly to S3 from pyarrow:
### https://arrow.apache.org/docs/python/filesystems.html#s3
### https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html#pyarrow.fs.S3FileSystem.open_output_stream
###
### Credentials can be stored in /root/.aws/credentials
### https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html

table = Table.from_pylist(list)
parquet.write_table(table, '/tmp/{}.parquet'.format(clockstr))
parquet.write_to_dataset(table, root_path='s3://{}/{}'.format(bucket_name,folder))

def read_block(fileobject,length):
output=[]
@@ -42,10 +43,12 @@ def get_elapsedseconds(reference_timestamp):

def parse_arguments():
parser = argparse.ArgumentParser(description='STDIN to Security Lake pipeline')
parser.add_argument('-n','--linebuffer', action='store', default=10 help='stdin line buffer length')
parser.add_argument('-m','--maxlength', action='store', default=20 help='Event number threshold for submission to Security Lake')
parser.add_argument('-s','--sleeptime', action='store', default=5 help='Input buffer polling interval')
parser.add_argument('-i','--pushinterval', action='store', default=299 help='Time interval for pushing data to Security Lake')
parser.add_argument('-b','--bucketname', action='store', help='Name of the output S3 bucket')
parser.add_argument('-f','--foldername', action='store', help='Name of the output S3 bucket\'s folder')
parser.add_argument('-i','--pushinterval', action='store', default=299, help='Time interval for pushing data to Security Lake')
parser.add_argument('-m','--maxlength', action='store', default=20, help='Event number threshold for submission to Security Lake')
parser.add_argument('-n','--linebuffer', action='store', default=10, help='stdin line buffer length')
parser.add_argument('-s','--sleeptime', action='store', default=5, help='Input buffer polling interval')
debugging = parser.add_argument_group('debugging')
debugging.add_argument('-o','--output', type=str, default="/tmp/{}_stdintosecuritylake.txt".format(clockstr), help='File path of the destination file to write to')
debugging.add_argument('-d','--debug', action='store_true', help='Activate debugging')
@@ -77,7 +80,7 @@ def parse_arguments():
output_buffer += current_block[0:current_block.index(block_ending)]
time.sleep(args.sleeptime)
if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval:
encode_parquet(output_buffer)
encode_parquet(output_buffer,args.bucketname,args.foldername)
logging.debug(json.dumps(output_buffer))
starttimestamp = datetime.now(tz='UTC')
output_buffer = []
Expand Down

0 comments on commit 0b5adc9

Please sign in to comment.