
Mohamed-fawzyy/Kafka-Pipeline


Project Scenario 🎩

You are a data engineer at a data analytics consulting company. You have been assigned to a project that aims to de-congest the national highways by analyzing the road traffic data from different toll plazas. As a vehicle passes a toll plaza, its data (vehicle_id, vehicle_type, toll_plaza_id, and timestamp) is streamed to Kafka. Your job is to create a data pipeline that collects the streaming data and loads it into a database.

Objectives📝

  • In this project you will create a streaming data pipeline by performing these steps:

    • Start a MySQL Database server.
    • Create a table to hold the toll data.
    • Start the Kafka server.
    • Install the Kafka Python driver.
    • Install the MySQL Python driver.
    • Create a topic named toll in Kafka.
    • Download the streaming data generator program.
    • Customize the generator program to stream to the toll topic.
    • Download and customize streaming data consumer.
    • Customize the consumer program to write into a MySQL database table.
    • Verify that streamed data is being collected in the database table.

Reach/Follow me on 🚀

LinkedIn · Google Email · Facebook


Prepare the lab environment 📦

  • Step 1: Download Kafka.
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
  • Step 2: Extract Kafka.
tar -xzf kafka_2.12-2.8.0.tgz
  • Step 3: Start MySQL server.
start_mysql
  • Step 4: Connect to the MySQL server. Make sure you use the password given to you when the MySQL server starts.
mysql --host=127.0.0.1 --port=3306 --user=root --password=yourpassword
  • Step 5: Create a database named tolldata. At the ‘mysql>’ prompt, run the command below to create the database.
create database tolldata;
  • Step 6: Create a table named livetolldata with the schema to store the data generated by the traffic simulator. Run the following command to create the table:
use tolldata;

create table livetolldata(timestamp datetime, vehicle_id int, vehicle_type char(15), toll_plaza_id smallint);

Note: This is the table where you will store all the streamed data that comes from Kafka. Each row is a record of a vehicle passing through a certain toll plaza, along with its type and anonymized ID.
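For illustration, a row in this schema might look like the following insert (the values here are hypothetical):

insert into livetolldata values ('2024-05-01 09:15:00', 482, 'car', 3);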

  • Step 7: Disconnect from MySQL server.
exit
  • Step 8: Install the Python module kafka-python using the pip command.
python3 -m pip install kafka-python

Note: This Python module lets you communicate with the Kafka server. It can be used to send and receive messages from Kafka.
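A minimal sketch of sending and receiving messages with kafka-python, assuming a broker on the default localhost:9092 and the toll topic created later in the lab:

from kafka import KafkaProducer, KafkaConsumer

# produce one test message to the toll topic
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("toll", b"hello from the toll pipeline")
producer.flush()

# consume messages from the toll topic and print them (Ctrl+C to stop)
consumer = KafkaConsumer("toll", bootstrap_servers="localhost:9092")
for message in consumer:
    print(message.value.decode("utf-8"))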

  • Step 9: Install the Python module mysql-connector-python using the pip command.
python3 -m pip install mysql-connector-python==8.0.31
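A minimal connectivity check with mysql-connector-python, assuming the tolldata database, livetolldata table, and credentials from the earlier steps:

import mysql.connector

# connect with the same credentials used at the mysql command line
connection = mysql.connector.connect(
    host="127.0.0.1", port=3306,
    database="tolldata", user="root", password="yourpassword")
cursor = connection.cursor()
cursor.execute("select count(*) from livetolldata")
print(cursor.fetchone())  # number of rows collected so far
cursor.close()
connection.close()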

Directions 🗺

Start Kafka with the following tasks:

  1. Start Zookeeper.
  2. Start the Kafka server.
  3. Create a topic named toll.
  4. Download the Toll Traffic Simulator.
  • Download toll_traffic_generator.py from the URL given below using wget (a command sketch for tasks 1–4 follows).
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py
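A command sketch for tasks 1–4, assuming Kafka 2.8.0 was extracted in the current directory as in the lab setup (each server should run in its own terminal; localhost:9092 is the default broker address):

cd kafka_2.12-2.8.0
# start Zookeeper and leave it running
bin/zookeeper-server-start.sh config/zookeeper.properties
# in a second terminal, start the Kafka broker
bin/kafka-server-start.sh config/server.properties
# in a third terminal, create the toll topic
bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092
# download the simulator
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py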

  5. Open toll_traffic_generator.py and set the topic to toll (a sketch of the change follows the hint below).
  6. Run the Toll Traffic Simulator.
  • Run toll_traffic_generator.py.

Hint: python3 <pythonfilename> runs a Python program in your terminal.
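The exact layout of the downloaded generator script may differ; a sketch of the change for task 5, assuming the script keeps its topic name in a TOPIC constant (the variable name is an assumption):

TOPIC = "toll"  # assumed constant name: point the simulator at the toll topic

Then run it:

python3 toll_traffic_generator.py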


  7. Configure streaming_data_reader.py.
  • Download streaming_data_reader.py from the URL below using wget.
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py
  • Open streaming_data_reader.py and modify the following details so that the program can connect to your MySQL server (a sketch follows this list):

    • TOPIC
    • DATABASE
    • USERNAME
    • PASSWORD
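A minimal sketch of those settings, assuming the topic, database, and user created earlier (replace the password placeholder with the one your MySQL server printed at startup):

TOPIC = "toll"
DATABASE = "tolldata"
USERNAME = "root"
PASSWORD = "yourpassword"  # the password shown when MySQL started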


  8. Run streaming_data_reader.py.
python3 streaming_data_reader.py
  9. Health check of the streaming data pipeline.
  • If you have done all the steps up to this point correctly, the streaming toll data will be stored in the table livetolldata.

Try: List the top 10 rows in the table livetolldata.
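For example, at the mysql> prompt, a query like this lists ten rows from the table (using the database created in Step 5):

use tolldata;
select * from livetolldata limit 10;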

SnapShot and Results 📸

  • I provided my solution for this project as a Bash script; go check it out.

  • After implementation, your Kafka pipeline results should look like this:

    • Simulator output (task 6): simulator_output

    • Running the stream reader Python file (task 7): streaming_reader_code

    • Data reader starting the workflow with Kafka (task 8): data_reader_output

    • After running all the previous commands correctly, your final result (task 9) should look like this: output_rows

Contributing 📝

Contributions are welcome! Please open an issue or pull request for any changes or improvements.

About

Creating Streaming Data Pipelines using Kafka.
