Skip to content

Commit

Permalink
Allow to set the bucket name from the cli
Browse files Browse the repository at this point in the history
  • Loading branch information
f-galland committed Feb 27, 2024
1 parent 81ecb7c commit f4a7336
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ output {
# codec => rubydebug
#}

#pipe
#{
# id => "securityLake"
# command => "/usr/share/logstash/bin/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --outputfolder s3://"
#}

file {
id => "fileOutputPipeline"
path => "/tmp/indexer-to-file.json"
pipe
{
id => "securityLake"
#command => "/usr/share/logstash/bin/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --outputfolder s3://s3.ninja:9000/securitylake"
command => "/usr/share/logstash/bin/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
}

#file {
# id => "fileOutputPipeline"
# path => "/tmp/indexer-to-file.json"
#}

}
12 changes: 4 additions & 8 deletions integrations/amazon-security-lake/stdin_to_securitylake.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,11 @@ def s3authenticate(profile, endpoint=None, scheme='https'):
scheme=scheme)

return s3fs





def encode_parquet(list,foldername,filename):
def encode_parquet(list,bucketname,filename,filesystem):
try:
table = Table.from_pylist(list)
parquet.write_table(table, '{}.parquet'.format(filename), filesystem=localS3)
parquet.write_table(table, '{}/{}.parquet'.format(bucketname,filename), filesystem=filesystem)
except Exception as e:
logging.error(e)
raise
Expand All @@ -62,12 +58,12 @@ def get_elapsedseconds(reference_timestamp):
date = datetime.datetime.now(datetime.timezone.utc).strftime('%F_%H.%M.%S')
parser = argparse.ArgumentParser(description='STDIN to Security Lake pipeline')
parser.add_argument('-d','--debug', action='store_true', help='Activate debugging')
parser.add_argument('-b','--bucketname', type=str, action='store', help='S3 bucket name to write parquet files to')
parser.add_argument('-e','--s3endpoint', type=str, action='store', default=None, help='Hostname and port of the S3 destination (defaults to AWS\')')
parser.add_argument('-i','--pushinterval', type=int, action='store', default=299, help='Time interval in seconds for pushing data to Security Lake')
parser.add_argument('-l','--logoutput', type=str, default="/tmp/stdintosecuritylake.txt", help='File path of the destination file to write to')
parser.add_argument('-m','--maxlength', type=int, action='store', default=2000, help='Event number threshold for submission to Security Lake')
parser.add_argument('-n','--linebuffer', type=int, action='store', default=100, help='stdin line buffer length')
parser.add_argument('-o','--outputfolder', type=str, action='store', help='Folder or S3 bucket URL to dump parquet files to')
parser.add_argument('-p','--s3profile', type=str, action='store', default='default', help='AWS profile as stored in credentials file')
parser.add_argument('-s','--sleeptime', type=int, action='store', default=5, help='Input buffer polling interval')
args = parser.parse_args()
Expand Down Expand Up @@ -98,7 +94,7 @@ def get_elapsedseconds(reference_timestamp):
if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval:
logging.info('Writing data to parquet file')
s3fs = s3authenticate(args.s3profile,args.s3endpoint)
encode_parquet(output_buffer,args.outputfolder,'wazuh-{}'.format(date))
encode_parquet(output_buffer,args.bucketname,'wazuh-{}'.format(date),s3fs)
starttimestamp = datetime.datetime.now(datetime.timezone.utc)
output_buffer = []

Expand Down

0 comments on commit f4a7336

Please sign in to comment.