Skip to content

Commit

Permalink
Merge pull request #24 from interTwin-eu/dev-slangarita
Browse files Browse the repository at this point in the history
Kafka, SQS and S3
  • Loading branch information
SergioLangaritaBenitez authored Apr 18, 2024
2 parents f729232 + 032eb5e commit 5b364c4
Show file tree
Hide file tree
Showing 6 changed files with 990 additions and 22 deletions.
100 changes: 81 additions & 19 deletions cli/dcnios-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@
import yaml
from yaml.loader import SafeLoader
import json
from NifiManagement import *
from oscar_python.client import Client
import sys
from sources.auxiliaryFunctions import *
from sources.NifiManagement import *
from sources.aws import *
import boto3
import os
import argparse

folder="template/"
dcachefile=folder+"dcache.json"
oscarfile=folder+"InvokeOSCAR.json"
sqsfile=folder+"SQS_recive.json"
kafkafile=folder+"Kafka.json"
types=["dCache","OSCAR","S3","SQS","generic","Kafka"]
typesSSL=["Kafka"]

def updateComponent(type):
if "components" in type:
print("Process group: "+type["name"])
Expand All @@ -37,9 +47,9 @@ def updateComponent(type):

def doType(type, function):
    """Apply *function* to every entry of one section of the ``nifi`` config.

    ``type`` is a section name in the YAML (e.g. "OSCAR", "Kafka"); for each
    entry listed under data["nifi"][type], call ``function`` with the entry's
    name and echo the action to stdout.

    NOTE(review): reads the module-level ``data`` dict loaded from the YAML
    config; ``type`` shadows the builtin but is kept for interface stability.
    """
    if type in data["nifi"]:
        # Use a distinct loop variable so the section name is not shadowed.
        for entry in data["nifi"][type]:
            function(entry["name"])
            print(str(function.__qualname__) + " " + entry["name"])

def makeActionWithAllType(allType,function):
for type in allType:
Expand All @@ -49,12 +59,6 @@ def newProcessInfo(name):
print("New Process group: "+ str(name))


folder="template/"
dcachefile=folder+"dcache.json"
oscarfile=folder+"InvokeOSCAR.json"
types=["dcache","oscar","generic"]



parser = argparse.ArgumentParser(prog='ProgramName', description='What the program does', epilog='Text at the bottom of help')
parser.add_argument('option')
Expand All @@ -78,19 +82,21 @@ def newProcessInfo(name):
nifi_password=data["nifi"]["password"]
nifi=Nifi(nifi_endpoint,nifi_user,nifi_password)
if args.option == "apply":
if "dcache" in data["nifi"]:
for dcache in data["nifi"]["dcache"]:
nifi.create(dcache["name"],dcachefile)
if "dCache" in data["nifi"]:
for dcache in data["nifi"]["dCache"]:
dcachecontent=prepareforAll(dcachefile)
nifi.create(dcache["name"], dcachecontent )
command="simple-client.py --state /state/"+dcache["statefile"]+" --endpoint "+ \
dcache["endpoint"]+" --user "+dcache["user"]+" --password "+ \
dcache["password"]+" "+ dcache["folder"]
nifi.changeVariable(dcache["name"],"command",command)
newProcessInfo(dcache["name"])
updateComponent(dcache)

if "oscar" in data["nifi"]:
for oscar in data["nifi"]["oscar"]:
nifi.create(oscar["name"],oscarfile)
if "OSCAR" in data["nifi"]:
for oscar in data["nifi"]["OSCAR"]:
oscarcontent=prepareforAll(oscarfile)
nifi.create(oscar["name"],oscarcontent)
nifi.changeVariable(oscar["name"],"endpoint", oscar["endpoint"])
nifi.changeVariable(oscar["name"],"service", oscar["service"])
if "user" in oscar and "password" in oscar:
Expand All @@ -104,17 +110,73 @@ def newProcessInfo(name):
updateComponent(oscar)
if "generic" in data["nifi"]:
for generic in data["nifi"]["generic"]:
nifi.create(generic["name"],generic["file"])
genericcontent=prepareforAll(generic["file"])
nifi.create(generic["name"],genericcontent)
for variable in generic["variables"]:
nifi.changeVariable(generic["name"],variable,generic["variables"][variable])
newProcessInfo(generic["name"])
updateComponent(generic)

if "SQS" in data["nifi"]:
for sqs in data["nifi"]["SQS"]:
#Get credentials of AWS
getAWSCredentials(sqs)
#Create SQS
sqsDetails=createSQS(sqs)
#Prepare config
sqscontent=prepareforAll(sqsfile)
sqscontent=sqsPreparefile(sqscontent,sqs)
#Create object
nifi.create(sqs["name"],sqscontent)
nifi.changeVariable(sqs["name"],'queueurl',sqsDetails['QueueUrl'])
newProcessInfo(sqs["name"])
updateComponent(sqs)
if "S3" in data["nifi"]:
for s3 in data["nifi"]["S3"]:
#Get credentials of AWS
getAWSCredentials(s3)
#Create SQS
s3["queue_name"]= s3["AWS_S3_BUCKET"]+"_events"
sqsDetails=createSQS(s3)
#Create Notification from S3 event to SQS
s3NotificationSQS(s3)
#Prepare config
sqscontent=prepareforAll(sqsfile)
s3content=sqsPreparefile(sqscontent,s3)
#Create object
nifi.create(s3["name"],sqscontent)
nifi.changeVariable(s3["name"],'queueurl',sqsDetails['QueueUrl'])
newProcessInfo(s3["name"])
updateComponent(s3)
if "Kafka" in data["nifi"]:
for kafka in data["nifi"]["Kafka"]:
#Prepare config
kafkacontent=prepareforAll(kafkafile)
kafkacontent=kafkaPreparefile(kafkacontent,kafka)
#Set ssl context configuration
kafkacontent=ssl_context(kafkacontent,kafka["ssl_context"])
#Create object
nifi.create(kafka["name"],kafkacontent)
nifi.changeVariable(kafka["name"],"group_id", kafka["group_id"])
nifi.changeVariable(kafka["name"],"bootstrap_servers", kafka["bootstrap_servers"])
nifi.changeVariable(kafka["name"],"topic", kafka["topic"])
#enable SSL
nifi.enableSSL(kafka["name"])
newProcessInfo(kafka["name"])
updateComponent(kafka)
if "connection" in data["nifi"]:
for connection in data["nifi"]["connection"]:
nifi.makeConnection(connection["from"],connection["to"])
elif args.option == "delete":
makeActionWithAllType(typesSSL,nifi.disableSSL)
makeActionWithAllType(types,nifi.deleteProcess)
#Delete of SQS, not the notification
if "S3" in data["nifi"]:
for s3 in data["nifi"]["S3"]:
s3["queue_name"]= s3["AWS_S3_BUCKET"]+"_events"
deleteSQS(s3)
# Delete the queues declared in the SQS section (fix: previously iterated
# the "S3" section here by copy-paste mistake, so SQS queues were never
# deleted and a missing "S3" key raised KeyError).
if "SQS" in data["nifi"]:
    for sqs in data["nifi"]["SQS"]:
        deleteSQS(sqs)
elif args.option == "start":
makeActionWithAllType(types,nifi.startProcess)
elif args.option == "stop":
Expand Down
26 changes: 23 additions & 3 deletions cli/NifiManagement.py → cli/sources/NifiManagement.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ def getProcessGroup(self, process_groupName):
return None


def create(self, name, filecontent):
    """Create a NiFi process group *name* from an in-memory flow definition.

    filecontent is the flow-definition dict (e.g. produced by prepareforAll);
    it is serialized to JSON and uploaded through NiFi's multipart
    process-group upload endpoint. If a group with this name already exists,
    the call is a no-op.
    """
    groupid = self.getProcessGroup(name)
    if not groupid:
        fields = {
            "groupName": name,
            "positionX": "-150",
            "positionY": "-150",
            "clientId": "aaa",
            # Attach the flow definition as a JSON "file" part.
            "file": ("namefile", json.dumps(filecontent).encode('utf-8'), "application/json"),
        }
        body, header = encode_multipart_formdata(fields)
        response = self.callHttp(requests.post, "/nifi-api/process-groups/root/process-groups/upload", body, header)

Expand Down Expand Up @@ -155,4 +155,24 @@ def executionNode(self, pg, process, node):

def nifiVersion(self):
    """Return the NiFi server version string from the system-diagnostics endpoint."""
    response = self.callHttp(requests.get, "/nifi-api/system-diagnostics", '')
    # Single return (the source showed a duplicated, unreachable return line).
    return response.json()["systemDiagnostics"]["aggregateSnapshot"]["versionInfo"]["niFiVersion"]

def enableSSL(self, name):
    """Enable the SSL-context controller service(s) of process group *name*.

    Lists the controller services visible to the group and flips the
    run-status of every service owned directly by the group to ENABLED.
    Doing the PUT inside the match guard avoids an unbound-name error when
    no owned service exists, and handles more than one owned service.
    """
    groupid = self.getProcessGroup(name)
    response = self.callHttp(requests.get, "/nifi-api/flow/process-groups/" + groupid + "/controller-services", '')
    for controller in response.json()["controllerServices"]:
        # The listing also contains inherited services; only touch our own.
        if controller["parentGroupId"] == groupid:
            ssl_context_id = controller["id"]
            version = controller["revision"]["version"]
            payload = json.dumps({
                "revision": {"clientId": groupid, "version": version},
                "disconnectedNodeAcknowledged": False,
                "state": "ENABLED",
                "uiOnly": True,
            })
            self.callHttp(requests.put, "/nifi-api/controller-services/" + ssl_context_id + "/run-status", payload)

def disableSSL(self, name):
    """Disable the SSL-context controller service(s) of process group *name*.

    Mirror of enableSSL: flips every controller service owned directly by
    the group to DISABLED. The PUT sits inside the match guard so that a
    group without owned services cannot raise an unbound-name error.
    """
    groupid = self.getProcessGroup(name)
    response = self.callHttp(requests.get, "/nifi-api/flow/process-groups/" + groupid + "/controller-services", '')
    for controller in response.json()["controllerServices"]:
        # The listing also contains inherited services; only touch our own.
        if controller["parentGroupId"] == groupid:
            ssl_context_id = controller["id"]
            version = controller["revision"]["version"]
            payload = json.dumps({
                "revision": {"clientId": groupid, "version": version},
                "disconnectedNodeAcknowledged": False,
                "state": "DISABLED",
                "uiOnly": True,
            })
            self.callHttp(requests.put, "/nifi-api/controller-services/" + ssl_context_id + "/run-status", payload)
53 changes: 53 additions & 0 deletions cli/sources/auxiliaryFunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# dCNiOs
# Copyright (C) 2023 - GRyCAP - Universitat Politecnica de Valencia
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Apache 2.0 Licence as published by
# the Apache Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache 2.0 License for more details.
#
# You should have received a copy of the Apache 2.0 License
# along with this program. If not, see <https://www.apache.org/licenses/>.

#!/usr/bin/env python3

import json
import os

def addSensibleVariable(file, processorName, key, value):
    """Set a (possibly sensitive) property on every processor named *processorName*.

    Mutates the NiFi flow-definition dict *file* in place and returns it so
    calls can be chained.
    """
    processors = file["flowContents"]["processors"]
    for target in (proc for proc in processors if proc["name"] == processorName):
        target["properties"][key] = value
    return file


def prepareforAll(fileName):
    """Load a NiFi flow-definition template and blank out its registry metadata.

    Returns the parsed JSON dict with ``snapshotMetadata`` reset so the flow
    can be uploaded to any NiFi instance regardless of its original registry.
    """
    with open(fileName) as template:
        filecontent = json.load(template)
    filecontent["snapshotMetadata"] = {
        "bucketIdentifier": "",
        "flowIdentifier": "",
        "version": -1,
    }
    return filecontent


def kafkaPreparefile(filecontent, kafka):
    """Fill the ConsumeKafka_2_6 processor's security properties from *kafka*.

    Copies the SASL credentials into the processor, and configures key-based
    message separation when kafka["separate_by_key"] is the string "true"
    (which then also requires kafka["message_demarcator"]). Returns the
    mutated flow-definition dict.
    """
    consumer = "ConsumeKafka_2_6"
    credentials = [
        ("security.protocol", kafka["security_protocol"]),
        ("sasl.mechanism", kafka["sasl_mechanism"]),
        ("sasl.username", kafka["sasl_username"]),
        ("sasl.password", kafka["sasl_password"]),
    ]
    for prop, val in credentials:
        filecontent = addSensibleVariable(filecontent, consumer, prop, val)
    if kafka.get("separate_by_key") == "true":
        filecontent = addSensibleVariable(filecontent, consumer, "separate-by-key", "true")
        filecontent = addSensibleVariable(filecontent, consumer, "message-demarcator", kafka["message_demarcator"])
    else:
        filecontent = addSensibleVariable(filecontent, consumer, "separate-by-key", "false")
    return filecontent

def ssl_context(filecontent, variables):
    """Copy SSL-context settings into the flow's first controller service.

    Keys in *variables* use underscores (YAML-friendly) and are mapped back
    to the space-separated NiFi property names. Mutates and returns
    *filecontent*.
    """
    properties = filecontent["flowContents"]["controllerServices"][0]["properties"]
    for key, value in variables.items():
        properties[key.replace("_", " ")] = value
    return filecontent
79 changes: 79 additions & 0 deletions cli/sources/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# dCNiOs
# Copyright (C) 2023 - GRyCAP - Universitat Politecnica de Valencia
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Apache 2.0 Licence as published by
# the Apache Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache 2.0 License for more details.
#
# You should have received a copy of the Apache 2.0 License
# along with this program. If not, see <https://www.apache.org/licenses/>.

#!/usr/bin/env python3

import json
import os
import boto3
from sources.auxiliaryFunctions import addSensibleVariable



def getAWSCredentials(configuration):
    """Resolve AWS credentials into the *configuration* dict, in priority order.

    1. Non-empty AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables.
    2. The default profile of ~/.aws/credentials (via boto3).
    3. Keys already present in *configuration* (left untouched).

    Mutates *configuration* in place. Fix: the original indexed
    os.environ["HOME"], which raises KeyError when HOME is unset (e.g. on
    Windows); os.path.expanduser is portable.
    """
    if os.environ.get("AWS_ACCESS_KEY_ID") and os.environ.get("AWS_SECRET_ACCESS_KEY"):
        print("AWS Credentials: Credentials from environment")
        configuration["AWS_ACCESS_KEY_ID"] = os.environ["AWS_ACCESS_KEY_ID"]
        configuration["AWS_SECRET_ACCESS_KEY"] = os.environ["AWS_SECRET_ACCESS_KEY"]
    elif os.path.exists(os.path.join(os.path.expanduser("~"), ".aws", "credentials")):
        print("AWS Credentials: Credentials from credentials file")
        session = boto3.Session(profile_name="default")
        credentials = session.get_credentials()
        configuration["AWS_ACCESS_KEY_ID"] = credentials.access_key
        configuration["AWS_SECRET_ACCESS_KEY"] = credentials.secret_key
    elif "AWS_ACCESS_KEY_ID" in configuration and "AWS_SECRET_ACCESS_KEY" in configuration:
        print("AWS Credentials: Credentials from configuration file")
    else:
        # Previously this case was silent; make the failure visible to the user.
        print("AWS Credentials: no credentials found")

def createSQS(configuration):
    """Create (or reuse) the SQS queue named configuration["queue_name"].

    The queue policy grants the account owner full access and allows any
    principal to SendMessage (required so S3 bucket notifications can
    deliver events). Returns the get_queue_url response dict, which
    contains 'QueueUrl'.
    """
    region = configuration["AWS_DEFAULT_REGION"]
    queue_name = configuration["queue_name"]
    account_id = boto3.client('sts').get_caller_identity().get('Account')
    policy = '{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"__owner_statement","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::' + account_id + ':root"},"Action":"SQS:*","Resource":"arn:aws:sqs:' + region + ':' + account_id + ':' + queue_name + '"},{"Sid":"__sender_statement","Effect":"Allow","Principal":{"AWS":"*"},"Action":"SQS:SendMessage","Resource":"arn:aws:sqs:' + region + ':' + account_id + ':' + queue_name + '"}]}'
    sqs_client = boto3.client('sqs', region_name=region)
    sqs_client.create_queue(QueueName=queue_name, Attributes={
        "SqsManagedSseEnabled": "false",
        "Policy": policy,
    })
    return sqs_client.get_queue_url(QueueName=queue_name)

def sqsPreparefile(filecontent, configuration):
    """Inject AWS credentials and region into the GetSQS processor of the flow.

    Returns the mutated flow-definition dict.
    """
    property_map = (
        ("Access Key", "AWS_ACCESS_KEY_ID"),
        ("Secret Key", "AWS_SECRET_ACCESS_KEY"),
        ("Region", "AWS_DEFAULT_REGION"),
    )
    for prop, config_key in property_map:
        filecontent = addSensibleVariable(filecontent, "GetSQS", prop, configuration[config_key])
    return filecontent

def deleteSQS(configuration):
    """Delete the SQS queue named configuration["queue_name"] in the configured region."""
    client = boto3.client('sqs', region_name=configuration["AWS_DEFAULT_REGION"])
    queue_url = client.get_queue_url(QueueName=configuration["queue_name"])['QueueUrl']
    client.delete_queue(QueueUrl=queue_url)


def s3NotificationSQS(configuration):
    """Point the S3 bucket's ObjectCreated notifications at the companion SQS queue.

    Builds the queue ARN from the caller's account and the configured region/
    queue name, then installs a bucket notification configuration that sends
    all s3:ObjectCreated:* events to it.
    """
    account = boto3.client('sts').get_caller_identity().get('Account')
    queue_arn = "arn:aws:sqs:" + configuration["AWS_DEFAULT_REGION"] + ":" + account + ":" + configuration["queue_name"]
    notification = {
        'QueueConfigurations': [
            {
                'Id': configuration["AWS_S3_BUCKET"] + "_event",
                'QueueArn': queue_arn,
                'Events': [
                    's3:ObjectCreated:*',
                ],
            },
        ],
    }
    bucket_notification = boto3.resource('s3').BucketNotification(configuration["AWS_S3_BUCKET"])
    bucket_notification.put(
        NotificationConfiguration=notification,
        ExpectedBucketOwner=account,
        SkipDestinationValidation=False)
Loading

0 comments on commit 5b364c4

Please sign in to comment.