Skip to content

Commit

Permalink
Merge pull request #25 from interTwin-eu/dev-slangarita
Browse files Browse the repository at this point in the history
change on kafka and update documentation
  • Loading branch information
SergioLangaritaBenitez authored Jun 10, 2024
2 parents 5b364c4 + 20cda7a commit 4e5523e
Show file tree
Hide file tree
Showing 40 changed files with 3,402 additions and 1,647 deletions.
68 changes: 68 additions & 0 deletions cli/alterations/alteration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# dCNiOs
# Copyright (C) 2023 - GRyCAP - Universitat Politecnica de Valencia
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Apache 2.0 Licence as published by
# the Apache Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache 2.0 License for more details.
#
# You should have received a copy of the Apache 2.0 License
# along with this program. If not, see <https://www.apache.org/licenses/>.

# !/usr/bin/env python3

from apis import NifiManagment
from apis import auxiliaryFunctions


def createMerge(nifiConfiguration,information, name):
nameCompose= nameActionReturn(information["action"],name)
merge=auxiliaryFunctions.prepareforAll("./template/Alterations/Merge.json",information)
merge = auxiliaryFunctions.addSensibleVariable(merge, "MergeContent", "Maximum Number of Entries", information["maxMessages"])
nifiConfiguration.create(nameCompose, merge)
nifiConfiguration.changeSchedule(nameCompose, "MergeContent", information["windowSeconds"])


def createDecode(nifiConfiguration,information,name):
nameCompose= nameActionReturn(information["action"],name)
merge=auxiliaryFunctions.prepareforAll("./template/Alterations/Encode_Decode.json",information)
merge = auxiliaryFunctions.addSensibleVariable(merge, "EncodeContent", "Mode", "Decode")
if "Encoding" in information:
merge = auxiliaryFunctions.addSensibleVariable(merge, "EncodeContent", "Encoding", information["Encoding"])
nifiConfiguration.create(nameCompose, merge)

def createEncode(nifiConfiguration,information,name):
nameCompose= nameActionReturn(information["action"],name)
merge=auxiliaryFunctions.prepareforAll("./template/Alterations/Encode_Decode.json",information)
if "Encoding" in information:
merge = auxiliaryFunctions.addSensibleVariable(merge, "EncodeContent", "Encoding", information["Encoding"])
nifiConfiguration.create(nameCompose, merge)



def createAlteration(nifiConfiguration,allInformation):
name=allInformation["name"]
for alter in allInformation["alterations"]:
if alter["action"]=="Merge":
createMerge(nifiConfiguration,alter,name)
elif alter["action"]=="Encode":
createEncode(nifiConfiguration,alter,name)
elif alter["action"]=="Decode":
createDecode(nifiConfiguration,alter,name)
conectAlteration(nifiConfiguration,allInformation)


def conectAlteration(nifiConfiguration,allInformation):
name=allInformation["name"]
for index,step in enumerate(allInformation["alterations"]):
if index == 0:
nifiConfiguration.makeConnection(name,nameActionReturn(step["action"],name))
else:
nifiConfiguration.makeConnection(nameActionReturn(allInformation["alterations"][index-1]["action"],name),nameActionReturn(step["action"],name))

def nameActionReturn(nameAction,nameSource):
return nameAction+ " of "+ nameSource
234 changes: 234 additions & 0 deletions cli/apis/NifiManagment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
# dCNiOs
# Copyright (C) 2023 - GRyCAP - Universitat Politecnica de Valencia
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Apache 2.0 licence as published by
# the Apache Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache 2.0 License for more details.
#
# You should have received a copy of the Apache 2.0 License
# along with this program. If not, see <https://www.apache.org/licenses/>.

import requests
import json
import urllib3
from requests.auth import HTTPBasicAuth
from urllib3 import encode_multipart_formdata
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class Nifi:

def __init__(self, nifi_endPoint, user, password):
self.nifi_url = nifi_endPoint # +":"+nifi_Port
self.basic = HTTPBasicAuth(user, password)

def callHttp(self, type, link, data, header='application/json'):
try:
response = type(self.nifi_url+link,
headers={'Content-Type': header},
data=data, auth=self.basic, verify=False)
return response
except requests.exceptions as e: # This is the correct syntax
print(e)

def startProcess(self, name):
process_groupid = self.getProcessGroup(name)
link = "/nifi-api/flow/process-groups/" + process_groupid
data = '{"id":"' + process_groupid + '","state":"RUNNING"}'
response = self.callHttp(requests.put, link, data)

def stopProcess(self, name):
process_groupid = self.getProcessGroup(name)
link = "/nifi-api/flow/process-groups/" + process_groupid
data = '{"id":"' + process_groupid + '","state":"STOPPED"}'
response = self.callHttp(requests.put, link, data)

def makeConnection(self, fromName, toName):
fromGroupid = self.getProcessGroup(fromName)
toGroupid = self.getProcessGroup(toName)
response = self.callHttp(requests.get,
"/nifi-api/process-groups/root/connections",
'')
for connection in response.json()["connections"]:
if connection["component"]["source"]["groupId"] == fromGroupid \
and connection["component"]["destination"]["groupId"] == \
toGroupid:
return connection["id"]
response = self.callHttp(requests.get, "/nifi-api/process-groups/"
+ toGroupid + "/input-ports", '')
destinationid = response.json()["inputPorts"][0]["id"]

response = self.callHttp(requests.get, "/nifi-api/process-groups/"
+ fromGroupid + "/output-ports", '')
sourceid = response.json()["outputPorts"][0]["id"]
link = "/nifi-api/process-groups/root/connections"
data = '{"revision":{"version": 0},"component": {' \
+ ' "source": { "id": "' \
+ sourceid + '", "groupId": "' + fromGroupid \
+ '", "type": "OUTPUT_PORT" }, "destination": { "id": "' \
+ destinationid + '", "groupId": "' + toGroupid \
+ '", "type": "INPUT_PORT" } } }'
response = self.callHttp(requests.post, link, data)
return response.json()["id"]

def deleteProcess(self, name):
groupid = self.getProcessGroup(name)
if not groupid:
return None
process_group = groupid
response = self.callHttp(requests.get,
"/nifi-api/process-groups/root/connections", '')
for connection in response.json()["connections"]:
if connection["component"]["source"]["groupId"] == process_group \
or connection["component"]["destination"]["groupId"] == process_group:
link = "/nifi-api/connections/" + connection["id"] \
+ "?version=" + str(connection["revision"]["version"])
response = self.callHttp(requests.delete, link, '')

response = self.callHttp(requests.get, "/nifi-api/process-groups/"
+ process_group, '')
version = str(response.json()["revision"]["version"])
response = self.callHttp(requests.delete, "/nifi-api/process-groups/"
+ process_group + "?version=" + version, '')

def getProcessGroup(self, process_groupName):
response = self.callHttp(requests.get,
"/nifi-api/process-groups"
+ "/root/process-groups", '')
for pg in response.json()["processGroups"]:
if (pg["component"]["name"] == process_groupName):
return pg["id"]
return None

def create(self, name, filecontent):
groupid = self.getProcessGroup(name)
if not groupid:
fields = {"groupName": name, "positionX": "-150",
"positionY": "-150", "clientId": "aaa",
"file": ("namefile",
json.dumps(filecontent).encode('utf-8'),
"application/json"), }
body, header = encode_multipart_formdata(fields)
response = self.callHttp(requests.post,
"/nifi-api/process-groups"
+ "/root/process-groups/upload",
body,
header)

def changeVariable(self, name, key, value):
id = self.getProcessGroup(name)
response = self.callHttp(requests.get,
"/nifi-api/process-groups/"
+ id + "/variable-registry",
'')
PGRversion = response.json()["processGroupRevision"]["version"]
data = '{"processGroupRevision": { "version": ' \
+ str(PGRversion) + '}, \
"variableRegistry": {"variables": [ { "variable": { \
"name": "' + key + '", \
"value": "' + value + '" \
} } ], "processGroupId": "' + id + '" }}'
response = self.callHttp(requests.put, "/nifi-api/process-groups/"
+ id + "/variable-registry", data)

def updateProcessor(self, pg, process, accion):
id = self.getProcessGroup(pg)
response = self.callHttp(requests.get,
"/nifi-api/flow/process-groups/" + id, '')
components = response.json()["processGroupFlow"]["flow"]["processors"]
found = False
for comp in components:
if comp["component"]["name"] == process:
processid = comp["component"]["id"]
state = comp["component"]["state"]
found = True
if not found:
return False
else:
if state == "RUNNING":
self.stopProcess(pg)
response = self.callHttp(requests.get, "/nifi-api/processors/"
+ processid, '')
version = response.json()["revision"]["version"]
execution = response.json()["component"]["config"]["executionNode"]
data = '{"component": {"id":"' + str(processid) + '", "name":"' \
+ str(process) + '",\
"config":{'+accion+' }, "state":"STOPPED"},\
"revision":{"version":' + str(version) \
+ '}, "disconnectedNodeAcknowledged":false}'
response = self.callHttp(requests.put, "/nifi-api/processors/"
+ processid, data)
if state == "RUNNING":
self.startProcess(pg)
return True

def changeSchedule(self, pg, process, seconds):
self.updateProcessor(pg, process, '"schedulingPeriod":"'
+ str(seconds) + ' sec"')

def executionNode(self, pg, process, node):
if node == "PRIMARY" or node == "ALL":
self.updateProcessor(pg, process, '"executionNode":"'+node+'"')
else:
pass

def nifiVersion(self):
response = self.callHttp(requests.get,
"/nifi-api/system-diagnostics", '')
infoversion = response.json()["systemDiagnostics"]["aggregateSnapshot"]
return infoversion["versionInfo"]["niFiVersion"]

def enableSSL(self, name):
groupid = self.getProcessGroup(name)
response = self.callHttp(requests.get, "/nifi-api/flow/process-groups/"
+ groupid + "/controller-services", '')
for controller in response.json()["controllerServices"]:
if controller["parentGroupId"] == groupid:
ssl_context_id = controller["id"]
version = controller["revision"]["version"]
data = '{"revision":{"clientId":"' + groupid + '","version":' \
+ str(version) + '},"disconnectedNodeAcknowledged":' \
+ 'false,"state":"ENABLED","uiOnly":true}'
response = self.callHttp(requests.put,
"/nifi-api/controller-services/"
+ ssl_context_id+"/run-status", data)

def disableSSL(self, name):
groupid = self.getProcessGroup(name)
response = self.callHttp(requests.get, "/nifi-api/flow/process-groups/"
+ groupid + "/controller-services", '')
for controller in response.json()["controllerServices"]:
if controller["parentGroupId"] == groupid:
ssl_context_id = controller["id"]
version = controller["revision"]["version"]
data = '{"revision":{"clientId":"' + groupid \
+ '","version":' + str(version) + '},"' \
+ 'disconnectedNodeAcknowledged":false,' \
+ '"state":"DISABLED","uiOnly":true}'
response = self.callHttp(requests.put,
"/nifi-api/controller-services/"
+ ssl_context_id + "/run-status",
data)

def updateComponent(self, type):
if "components" in type:
print("Process group: "+type["name"])
for component in type["components"]:
print("\t- Process: " + component["name"])
if "seconds" in component:
self.changeSchedule(type["name"], component["name"],
component["seconds"])
print("\t New schedule time: "
+ str(component["seconds"]) + " seconds")
if "node" in component:
self.executionNode(type["name"],
component["name"], component["node"])
print("\t Now executing in node: " + component["node"])

def newProcessInfo(self, name):
print("New Process group: " + str(name))
72 changes: 72 additions & 0 deletions cli/apis/auxiliaryFunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# dCNiOs
# Copyright (C) 2023 - GRyCAP - Universitat Politecnica de Valencia
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Apache 2.0 Licence as published by
# the Apache Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache 2.0 License for more details.
#
# You should have received a copy of the Apache 2.0 License
# along with this program. If not, see <https://www.apache.org/licenses/>.

# !/usr/bin/env python3

import json
import os
from alterations import alteration


def addSensibleVariable(file, processorName, key, value):
for processor in file["flowContents"]["processors"]:
if processor["name"] == processorName:
processor["properties"][key] = value
return file


def prepareforAll(fileName, info):
with open(fileName) as f:
filecontent = json.load(f)
filecontent["snapshotMetadata"] = {}
filecontent["snapshotMetadata"]["bucketIdentifier"] = ""
filecontent["snapshotMetadata"]["flowIdentifier"] = ""
filecontent["snapshotMetadata"]["version"] = -1
if checkExistSsl_context(info):
filecontent = ssl_context(filecontent,info["ssl_context"])
return filecontent

def postJob(info,nifi):
if checkExistSsl_context(info):
print("Nifi enable ssl "+ info["name"])
nifi.enableSSL(info["name"])
nifi.newProcessInfo(info["name"])
nifi.updateComponent(info)
if "alterations" in info:
alteration.createAlteration(nifi,info)



def checkExistSsl_context(info):
if 'ssl_context' in info:
return True
else:
return False

def ssl_context(filecontent, variables):
if filecontent["flowContents"]["controllerServices"]==[]:
introduceSSL(filecontent)
else:
pass
for var in variables:
change = filecontent["flowContents"]["controllerServices"][0]
change["properties"][var.replace("_", " ")] = variables[var]
return filecontent


def introduceSSL(filecontent):
with open("apis/sslExample") as f:
sslcontent = json.load(f)
filecontent["flowContents"]["controllerServices"] = sslcontent
Loading

0 comments on commit 4e5523e

Please sign in to comment.