
Project Scenario 🎩

You are a data engineer at a data analytics consulting company. You have been assigned to a project that aims to de-congest the national highways by analyzing the road traffic data from different toll plazas. As a vehicle passes a toll plaza, the vehicle’s data like vehicle_id, vehicle_type, toll_plaza_id, and timestamp are streamed to Kafka. Your job is to create a data pipeline that collects the streaming data and loads it into a database.

Objectives📝

  • In this project, you will create a streaming data pipeline by performing these steps:

    • Start a MySQL Database server.
    • Create a table to hold the toll data.
    • Start the Kafka server.
    • Install the Kafka Python driver.
    • Install the MySQL Python driver.
    • Create a topic named toll in Kafka.
    • Download the streaming data generator program.
    • Customize the generator program to stream to the toll topic.
    • Download and customize streaming data consumer.
    • Customize the consumer program to write into a MySQL database table.
    • Verify that streamed data is being collected in the database table.

Reach/Follow me on 🚀

linkedIn    googleEmail    facebook


Prepare the lab environment 📦

  • Step 1: Download Kafka.
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
  • Step 2: Extract Kafka.
tar -xzf kafka_2.12-2.8.0.tgz
  • Step 3: Start MySQL server.
start_mysql
  • Step 4: Connect to the MySQL server. Make sure you use the password given to you when the MySQL server starts.
mysql --host=127.0.0.1 --port=3306 --user=root --password=yourpassword
  • Step 5: Create a database named tolldata. At the ‘mysql>’ prompt, run the command below to create the database.
create database tolldata;
  • Step 6: Create a table named livetolldata with the schema to store the data generated by the traffic simulator. Run the following command to create the table:
use tolldata;

create table livetolldata(timestamp datetime, vehicle_id int, vehicle_type char(15), toll_plaza_id smallint);

Note: This is the table where you will store all the streamed data that comes from Kafka. Each row is a record of a vehicle passing through a certain toll plaza, along with its type and anonymized ID.

  • Step 7: Disconnect from MySQL server.
exit
  • Step 8: Install the python module kafka-python using the pip command.
python3 -m pip install kafka-python

Note: This Python module lets you communicate with the Kafka server. It can be used to send and receive messages from Kafka.
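As a minimal sketch of how kafka-python sends and receives messages (the broker address localhost:9092 and the comma-separated message format are illustrative assumptions, not the lab's exact values):

```python
# Minimal kafka-python sketch: encode a toll record, then send and receive it.
# Broker address and message format are assumptions for illustration.
from datetime import datetime


def encode_record(timestamp, vehicle_id, vehicle_type, plaza_id):
    """Serialize one toll event as a comma-separated byte string."""
    return f"{timestamp},{vehicle_id},{vehicle_type},{plaza_id}".encode("utf-8")


if __name__ == "__main__":
    from kafka import KafkaProducer, KafkaConsumer  # pip install kafka-python

    # Send one record to the toll topic.
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    producer.send("toll", encode_record(datetime.now().isoformat(sep=" "),
                                        101, "car", 4))
    producer.flush()

    # Read it back from the beginning of the topic.
    consumer = KafkaConsumer("toll", bootstrap_servers="localhost:9092",
                             auto_offset_reset="earliest")
    for msg in consumer:
        print(msg.value.decode("utf-8"))
        break
```

Run the guarded part only after the Kafka server is up and the toll topic exists.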

  • Step 9: Install the python module mysql-connector-python using the pip command.
python3 -m pip install mysql-connector-python==8.0.31

Directions 🗺

Start Kafka with the following tasks

  1. Start ZooKeeper (from the extracted kafka_2.12-2.8.0 directory):
bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Start the Kafka server (in a new terminal, from the same directory):
bin/kafka-server-start.sh config/server.properties
  3. Create a topic named toll (in a third terminal):
bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092
  4. Download the Toll Traffic Simulator
  • Download the toll_traffic_generator.py from the url given below using ‘wget’.
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py

  5. Open the toll_traffic_generator.py and set the topic to toll.
  6. Run the Toll Traffic Simulator.
  • Run the toll_traffic_generator.py.

Hint: python3 <pythonfilename> runs a Python program in your terminal.


  7. Configure streaming_data_reader.py
  • Download the streaming_data_reader.py from the url below using ‘wget’.
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py
  • Open the streaming_data_reader.py and modify the following details so that the program can connect to your MySQL server:

    • TOPIC

    • DATABASE

    • USERNAME

    • PASSWORD
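The real streaming_data_reader.py is course-provided; this hypothetical sketch only illustrates the kind of consumer those four settings configure, with placeholder values you would replace with your own:

```python
# Hypothetical sketch of a Kafka consumer that writes toll events to MySQL.
# The real streaming_data_reader.py differs; the four settings below are
# placeholders to replace with your own values.
TOPIC = "toll"
DATABASE = "tolldata"
USERNAME = "root"
PASSWORD = "yourpassword"  # the password printed when MySQL started


def parse_event(raw):
    """Split 'timestamp,vehicle_id,vehicle_type,plaza_id' into typed fields."""
    timestamp, vehicle_id, vehicle_type, plaza_id = raw.split(",")
    return timestamp, int(vehicle_id), vehicle_type, int(plaza_id)


if __name__ == "__main__":
    import mysql.connector
    from kafka import KafkaConsumer

    connection = mysql.connector.connect(
        host="127.0.0.1", port=3306, database=DATABASE,
        user=USERNAME, password=PASSWORD)
    cursor = connection.cursor()

    consumer = KafkaConsumer(TOPIC, bootstrap_servers="localhost:9092")
    for message in consumer:
        row = parse_event(message.value.decode("utf-8"))
        cursor.execute(
            "INSERT INTO livetolldata VALUES (%s, %s, %s, %s)", row)
        connection.commit()  # persist each event as it arrives
```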


  8. Run streaming_data_reader.py
python3 streaming_data_reader.py
  9. Health check of the streaming data pipeline.
  • If you have completed all the steps up to here correctly, the streaming toll data will be stored in the table livetolldata.

Try: List the top 10 rows in the table livetolldata.
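One way to run that check from Python, assuming the host and credentials from the earlier steps (yourpassword is a placeholder for your actual root password):

```python
# Quick health check: fetch 10 rows from livetolldata.
# Host and credentials are assumptions; use the values from your own setup.
QUERY = "SELECT * FROM livetolldata LIMIT 10"

if __name__ == "__main__":
    import mysql.connector

    connection = mysql.connector.connect(
        host="127.0.0.1", port=3306, database="tolldata",
        user="root", password="yourpassword")
    cursor = connection.cursor()
    cursor.execute(QUERY)
    for row in cursor.fetchall():
        print(row)  # one tuple per streamed toll event
    connection.close()
```

Equivalently, at the ‘mysql>’ prompt: select * from livetolldata limit 10;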

SnapShot and Results 📸

  • I provided my solution to this project as a Bash script; go check it out.

  • After implementation, your Kafka pipeline results should look like this:

    • Simulator output (task 6): simulator_output

    • Stream reader Python file output (task 7): streaming_reader_code

    • Data reader starting the Kafka workflow (task 8): data_reader_output

    • After running all previous commands correctly, your final result should look like this (task 9): output_rows

Contributing 📝

Contributions are welcome! Please open an issue or pull request for any changes or improvements.