This project builds a real-time change-data-capture pipeline from a PostgreSQL database to a transformation service. It uses Docker, with the services defined in the docker-compose.yaml file. Below are the details of each service:
- debezium: Captures row-level changes in particular table(s) of the database and publishes them to Kafka topics.
- debezium-connector: Registers the Debezium PostgreSQL connector configuration. It includes parameters such as the database details (name, password, etc.), the list of tables to capture changes from, the topic name to which the changes will be published, and any Single Message Transformations (SMTs) to apply to the message payloads before they are sent to the Kafka topic.
- zookeeper: Manages and maintains the Kafka cluster, providing functionality such as leader election and service discovery.
- kafka: Runs the Kafka broker for distributed streaming.
- pyspark_consumer: Uses PySpark to consume the data change messages from Kafka topics, apply transformations, and write the results to another Kafka topic.
- (optional) jupyter: For experimenting with the PySpark transformation code.
- (optional) schema-registry: Manages and validates the schemas used in Kafka topics. Schemas define the structure and format of the data within Kafka messages, helping to ensure data consistency and safe schema evolution.
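A minimal sketch of what the pyspark_consumer job could look like. It assumes JSON-encoded Debezium events arriving on rds_first_topic (the topic consumed later in this README); the bootstrap server address kafka:9092, the output topic name transformed_topic, and the checkpoint path are assumptions, not taken from this setup:

```python
import json
from typing import Optional


def extract_after_state(raw_value: bytes) -> Optional[dict]:
    """Pull the post-change row image ("after") out of a Debezium change event.

    Debezium envelopes carry "before"/"after" row images and an "op" code
    (c = create, u = update, d = delete); delete events have no "after" image.
    With the JSON converter's schemas enabled, the envelope sits under "payload".
    """
    event = json.loads(raw_value)
    payload = event.get("payload", event)
    return payload.get("after")


def run_stream() -> None:
    """Streaming wiring (requires the spark-sql-kafka package on the classpath)."""
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    spark = SparkSession.builder.appName("pyspark_consumer").getOrCreate()

    # Transform each Kafka record value: keep only the "after" row image.
    to_after = udf(lambda v: json.dumps(extract_after_state(v)), StringType())

    (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")  # assumption: in-network listener
          .option("subscribe", "rds_first_topic")
          .load()
          .select(to_after(col("value")).alias("value"))
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("topic", "transformed_topic")             # assumption: output topic name
          .option("checkpointLocation", "/tmp/ckpt")        # assumption: checkpoint path
          .start()
          .awaitTermination())
```

The transformation is kept in a plain function (extract_after_state) so it can be unit-tested without a Spark session.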
Start all the services:
docker compose up --build -d
List the Kafka topics to confirm that Debezium is publishing changes:
docker exec -it kafka kafka-topics \
--list \
--bootstrap-server localhost:29092
Consume the change events from the beginning of the topic:
docker exec -it kafka kafka-console-consumer \
--topic rds_first_topic \
--bootstrap-server localhost:29092 \
--from-beginning
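Each message printed by the console consumer is a Debezium change event. A simplified example of the shape (the table and column names here are illustrative, not from this project):

```json
{
  "payload": {
    "before": null,
    "after": { "id": 1, "name": "Ward 12" },
    "source": { "connector": "postgresql", "db": "postgres", "table": "municipalities" },
    "op": "c",
    "ts_ms": 1700000000123
  }
}
```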
Follow the Jupyter container logs (the notebook URL with its access token is printed there):
docker compose logs -f jupyter
Troubleshooting: if startup fails because a port is already in use, bring the stack down, find the process holding the port, and kill it:
docker compose down
sudo lsof -i -P -n | grep <port number> # list which process is using the port
sudo kill -9 <process_id>
Register the connector configuration with the Kafka Connect REST API:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @connector.json
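The contents of connector.json are not shown here; a plausible sketch for a Debezium PostgreSQL source (Debezium 2.x property names; the hostname, password, and table list are placeholders/assumptions, while the database user matches the RDS settings below):

```json
{
  "name": "rds-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<rds-endpoint>",
    "database.port": "5432",
    "database.user": "municipal_user",
    "database.password": "<password>",
    "database.dbname": "postgres",
    "topic.prefix": "rds",
    "table.include.list": "public.municipalities",
    "plugin.name": "pgoutput",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
```

The ExtractNewRecordState SMT shown here is one common choice: it flattens the Debezium envelope so downstream consumers receive only the new row state.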
RDS instance settings (AWS):
DBInstanceIdentifier = "database-1"
DBInstanceClass = "db.t3.micro"
Engine = "postgres"
MasterUsername = "municipal_user"
DBParameterGroupName = "default.postgres15" -> "postgres15wal" (new custom parameter group, presumably enabling logical replication: Debezium requires wal_level = logical, and default RDS parameter groups cannot be modified)
AvailabilityZone = "ap-south-1b"
Region = "ap-south-1"
VPC security group attached to the instance (it must allow inbound PostgreSQL traffic from the Debezium host):
"VpcSecurityGroups": [
{
"VpcSecurityGroupId": "sg-086ca293a4051c85f",
"Status": "active"
}
]