You are a data engineer at a data analytics consulting company. You have been assigned to a project that aims to de-congest the national highways by analyzing the road traffic data from different toll plazas. As a vehicle passes a toll plaza, the vehicle’s data, such as vehicle_id, vehicle_type, toll_plaza_id, and timestamp, is streamed to Kafka. Your job is to create a data pipeline that collects the streaming data and loads it into a database.
In this project you will create a streaming data pipeline by performing these steps:
- Start a MySQL Database server.
- Create a table to hold the toll data.
- Start the Kafka server.
- Install the Kafka Python driver.
- Install the MySQL Python driver.
- Create a topic named toll in Kafka.
- Download the streaming data generator program.
- Customize the generator program to stream to the toll topic.
- Download and customize streaming data consumer.
- Customize the consumer program to write into a MySQL database table.
- Verify that streamed data is being collected in the database table.
- Step 1: Download Kafka.
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
- Step 2: Extract Kafka.
tar -xzf kafka_2.12-2.8.0.tgz
- Step 3: Start MySQL server.
start_mysql
- Step 4: Connect to the MySQL server. Make sure you use the password given to you when the MySQL server starts.
mysql --host=127.0.0.1 --port=3306 --user=root --password=yourpassword
- Step 5: Create a database named tolldata. At the ‘mysql>’ prompt, run the command below to create the database.
create database tolldata;
- Step 6: Create a table named livetolldata with the schema to store the data generated by the traffic simulator. Run the following commands to create the table:
use tolldata;
create table livetolldata(timestamp datetime,vehicle_id int,vehicle_type char(15),toll_plaza_id smallint);
Note: This is the table where you will store all the streamed data that comes from Kafka. Each row is a record of when a vehicle passed through a certain toll plaza, along with its type and anonymized id.
- Step 7: Disconnect from MySQL server.
exit
- Step 8: Install the Python module kafka-python using the pip command.
python3 -m pip install kafka-python
Note: This Python module will help you communicate with the Kafka server. It can be used to send and receive messages from Kafka.
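As a quick, hedged illustration of that send/receive flow (not one of the graded steps), the sketch below assumes a Kafka broker is already listening on localhost:9092 and that the toll topic created in the later steps exists:
from kafka import KafkaProducer, KafkaConsumer

# Send one test message to the 'toll' topic (assumes the broker is already running).
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('toll', b'test message')
producer.flush()

# Read messages back from the same topic, starting from the earliest offset.
consumer = KafkaConsumer('toll', bootstrap_servers='localhost:9092', auto_offset_reset='earliest')
for message in consumer:
    print(message.value)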
- Step 9: Install the Python module mysql-connector-python using the pip command.
python3 -m pip install mysql-connector-python==8.0.31
Start Kafka with the following tasks:
- Start Zookeeper
- Start Kafka server
- Create a topic named toll (a short sketch of this step follows the list)
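The Zookeeper and broker start scripts ship with the archive extracted in Step 2 (bin/zookeeper-server-start.sh config/zookeeper.properties and bin/kafka-server-start.sh config/server.properties, each in its own terminal), and the topic is usually created with bin/kafka-topics.sh. As a hedged alternative, once the broker is up you can also create the toll topic from Python with the kafka-python package installed in Step 8:
from kafka.admin import KafkaAdminClient, NewTopic

# Equivalent CLI: bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='toll', num_partitions=1, replication_factor=1)])
admin.close()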
- Download the Toll Traffic Simulator
- Download the toll_traffic_generator.py from the URL given below using ‘wget’.
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py
- Open the toll_traffic_generator.py and set the topic to toll.
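The downloaded file defines the topic it publishes to; a minimal, hedged sketch of the edit (the actual variable name in the file may differ) is:
# Near the top of toll_traffic_generator.py, point the producer at the 'toll' topic.
TOPIC = 'toll'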
- Run the Toll Traffic Simulator
- Run the toll_traffic_generator.py.
Hint: python3 <pythonfilename> runs a Python program on your terminal.
- Configure streaming_data_reader.py
- Download the streaming_data_reader.py from the URL below using ‘wget’.
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py
- Open the streaming_data_reader.py and modify the following details so that the program can connect to your MySQL server (a sketch of the edited section follows this list):
TOPIC
DATABASE
USERNAME
PASSWORD
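A hedged sketch of what that configuration section might look like after editing; the values are placeholders for your own environment, and the surrounding code in the downloaded script may differ:
from kafka import KafkaConsumer
import mysql.connector

TOPIC = 'toll'
DATABASE = 'tolldata'
USERNAME = 'root'
PASSWORD = 'yourpassword'  # the password printed when the MySQL server started

# Further down, the script uses these settings roughly like this:
consumer = KafkaConsumer(TOPIC)
connection = mysql.connector.connect(host='localhost', database=DATABASE,
                                     user=USERNAME, password=PASSWORD)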
- Run streaming_data_reader.py
python3 streaming_data_reader.py
- Health check of the streaming data pipeline.
- If you have done all the steps up to here correctly, the streaming toll data will be stored in the table livetolldata.
Try: List the top 10 rows in the table livetolldata.
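For example, at the ‘mysql>’ prompt you can run select * from livetolldata limit 10; or, as a hedged alternative using the Python driver installed in Step 9:
import mysql.connector

# Fetch a sample of the streamed records (use the password issued for your MySQL server).
connection = mysql.connector.connect(host='127.0.0.1', port=3306, user='root',
                                     password='yourpassword', database='tolldata')
cursor = connection.cursor()
cursor.execute('SELECT * FROM livetolldata LIMIT 10')
for row in cursor.fetchall():
    print(row)
connection.close()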
I have provided my solution for this project as a Bash script; go check it out.
After implementation, your results from the Kafka pipeline should look like this:
Contributions are welcome! Please open an issue or pull request for any changes or improvements.