Producer send MQTT msg every second with a "status" field containing a random value between 0 and 6 to RabbitMQ. FasApi server consumes the MQTT msg from RabbitMQ and stores in MongoDB with timestamps. It provides an endpoint that accept timestamp range and return the count of each status within the specified time range using aggregate pipeline.
Docker server setup for RabbitMQ, MongoDB:
- Install docker desktop
- Navigate to directory that contains docker-compose file
- Run command to download images - docker-compose pull
- Run command to start containers in detached mode - docker-compose up -d
- Rabbitmq GUI management will be up and running on port 15672
- Mongodb admin GUI will be up and running on port 8081
Server setup for Fastapi:
- Install python 3.12
- Navigate to directory of fastapi app
- Run command to install dependencies - pip install -r requirements.txt
- Run command to start fastapi server - uvicorn main:app
- FastApi app will be up and running on port 8000
- API swagger UI - localhost:8000/docs
Producer (client script) :
- Install paho library by running cmd - pip install paho-mqtt
- Run the python file present in producer folder, it will start sending msgs to Rabbitmq every second. (RabbitMQ server should be up and running)
API endpoint to get aggregate data from mongodb - /aggregate
Date and Time Format - YYYY-MM-DD HH:MM:SS.SSS
Example -
start - 2024-07-05 09:00:00.000
end - 2024-07-05 09:05:00.000
response - [ { "Status": 5, "Count": 14 }, { "Status": 2, "Count": 14 }, { "Status": 6, "Count": 13 }, { "Status": 3, "Count": 11 }, { "Status": 0, "Count": 10 }, { "Status": 1, "Count": 10 }, { "Status": 4, "Count": 10 } ]
Default username/password for RabbitMQ GUI Management - user/password
Default username/password for MongoDB GUI Admin Panel - admin/pass