Migrate from #147
AlexRuiz7 committed Mar 12, 2024
1 parent 69ea807 commit 0b5716e
Showing 3 changed files with 152 additions and 1 deletion.
@@ -16,6 +16,7 @@ input {
}
}'
target => "_source"
ecs_compatibility => disabled
schedule => "* * * * *"
}
}
@@ -26,5 +27,6 @@ output {
message_format => "%{_source}"
ttl => "10"
command => "/usr/bin/env python3 /usr/local/bin/run.py -d"
# command => "/usr/share/logstash/bin/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
}
}
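
The pipe output above hands each Logstash event to run.py as one JSON document per line on stdin. Below is a minimal sketch of that handoff for local testing; the sample alert and the bucket/endpoint values are illustrative, not taken from this commit:

import json
import subprocess

# Hypothetical alert, fed to run.py the way Logstash's pipe output would.
sample_alert = {"agent": {"id": "001"}, "rule": {"level": 5}}

proc = subprocess.Popen(
    ["python3", "run.py", "-d", "-b", "securitylake",
     "-e", "s3.ninja:9000", "-p", "default"],
    stdin=subprocess.PIPE, text=True)
proc.stdin.write(json.dumps(sample_alert) + "\n")
proc.stdin.close()
proc.wait()
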
146 changes: 145 additions & 1 deletion integrations/amazon-security-lake/run.py
@@ -1,7 +1,151 @@
#!/env/bin/python3.9
# vim: bkc=yes bk wb

import os
import sys
import argparse
import logging
import time
import json
import datetime

import boto3
import transform
from pyarrow import parquet, Table, fs


logging.basicConfig(format='%(asctime)s %(message)s',
encoding='utf-8', level=logging.DEBUG)

BLOCK_ENDING = {"block_ending": True}


def create_arg_parser():
parser = argparse.ArgumentParser(
description='STDIN to Security Lake pipeline')
parser.add_argument('-d', '--debug', action='store_true',
help='Activate debugging')
parser.add_argument('-b', '--bucketname', type=str, action='store',
help='S3 bucket name to write parquet files to')
parser.add_argument('-e', '--s3endpoint', type=str, action='store', default=None,
help='Hostname and port of the S3 destination (defaults to AWS\')')
parser.add_argument('-i', '--pushinterval', type=int, action='store', default=299,
help='Time interval in seconds for pushing data to Security Lake')
parser.add_argument('-l', '--logoutput', type=str, default="/tmp/stdintosecuritylake.txt",
help='File path of the destination file to write to')
parser.add_argument('-m', '--maxlength', type=int, action='store', default=2000,
help='Event number threshold for submission to Security Lake')
parser.add_argument('-n', '--linebuffer', type=int, action='store',
default=100, help='stdin line buffer length')
parser.add_argument('-p', '--s3profile', type=str, action='store',
default='default', help='AWS profile as stored in credentials file')
parser.add_argument('-s', '--sleeptime', type=int, action='store',
default=5, help='Input buffer polling interval')
return parser


def check_fd_open(file):
    # True while the file descriptor is still open.
    return not file.closed


def s3authenticate(profile, endpoint=None, scheme='https'):
session = boto3.session.Session(profile_name=profile)
credentials = session.get_credentials()

    if endpoint is not None:
        scheme = 'http'

s3fs = fs.S3FileSystem(
endpoint_override=endpoint,
access_key=credentials.access_key,
secret_key=credentials.secret_key,
scheme=scheme)

return s3fs


def encode_parquet(events, bucketname, filename, filesystem):
    try:
        table = Table.from_pylist(events)
        parquet.write_table(
            table, '{}/{}'.format(bucketname, filename), filesystem=filesystem)
    except Exception as e:
        logging.error(e)
        raise


def map_block(fileobject, length):
    output = []
    for _ in range(length):
        line = fileobject.readline()
        if line == '':
            output.append(BLOCK_ENDING)
            break
        alert = json.loads(line)
        # The OCSF converter is assumed to be exposed by the transform package.
        ocsf_mapped_alert = transform.converter.convert(alert)
        output.append(ocsf_mapped_alert)
    return output


def timedelta(reference_timestamp):
current_time = datetime.datetime.now(datetime.timezone.utc)
return (current_time - reference_timestamp).total_seconds()


def utctime():
return datetime.datetime.now(datetime.timezone.utc)


if __name__ == "__main__":
try:
args = create_arg_parser().parse_args()
logging.info('BUFFERING STDIN')

with os.fdopen(sys.stdin.fileno(), 'rt') as stdin:
output_buffer = []
loop_start_time = utctime()

try:
s3fs = s3authenticate(args.s3profile, args.s3endpoint)
while True:

current_block = map_block(stdin, args.linebuffer)

if current_block[-1] == BLOCK_ENDING:
output_buffer += current_block[0:-1]
time.sleep(args.sleeptime)
else:
output_buffer += current_block

buffer_length = len(output_buffer)

if buffer_length == 0:
continue

elapsed_seconds = timedelta(loop_start_time)

if buffer_length > args.maxlength or elapsed_seconds > args.pushinterval:
logging.info(
'Elapsed seconds: {}'.format(elapsed_seconds))
loop_start_time = utctime()
timestamp = loop_start_time.strftime('%F_%H.%M.%S')
filename = 'wazuh-{}.parquet'.format(timestamp)
logging.info(
'Writing data to s3://{}/{}'.format(args.bucketname, filename))
encode_parquet(
output_buffer, args.bucketname, filename, s3fs)
output_buffer = []

except KeyboardInterrupt:
logging.info("Keyboard Interrupt issued")
                sys.exit(0)

logging.info('FINISHED RETRIEVING STDIN')

except Exception as e:
logging.error("Error running script")
logging.error(e)
raise


def _test():
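
The script's main loop buffers mapped events and flushes them whenever either the --maxlength event count or the --pushinterval timer is exceeded. The parquet step can be exercised in isolation against the local filesystem; a minimal sketch, assuming flat dictionaries with hypothetical field names:

from pyarrow import Table, fs, parquet

# Two hypothetical OCSF-like rows; any list of uniform dicts works.
events = [
    {"time": "2024-03-12T10:00:00Z", "severity_id": 3},
    {"time": "2024-03-12T10:00:05Z", "severity_id": 1},
]

# Same pyarrow calls the script uses, but writing locally instead of to S3.
table = Table.from_pylist(events)
parquet.write_table(table, "/tmp/wazuh-sample.parquet",
                    filesystem=fs.LocalFileSystem())
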
5 changes: 5 additions & 0 deletions integrations/docker/amazon-security-lake.yml
@@ -1,5 +1,6 @@
version: "3.8"
name: "amazon-security-lake"

services:
events-generator:
image: wazuh/indexer-events-generator
@@ -89,6 +90,10 @@ services:
volumes:
- ../amazon-security-lake/logstash/pipeline:/usr/share/logstash/pipeline
- ./certs/root-ca.pem:/usr/share/logstash/root-ca.pem
- ../amazon-security-lake/run.py:/usr/share/logstash/bin/run.py
- ../amazon-security-lake/transform/:/usr/share/logstash/bin/transform/
- ../amazon-security-lake/ocsf/:/usr/share/logstash/bin/ocsf/
- ./credentials:/usr/share/logstash/.aws/credentials
# command: tail -f /dev/null
command: /usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-integrator.conf --path.settings /etc/logstash --config.reload.automatic
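
The ./credentials mount places an AWS credentials file in the logstash user's home directory, which is where boto3 resolves the profile named by run.py's --s3profile flag. A quick sanity check, runnable inside the container; the 'default' profile name is an assumption mirroring the pipeline's commented-out command:

import boto3

# Expects /usr/share/logstash/.aws/credentials to define a [default] profile.
session = boto3.session.Session(profile_name="default")
creds = session.get_credentials()
print("access key loaded:", creds is not None and creds.access_key is not None)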

