Merge pull request #26 from interTwin-eu/dev-slangarita

fix typo and update documentation

SergioLangaritaBenitez authored Jul 5, 2024
2 parents 4e5523e + f21c1dd commit f38790a

Showing 38 changed files with 90 additions and 98 deletions.
38 changes: 20 additions & 18 deletions README.md
@@ -1,28 +1,21 @@
-# dCNiOS
+# DCNiOS
 
 [dCache](http://dcache.org) is a system for storing and retrieving huge amounts of data, distributed among a large number of heterogeneous server nodes, under a single virtual filesystem tree with a variety of standard access methods.
 
 DCNiOS is an open-source command-line tool to easily manage the creation of event-driven data processing flows. DCNiOS, Data Connector through Apache NiFi for OSCAR, facilitates the creation of event-driven processes connecting a Storage System like [dCache](http://dcache.org) or [S3](https://aws.amazon.com/s3) to a scalable OSCAR cluster by employing predefined dataflows that are processed by Apache NiFi.
 
 [Apache NiFi](http://nifi.apache.org) is a reliable system to process and distribute data through powerful and scalable directed graphs of data routing, transformation, and system mediation logic.
 
 [OSCAR](https://oscar.grycap.net) is an open-source platform for serverless event-driven data processing of containerized applications across the computing continuum.
 
-Together with [dCNiOS](http://github.com/grycap/dcnios) (dCache + NiFi + OSCAR), you can manage the creation of event-driven data processing flows. As shown in the figure, when files are uploaded to dCache, events are ingested in Apache NiFi, which can queue them up depending on the (modifiable at runtime) ingestion rate, to be then delegated for processing into a scalable OSCAR cluster, where a user-defined application based on a Docker image can process the data file.
-
-<img align="right" src="docpage/docs/images/dcnios-workflow.png" alt="dCNiOS Workflow" width="400"></left>
-
-Therefore, dCNiOS has been made to interact with NiFi and deploy a complete dataflow. It uses HTTP calls to communicate with a Nifi cluster, which can be automatically deployed by the [Infrastructure Manager (IM)](https://im.egi.eu). Apache NiFi is deployed on a dynamically provisioned Kubernetes cluster running with a custom Docker image named `ghcr.io/grycap/nifi-sse:latest`. This new image includes a client for the [dCache SSE Event Interface](https://www.dcache.org/manuals/UserGuide-8.2/frontend.shtml#storage-events), kindly provided by Paul Millar in [GitHub](https://github.com/paulmillar/dcache-sse). It does not require a Nifi registry.
-
-All the dataflow information is described in a YAML file, and by executing the dCNiOS command-line interface, this dataflow is deployed on Nifi.
-
-From predefined recipes (ProcessGroup in Nifi, .json files) created before,
-
-dCNiOS inserts a general flow and changes the variables to create a concrete workflow.
-
-By default, two process group recipes have been created:
-
-1. dcache, which is an active listener for a dCache instance. The [Server-sent Events SSE](https://en.wikipedia.org/wiki/Server-sent_events) client actively listens for these events in a user-defined folder in dCache. When a file is uploaded to that folder in dCache, NiFi will introduce the event in the dataflow.
-2. InvokeOSCAR, an HTTP call to invoke an OSCAR service asynchronously. OSCAR supports this events specification to let the user decide whether the file should be pre-staged into the execution sandbox to locally process the data within an OSCAR job or to delegate the processing of the event into an external tool, such as a workflow orchestration platform, thus reducing data movements across the systems.
+Together with [DCNiOS](http://github.com/grycap/dcnios) (Data Connector + NiFi + OSCAR), you can manage the creation of event-driven data processing flows. As shown in the figure, when an event occurs in the external component, dCache in this case, events are ingested in Apache NiFi, which can queue them up depending on the (modifiable at runtime) ingestion rate, to be then delegated for processing into a scalable OSCAR cluster, where a user-defined application based on a Docker image can process the data file.
+
+<img align="right" src="docpage/docs/images/dcnios-workflow.png" alt="DCNiOS Workflow" width="400"></left>
+
+Therefore, DCNiOS has been made to interact with NiFi and deploy a complete dataflow. It uses HTTP calls to communicate with a Nifi cluster, which can be automatically deployed by the [Infrastructure Manager (IM)](https://im.egi.eu). Apache NiFi is deployed on a dynamically provisioned Kubernetes. It does not require a Nifi registry.
+
+All the dataflow information is described in a YAML file, and by executing the DCNiOS command-line interface, this dataflow is deployed on Nifi.
+
+From predefined recipes (ProcessGroup in Nifi, .json files) created before, DCNiOS inserts a general flow and changes the variables to create a concrete workflow.


## Getting Started
@@ -48,7 +41,7 @@ Install all the requirements defined in `requirements.txt`
 pip install -r requeriments.txt
 ```
 
-Or only install the minimal requirements that dCNiOS needs.
+Or only install the minimal requirements that DCNiOS needs.
 
 
 ``` bash
@@ -73,14 +66,23 @@ There is only one version in maintenance:
 
 ## Licensing
 
-dCNiOS is licensed under the Apache License, Version 2.0. See LICENSE for the full license text.
+DCNiOS is licensed under the Apache License, Version 2.0. See LICENSE for the full license text.
 
 ## Acknowledgements
 
 This work was supported by the project “An interdisciplinary Digital Twin Engine for science’’ (interTwin), which has received funding from the European Union’s Horizon Europe Programme under Grant 101058386.
 
-<img src="docpage/docs/images/inter-twin.png" alt="dCNiOS Workflow" width="200" >
+<img src="docpage/docs/images/inter-twin.png" alt="DCNiOS Workflow" width="200" >
 
 ## More information
 
 You can find more [information](https://oscar.grycap.net/blog/data-driven-processing-with-dcache-nifi-oscar/ ) in the [OSCAR's blog.](https://oscar.grycap.net/blog/)
 
+<a href="https://eu.badgr.com/public/assertions/0vLlQBANQzyHMOrmcsck3w?identity__url=https:%2F%2Fgithub.com%2FEOSC-synergy%2Foscar.assess.sqaaas%2Fcommit%2F10254d15a9230f45c84dae22f3711653162faf78">
+<img align="left" src="docpage/docs/images/badge_software_silver.png" alt="Silver Badge" width="90">
+</a>
+
+This software has received a silver badge according to the [Software Quality Baseline criteria](https://www.eosc-synergy.eu/for-developers/) defined by the [EOSC-Synergy](https://www.eosc-synergy.eu) project. Please acknowledge the use of DCNiOS by citing the following scientific
+publications ([preprints available](https://www.grycap.upv.es/gmolto/publications)):
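Editor's note: as a concrete illustration of the YAML-driven deployment the README describes, a minimal dataflow definition might look like the sketch below. This is an assumption pieced together from the sources and destinations touched by this commit (dcache, Kafka, S3/SQS, OSCAR); the exact schema accepted by `dcnios-cli.py` should be checked against the project documentation.

``` yaml
# Hypothetical DCNiOS dataflow definition; all field names are assumptions.
nifi:
  url: https://mynifi.example.org   # NiFi endpoint reached via HTTP calls
  user: admin
  password: changeme
dataflows:
  - name: dcache-to-oscar
    source:
      type: dcache          # active SSE listener (cli/sources/dcache.py)
      folder: /data/incoming
    destination:
      type: oscar           # InvokeOSCAR recipe (cli/destinations/oscar.py)
      service: my-oscar-service
```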
20 changes: 10 additions & 10 deletions cli/alterations/alteration.py
@@ -15,31 +15,31 @@
 
 # !/usr/bin/env python3
 
-from apis import NifiManagment
+from apis import nifiManagment
 from apis import auxiliaryFunctions
 
 
 def createMerge(nifiConfiguration,information, name):
     nameCompose= nameActionReturn(information["action"],name)
-    merge=auxiliaryFunctions.prepareforAll("./template/Alterations/Merge.json",information)
-    merge = auxiliaryFunctions.addSensibleVariable(merge, "MergeContent", "Maximum Number of Entries", information["maxMessages"])
+    merge=auxiliaryFunctions.prepareforAll("./template/alterations/Merge.json",information)
+    merge = auxiliaryFunctions.addSensitiveVariable(merge, "MergeContent", "Maximum Number of Entries", information["maxMessages"])
     nifiConfiguration.create(nameCompose, merge)
     nifiConfiguration.changeSchedule(nameCompose, "MergeContent", information["windowSeconds"])
 
 
 def createDecode(nifiConfiguration,information,name):
     nameCompose= nameActionReturn(information["action"],name)
-    merge=auxiliaryFunctions.prepareforAll("./template/Alterations/Encode_Decode.json",information)
-    merge = auxiliaryFunctions.addSensibleVariable(merge, "EncodeContent", "Mode", "Decode")
+    merge=auxiliaryFunctions.prepareforAll("./template/alterations/Encode_Decode.json",information)
+    merge = auxiliaryFunctions.addSensitiveVariable(merge, "EncodeContent", "Mode", "Decode")
     if "Encoding" in information:
-        merge = auxiliaryFunctions.addSensibleVariable(merge, "EncodeContent", "Encoding", information["Encoding"])
+        merge = auxiliaryFunctions.addSensitiveVariable(merge, "EncodeContent", "Encoding", information["Encoding"])
     nifiConfiguration.create(nameCompose, merge)
 
 def createEncode(nifiConfiguration,information,name):
     nameCompose= nameActionReturn(information["action"],name)
-    merge=auxiliaryFunctions.prepareforAll("./template/Alterations/Encode_Decode.json",information)
+    merge=auxiliaryFunctions.prepareforAll("./template/alterations/Encode_Decode.json",information)
     if "Encoding" in information:
-        merge = auxiliaryFunctions.addSensibleVariable(merge, "EncodeContent", "Encoding", information["Encoding"])
+        merge = auxiliaryFunctions.addSensitiveVariable(merge, "EncodeContent", "Encoding", information["Encoding"])
     nifiConfiguration.create(nameCompose, merge)
 
@@ -53,10 +53,10 @@ def createAlteration(nifiConfiguration,allInformation):
             createEncode(nifiConfiguration,alter,name)
         elif alter["action"]=="Decode":
             createDecode(nifiConfiguration,alter,name)
-    conectAlteration(nifiConfiguration,allInformation)
+    connectAlteration(nifiConfiguration,allInformation)
 
 
-def conectAlteration(nifiConfiguration,allInformation):
+def connectAlteration(nifiConfiguration,allInformation):
     name=allInformation["name"]
     for index,step in enumerate(allInformation["alterations"]):
         if index == 0:
2 changes: 1 addition & 1 deletion cli/apis/auxiliaryFunctions.py
@@ -20,7 +20,7 @@
 from alterations import alteration
 
 
-def addSensibleVariable(file, processorName, key, value):
+def addSensitiveVariable(file, processorName, key, value):
     for processor in file["flowContents"]["processors"]:
         if processor["name"] == processorName:
             processor["properties"][key] = value
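Editor's note: a minimal sketch of how this renamed helper is driven. Only `addSensitiveVariable` itself appears in this diff; the template path comes from `cli/env.py` further down, and the flow-snapshot structure is inferred from the loop above.

``` python
import json

from apis.auxiliaryFunctions import addSensitiveVariable

# Load a predefined recipe (a NiFi ProcessGroup exported as JSON) and set a
# property on one of its processors before the flow is created in NiFi.
with open("template/destinations/PutS3.json") as f:
    flow = json.load(f)

# Walks flow["flowContents"]["processors"] and sets the property on the
# processor named "PutS3Object", exactly as the function above does.
flow = addSensitiveVariable(flow, "PutS3Object", "Bucket", "my-bucket")
```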
10 changes: 5 additions & 5 deletions cli/apis/aws.py
@@ -18,9 +18,9 @@
 import json
 import os
 import boto3
-from apis.auxiliaryFunctions import addSensibleVariable
+from apis.auxiliaryFunctions import addSensitiveVariable
 from apis import auxiliaryFunctions
-from apis import NifiManagment
+from apis import nifiManagment
 
 def getAWSCredentials(configuration):
     if "AWS_ACCESS_KEY_ID" in os.environ and os.environ["AWS_ACCESS_KEY_ID"] != "" \
@@ -76,11 +76,11 @@ def createSQSQueue(configuration):
 
 
 def awsCredentialPreparefile(filecontent, configuration,processorName):
-    filecontent = addSensibleVariable(filecontent, processorName, "Access Key",
+    filecontent = addSensitiveVariable(filecontent, processorName, "Access Key",
                                       configuration["AWS_ACCESS_KEY_ID"])
-    filecontent = addSensibleVariable(filecontent, processorName, "Secret Key",
+    filecontent = addSensitiveVariable(filecontent, processorName, "Secret Key",
                                       configuration["AWS_SECRET_ACCESS_KEY"])
-    filecontent = addSensibleVariable(filecontent, processorName, "Region",
+    filecontent = addSensitiveVariable(filecontent, processorName, "Region",
                                       configuration["AWS_DEFAULT_REGION"])
     return filecontent
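Editor's note: a hedged sketch of how these helpers appear to compose. That `getAWSCredentials` fills the configuration dict from the environment is an assumption (only its first line is visible above); the flow snapshot matches what `addSensitiveVariable` traverses.

``` python
import os

from apis import aws

# Placeholder credentials; getAWSCredentials appears to read these
# environment variables into the configuration dict (assumption).
os.environ["AWS_ACCESS_KEY_ID"] = "AKIA..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

# Minimal flow snapshot with the structure addSensitiveVariable walks.
flow = {"flowContents": {"processors": [{"name": "PutS3Object",
                                         "properties": {}}]}}
s3Info = {"name": "my-s3-destination"}
aws.getAWSCredentials(s3Info)
# Injects Access Key, Secret Key, and Region into the PutS3Object processor.
flow = aws.awsCredentialPreparefile(flow, s3Info, "PutS3Object")
```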
File renamed without changes.
9 changes: 3 additions & 6 deletions cli/dcnios-cli.py
@@ -16,16 +16,13 @@
 import yaml
 from yaml.loader import SafeLoader
 import json
-#from oscar_python.client import Client
 from apis.auxiliaryFunctions import *
-from apis.NifiManagment import *
+from apis.nifiManagment import *
 from apis.aws import *
 from sources.dcache import *
-from destinations.OSCAR import *
-from sources.Kafka import *
+from destinations.oscar import *
+from sources.kafka import *
 from sources.generic import *
-#import boto3
-#import os
 import env
 import argparse
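Editor's note: for reference, the CLI is invoked against a YAML definition like the one sketched earlier. The `apply` subcommand and `-f` flag reflect the project's documented usage but are not visible in this diff, so treat them as assumptions.

``` bash
# Deploy the dataflow described in dataflow.yaml onto the NiFi cluster.
python3 dcnios-cli.py apply -f dataflow.yaml
```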
2 changes: 1 addition & 1 deletion cli/destinations/OSCAR.py → cli/destinations/oscar.py
@@ -16,7 +16,7 @@
 # !/usr/bin/env python3
 
 from apis import auxiliaryFunctions
-from apis import NifiManagment
+from apis import nifiManagment
 from oscar_python.client import Client
6 changes: 3 additions & 3 deletions cli/destinations/s3sqs.py
@@ -16,19 +16,19 @@
 # !/usr/bin/env python3
 
 from apis import auxiliaryFunctions
-from apis import NifiManagment
+from apis import nifiManagment
 from apis import aws
 
 
 
 
 def createPutS3(nifiConfiguration,s3Info,s3content):
     if "MinIO_Endpoint" in s3Info:
-        auxiliaryFunctions.addSensibleVariable(s3content, "PutS3Object", "Endpoint Override URL", s3Info["MinIO_Endpoint"])
+        auxiliaryFunctions.addSensitiveVariable(s3content, "PutS3Object", "Endpoint Override URL", s3Info["MinIO_Endpoint"])
         s3Info["AWS_DEFAULT_REGION"]="us-east-1"
     else:
         aws.getAWSCredentials(s3Info)
         s3content = aws.awsCredentialPreparefile(s3content, s3Info,"PutS3Object")
-    auxiliaryFunctions.addSensibleVariable(s3content, "PutS3Object", "Bucket", s3Info["AWS_S3_BUCKET"])
+    auxiliaryFunctions.addSensitiveVariable(s3content, "PutS3Object", "Bucket", s3Info["AWS_S3_BUCKET"])
     nifiConfiguration.create(s3Info["name"], s3content)
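Editor's note: the keys consulted by `createPutS3` suggest a destination block along these lines. The key names come from the code above; the surrounding YAML structure is an assumption.

``` yaml
# Hypothetical PutS3 destination block.
name: write-results
MinIO_Endpoint: https://minio.example.org  # optional; forces region us-east-1
AWS_S3_BUCKET: output-bucket               # set on the PutS3Object processor
```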

8 changes: 4 additions & 4 deletions cli/env.py
@@ -16,18 +16,18 @@
 
 from sources.dcache import createDcache
 from sources.s3sqs import createGetS3,createGetSQS
-from sources.Kafka import createKafka
+from sources.kafka import createKafka
 from sources.generic import createGeneric
-from destinations.OSCAR import createOSCAR
+from destinations.oscar import createOSCAR
 from destinations.s3sqs import createPutS3
 
-folderSource = "template/Sources/"
+folderSource = "template/sources/"
 
 kafkafile = folderSource+"Kafka.json"
 dcachefile = folderSource+"dcache.json"
 sqsfile = folderSource+"SQS_recive.json"
 
-folderDestination = "template/Destinations/"
+folderDestination = "template/destinations/"
 putS3file= folderDestination+ "PutS3.json"
 oscarfile = folderDestination+"InvokeOSCAR.json"
2 changes: 1 addition & 1 deletion cli/sources/dcache.py
@@ -16,7 +16,7 @@
 # !/usr/bin/env python3
 
 from apis import auxiliaryFunctions
-from apis import NifiManagment
+from apis import nifiManagment
 
 
2 changes: 1 addition & 1 deletion cli/sources/generic.py
@@ -16,7 +16,7 @@
 # !/usr/bin/env python3
 
 from apis import auxiliaryFunctions
-from apis import NifiManagment
+from apis import nifiManagment
 
 def createGeneric(nifiConfiguration,genricInfo,genericcontent):
     nifiConfiguration.create(genricInfo["name"], genericcontent)
20 changes: 10 additions & 10 deletions cli/sources/Kafka.py → cli/sources/kafka.py
@@ -16,7 +16,7 @@
 # !/usr/bin/env python3
 
 from apis import auxiliaryFunctions
-from apis import NifiManagment
+from apis import nifiManagment
 
 def createKafka(nifiConfiguration,kafkaInfo,kafkacontent):
     # Prepare config
@@ -45,32 +45,32 @@ def createKafka(nifiConfiguration,kafkaInfo,kafkacontent):
 
 def kafkaPreparefile(filecontent, kafka):
     if "security_protocol" not in kafka:
-        filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+        filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                              "security.protocol", "SASL_SSL")
     else:
-        filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+        filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                              "security.protocol",
                                                              kafka["security_protocol"])
     if "" not in kafka:
-        filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+        filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                              "sasl.mechanism", "PLAIN")
     else:
-        filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+        filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                              "sasl.mechanism",
                                                              kafka["sasl_mechanism"])
-    filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+    filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                          "sasl.username",
                                                          kafka["sasl_username"])
-    filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+    filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                          "sasl.password",
                                                          kafka["sasl_password"])
     if "separate_by_key" in kafka and kafka["separate_by_key"] == "true":
-        filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+        filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                              "separate-by-key", "true")
-        filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+        filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                              "message-demarcator",
                                                              kafka["message_demarcator"])
     else:
-        filecontent = auxiliaryFunctions.addSensibleVariable(filecontent, "ConsumeKafka_2_6",
+        filecontent = auxiliaryFunctions.addSensitiveVariable(filecontent, "ConsumeKafka_2_6",
                                                              "separate-by-key", "false")
     return filecontent
3 changes: 1 addition & 2 deletions cli/sources/s3sqs.py
@@ -18,9 +18,8 @@
 import json
 import os
 import boto3
-from apis.auxiliaryFunctions import addSensibleVariable
 from apis import auxiliaryFunctions
-from apis import NifiManagment
+from apis import nifiManagment
 from apis import aws
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion docpage/docs/03.- Sources/S3.md
@@ -3,7 +3,7 @@ sidebar_position: 3
 ---
 # S3
 
-The S3 Source captures an ObjectCreated event from an AWS S3 bucket. DCNiOS creates S3 bucket event redirections to SQS queue. Then, Apache NiFi captures the event and introduces it to the dataflow. The whole pipeline is created using DCNiOS.But, SQS queue is deleted with DCNiOS, but the Event Notification in the S3 section needs to be removed manually.
+The S3 Source captures an ObjectCreated event from an AWS S3 bucket. DCNiOS creates S3 bucket event redirections to SQS queue. Then, Apache NiFi captures the event and introduces it to the dataflow. The whole pipeline is created using DCNiOS. But, SQS queue is deleted with DCNiOS, but the Event Notification in the S3 section needs to be removed manually.
 
 The S3 Source requires:
 - An identifier name of the process. It must be unique. Required.
4 changes: 2 additions & 2 deletions docpage/docs/AWS.md
@@ -21,6 +21,6 @@ DCNiOS can use some AWS as input. A valid pair of AWS Access Key and AWS Secret
 
 
 AWS_DEFAULT_REGION is mandatory in any Source that uses AWS in the configuration file. These ProcessGroups can employ AWS credentials:
-- [SQS](/docs/0.2.- Sources/SQS)
-- [S3](/docs/0.2.- Sources/S3)
+- [SQS](/docs/Sources/SQS)
+- [S3](/docs/Sources/S3)
