This is an IoT project conducted by students of the UAS JOANNEUM as part of the bachelor's programme of Information Management. The goal was to build two IoT pipelines: one uses Kafka, Kafka Connect and Kafka Streams for ingestion and stream processing; the other uses MQTT with RabbitMQ as broker and Apache Flink as stream processing engine. Both pipelines persist data in InfluxDB (a time-series database) and visualize it with Grafana.
After both pipelines have been deployed, they are load tested with JMeter.
- @AnotherCodeArtist is the mentor and guiding person of this project.
- @GregorFernbach is responsible for MQTT data ingestion and stream processing with Apache Flink.
- @gzei is responsible for the infrastructure (Kubernetes, NFS, hardware and VMs).
- @vollmerm17 is responsible for Kafka and stream processing with Kafka Streams.
- @lachchri16 is responsible for visualisation.
- @cynze is responsible for the databases and the connections between Kafka and those databases.

All members are responsible for the load testing.
This system uses a different set of components for each pipeline; the common base is the Kubernetes infrastructure.
For the first pipeline, Apache Kafka is used in combination with Kafka Streams in the following structure:
- Apache Kafka: serves as broker for incoming requests from clients.
- Kafka Streams: processes the incoming data.
- Kafka Connect: responsible for persisting data (raw sensor values as well as processed data) in an InfluxDB.
For the second pipeline, RabbitMQ is used along with Apache Flink in the following structure:
- RabbitMQ: serves as broker for incoming requests from clients.
- Apache Flink: processes the incoming data; also responsible for persisting data (raw sensor values as well as processed data) in an InfluxDB.
The combination of Apache Flink with Apache Kafka is also implemented.
The data stored inside the InfluxDB is visualized with Grafana. The overall goal is to compare the performance of these two pipelines.
For testing, a fictional use case was chosen: measurements of particulate matter within a city.
- Debian 10.1 servers
- external NFS server for centralized storage
- SSDs are highly recommended
- Apache Flink
- Maven
- Java
- IntelliJ as IDE recommended
In this folder all necessary files for recreating the Kubernetes cluster are stored. This includes:
- The Ansible Playbook for installing the prerequisites
- An example cluster.yml used for our implementation. This cluster utilizes three nodes, where the first one is a control plane and etcd node; the two remaining nodes are etcd nodes as well as worker nodes.
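A minimal sketch of how these files might be used (inventory and playbook file names are placeholders; see the setup steps further below for the full procedure):

```bash
# Install the prerequisites on all nodes with the provided Ansible playbook
# (inventory and playbook file names are placeholders)
ansible-playbook -i inventory.ini k8s-prerequisites.yml

# Generate a new cluster.yml interactively, or adapt the provided example,
# then bring the cluster up with rke
rke config
rke up
```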
In this folder all files necessary for the stream processing are stored.
- UsecaseExample contains the whole stream processing; it calculates different metrics from the raw sensor data and produces a new stream in a new topic.
- UsecaseGenProducer is a producer which can be used during development (instead of JMeter).
- Serde: this is necessary for serializing and deserializing the stream (https://github.com/gwenshap/kafka-streams-stockstats/tree/master/src/main/java/com/shapira/examples/streams/stockstats/serde).
- Models: this directory contains the POJO definitions and an own datatype CustomPair (because Tuples and Triplets are immutable and not useful in our algorithm).
- Namespace_kafka.json creates a namespace called kafka.
- Streams-Deployment.yaml deploys the Kafka Streams application in a pod.
- ConnectorInluxRaw.json is the Kafka Connect connector configuration used for writing the raw sensor data to the InfluxDB.
- ConnectorInflux.json is the Kafka Connect connector configuration used for writing the processed sensor data to the InfluxDB.
The connector used is from https://docs.lenses.io/connectors/sink/influx.html.
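A minimal sketch of how these files could be applied, assuming the Kafka Connect REST API is reachable inside the cluster and the .json files contain the connector configuration in the format expected by that API (host name and port are assumptions):

```bash
# Create the kafka namespace and deploy the Kafka Streams application
kubectl create -f Namespace_kafka.json
kubectl apply -f Streams-Deployment.yaml -n kafka

# Register the two InfluxDB sink connectors via the Kafka Connect REST API
# (the kafka-connect host name and port 8083 are assumptions)
curl -X POST -H "Content-Type: application/json" \
  --data @ConnectorInluxRaw.json http://kafka-connect:8083/connectors
curl -X POST -H "Content-Type: application/json" \
  --data @ConnectorInflux.json http://kafka-connect:8083/connectors
```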
In the Databases folder, all files for redeploying the databases CrateDB, InfluxDB and OpenTSDB on Kubernetes are stored. For both pipelines, MQTT and Kafka, only InfluxDB was used. The file values.yaml is mainly taken from the helm chart stable/influxdb.
Apache Flink is the stream processing engine for the MQTT pipeline.
For the deployment of Apache Flink, the official '.yml' files from the documentation (https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html) have been used. They are also available under ApacheFlink\Deployment.
Under ApacheFlink\StreamingJobs the source code of the streaming jobs can be found. There are:
- DetectionJob-rmq: the complete RabbitMQ streaming job, which persists raw sensor data and processed sensor data and sends the area output via Telegram.
- DetectionJob-rmq-loadtest: the reduced RabbitMQ streaming job, which only persists raw sensor data and processed sensor data for the load test.
- DetectionJob-kafka: the complete Kafka streaming job, which persists raw sensor data and processed sensor data and sends the area output via Telegram.
- DetectionJob-kafka-loadtest: the reduced Kafka streaming job, which only persists raw sensor data and processed sensor data for the load test.
In order to create '.jar' files from the source code, you need Maven installed and have to run 'mvn clean install'. Then you can upload the jar to Flink and submit it with your desired parallelism.
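For example (jar name and parallelism are placeholders; the jar can also be uploaded via the Flink web UI instead of the CLI):

```bash
# Build the streaming job jars
mvn clean install

# Submit a job to the Flink cluster with a chosen parallelism
flink run -p 4 target/DetectionJob-rmq-loadtest.jar
```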
For configuring the RabbitMQ connector see the [Official Docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html) and the [Source Code](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq).
For configuring the Apache Kafka connector see the [Official Docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html) and also the [Training from Ververica](https://training.ververica.com/exercises/toFromKafka.html).
For configuring the InfluxDB connector see the [Apache Bahir Docs](https://bahir.apache.org/docs/flink/current/flink-streaming-influxdb/), from which this connector comes.
For configuring the Telegram API (using BotFather) see the [Official Docs](https://core.telegram.org/).
In this folder a fork of https://github.com/BrightTag/kafkameter is stored. The changes include:
- Support for generating message keys.
- Support for setting sensor IDs for the message.
- Inside the Generators folder, the load generator implementation can be found. For build instructions please consult the corresponding .bat file.
This folder also includes the test plans (JMeter version 5.1.1) used for testing Kafka and MQTT.
For the MQTT tests the [JMeter MQTT plugin](https://github.com/emqx/mqtt-jmeter) has been used. The MQTT tests also use kafkameter and the Java load-generating classes in the background, therefore you need these as well.
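The test plans can be executed in JMeter's non-GUI mode, for example (file names are placeholders; the kafkameter and load generator jars have to be placed in JMeter's lib/ext directory first):

```bash
# Copy the plugin jars into JMeter (jar names and JMETER_HOME are placeholders)
cp kafkameter.jar loadgenerator.jar $JMETER_HOME/lib/ext/

# Run a test plan headless and write the results to a .jtl file
jmeter -n -t kafka-testplan.jmx -l results.jtl
```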
- Set up one or more VMs with Debian 10.1
- Set up Ansible for those VMs
- Apply Ansible Playbook provided in the k8s folder of this repository
- Install rke 0.3.2 or newer on any node
- Set up ssh key authentication from this node to all nodes in the cluster
- Modify cluster.yml or generate a new one (rke config)
- Apply this config with rke up
- Connect to the cluster with the generated kubectl config
- Install Helm and Tiller
- Use Helm to install nfs-client storage provider (https://github.com/kubernetes-incubator/external-storage/tree/master/nfs-client) and set as default storage class
- Install MetalLB LoadBalancer (https://metallb.universe.tf/installation/)
- Optional: Install Kubernetes Dashboard
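A condensed sketch of the storage and load balancer steps (chart options, NFS address and manifest file names are placeholders; follow the linked guides for details):

```bash
# Install the nfs-client provisioner and set it as the default storage class
helm install stable/nfs-client-provisioner --name nfs-client \
  --set nfs.server=<nfs-server-ip> --set nfs.path=/exported/path \
  --set storageClass.defaultClass=true

# Install MetalLB according to https://metallb.universe.tf/installation/
# and apply an address pool configuration (manifest names are placeholders)
kubectl apply -f metallb.yaml
kubectl apply -f metallb-config.yaml
```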
- Create Namespace influxdb
- Install the helm chart (https://github.com/helm/charts/tree/master/stable/influxdb) and use the namespace
- Open InfluxDB Shell
- Create Databases
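These steps could look roughly like this (pod and release names are placeholders; the database names kafka and mqtt match the Grafana data sources further below):

```bash
kubectl create namespace influxdb
helm install stable/influxdb --name influxdb --namespace influxdb -f values.yaml

# Open an InfluxDB shell inside the pod (pod name is a placeholder)
kubectl exec -it <influxdb-pod> -n influxdb -- influx

# Inside the influx shell, create one database per pipeline:
#   CREATE DATABASE kafka
#   CREATE DATABASE mqtt
```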
(Steps 1-7 as described in https://crate.io/a/run-your-first-cratedb-cluster-on-kubernetes-part-one/)
- Create Namespace crate
- Create crate-internal-service.yaml
- Apply it and create a service
- Create crate-external-service.yaml
- Apply it and create a service
- Create crate-controller.yaml
- Apply it and create the stateful set
- Install Crash (Shell for CrateDB)
- Create Databases
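Sketched with kubectl and crash (the external host address is a placeholder; crash can be installed via pip):

```bash
kubectl create namespace crate
kubectl apply -f crate-internal-service.yaml -n crate
kubectl apply -f crate-external-service.yaml -n crate
kubectl apply -f crate-controller.yaml -n crate

# Install the CrateDB shell and connect to the external service
pip install crash
crash --hosts <crate-external-ip>:4200
```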
- Create Namespace opentsdb
- Create Secret
- Create Persistent Volume Claim opentsdb-pc-claim.yaml
- Apply PVC
- Create opentsdb-deployment.yaml
- Apply it in order to deploy OpenTSDB
- Create opentsdb-service.yaml
- Apply it and create the service
- Create Databases
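For example (the secret name and its contents are placeholders):

```bash
kubectl create namespace opentsdb
kubectl create secret generic opentsdb-secret -n opentsdb \
  --from-literal=username=<user> --from-literal=password=<password>
kubectl apply -f opentsdb-pc-claim.yaml -n opentsdb
kubectl apply -f opentsdb-deployment.yaml -n opentsdb
kubectl apply -f opentsdb-service.yaml -n opentsdb
```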
- Create namespace by using the provided '.yml' file
- Configure 'values.yml' file as you need
- Install the helm chart with 'helm install stable/rabbitmq-ha --name rmq-cluster -f .\rabbitmq-values_VX.yaml --namespace mqtt'
- Use and edit the provided '.yml' files from [here](https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html).
- Create namespace with provided '.yml' file
- kubectl create -f flink-configuration-configmap.yml -n flink
- kubectl create -f jobmanager-service.yml -n flink
- kubectl create -f jobmanager-deployment.yml -n flink
- kubectl create -f taskmanager-deployment.yml -n flink
- kubectl create -f jobmanager-rest-service.yml -n flink
- Download the Helm charts (https://github.com/confluentinc/cp-helm-charts)
- Modify the Helm charts (enable NodePorts etc.)
- Create Namespace
- Install Kafka with Helm
- Create topics
- Deploy the connectors for the topics (the InfluxDB deployment is necessary before this step)
- Start JMeter or the producer (for small first tries)
- Check topics
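A possible sequence of commands, assuming the cp-helm-charts repository has been cloned and modified locally (release, broker pod, ZooKeeper host and topic names are placeholders):

```bash
kubectl create -f Namespace_kafka.json
helm install ./cp-helm-charts --name kafka --namespace kafka

# Create a topic from inside one of the broker pods
kubectl -n kafka exec -it <kafka-broker-pod> -- kafka-topics \
  --zookeeper kafka-cp-zookeeper:2181 --create --topic SensorData \
  --partitions 3 --replication-factor 3

# Check that the topics exist
kubectl -n kafka exec -it <kafka-broker-pod> -- kafka-topics \
  --zookeeper kafka-cp-zookeeper:2181 --list
```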
- Download values.yaml from stable/grafana
- Change type of the service to NodePort:

      service:
        type: NodePort
        port: 80
        targetPort: 3000

- Change image section to the following:

      image:
        repository: gzei/grafanatest
        tag: latest
        pullPolicy: Always

- Change persistence to the following:

      persistence:
        type: pvc
        enabled: true
        storageClassName: nfs-client
        accessModes:
          - ReadWriteOnce
        size: 8Gi

- Set initChownData to false:

      initChownData:
        enabled: false

- Set adminUser and adminPassword to default credentials:

      adminUser: admin
      adminPassword: [default_pw]

- Set plugin path of grafana.ini to /var/lib/plugins:

      grafana.ini:
        paths:
          data: /var/lib/grafana/data
          logs: /var/log/grafana
          plugins: /var/lib/plugins
          provisioning: /etc/grafana/provisioning

- Install Grafana with the following command (assuming the customized values.yaml file exists in the current working directory):

      helm install stable/grafana -f values.yaml --name grafana --namespace grafana

- Access Grafana GUI on 172.17.100.51:XXXXX
- Add necessary Data Sources:
  - InfluxDB (database: kafka)
  - InfluxDB (database: mqtt)
- Add new dashboard and create new panel
- In Visualization choose "Sensor Map"
- Add two queries like this:
  - FROM default SensorData WHERE + SELECT field(id) + field(avgPM2) + field(lat) + field(long) + GROUP BY + FORMAT AS Table
  - FROM default SensorAreas WHERE + SELECT field(tooHigh) + GROUP BY tag(area) + FORMAT AS Table
- While inside the dashboard, change the time range in the top right corner to the value of your choice.
Hints:
- Test data cannot be read by the plugin if the timestamps of all existing points in the used database are out of range!
- The GUI needs a browser refresh to read the correct data after the time range has been changed!
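To check whether data with timestamps inside the selected range is actually present, it can be queried directly via the influx CLI, e.g. (pod name is a placeholder; database, measurement and field names follow the Grafana queries above):

```bash
kubectl exec -it <influxdb-pod> -n influxdb -- influx -database 'kafka' \
  -execute 'SELECT id, avgPM2, lat, long FROM SensorData ORDER BY time DESC LIMIT 5'
```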