Skip to content

Commit

Permalink
towards replicating the rmq setup in dev
Browse files Browse the repository at this point in the history
  • Loading branch information
stan-dot committed Jul 25, 2024
1 parent add0e8f commit 7a3cfc6
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 1 deletion.
24 changes: 24 additions & 0 deletions src/event_emitter/start.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import stomp
import time
from some_event_library import SomeEvent # Replace with your actual library and event

class MyListener(stomp.ConnectionListener):
def on_error(self, frame):
print('received an error:', frame.body)

def on_message(self, frame):
print('received a message:', frame.body)

def send_event():
conn = stomp.Connection([('localhost', 5672)])
conn.set_listener('', MyListener())
conn.start()
conn.connect('user', 'password', wait=True)

event = SomeEvent(param1="value1", param2="value2") # Replace with actual event data
conn.send(body=event.json(), destination='/queue/test')

conn.disconnect()

if __name__ == "__main__":
send_event()
10 changes: 10 additions & 0 deletions src/message_bus/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: '3'
services:
rabbitmq:
image: rabbitmq:management
ports:
- "5672:5672" # AMQP port
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: user
RABBITMQ_DEFAULT_PASS: password
1 change: 1 addition & 0 deletions src/message_bus/rabbitmq_plugins
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[rabbitmq_management,rabbitmq_stomp].
46 changes: 46 additions & 0 deletions src/plotting_server/simple_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from fastapi import FastAPI, WebSocket
import stomp
import json
from event_model import StreamData, StreamDatum

app = FastAPI()
clients = set()

class STOMPListener(stomp.ConnectionListener):
def on_error(self, frame):
print(f'Error: {frame.body}')

def on_message(self, frame):
print(f'Received message: {frame.body}')
message = frame.body
for client in clients:
client.send_json(json.loads(message))

@app.websocket("/ws/colors")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
clients.add(websocket)
try:
while True:
await websocket.receive_text()
except Exception as e:
print(f"WebSocket error: {e}")
finally:
clients.remove(websocket)

def start_stomp_listener():
conn = stomp.Connection([('localhost', 5672)])
conn.set_listener('', STOMPListener())
conn.start()
conn.connect('user', 'password', wait=True)
conn.subscribe(destination='/queue/test', id=1, ack='auto')

if __name__ == "__main__":
import uvicorn
from threading import Thread

# Start the STOMP listener in a separate thread
thread = Thread(target=start_stomp_listener)
thread.start()

uvicorn.run(app, host="0.0.0.0", port=8000)
2 changes: 1 addition & 1 deletion src/plotting_server/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import matplotlib.pyplot as plt
import io
import asyncio

from event_model import StreamData, StreamDatum

app = FastAPI()

Expand Down

0 comments on commit 7a3cfc6

Please sign in to comment.