AsyncAioPikaManager connecting to RabbitMQ #1167
-
Hi, I took the example from this repo and modified it by what I found from other discussions from here: from aiohttp import web
import socketio
sio = socketio.AsyncServer(async_mode='aiohttp', channel='super.test', client_manager=socketio.AsyncAioPikaManager('amqp://guest:guest@localhost:5672//'))
app = web.Application()
sio.attach(app)
async def background_task():
"""Example of how to send server generated events to clients."""
count = 0
while True:
await sio.sleep(10)
count += 1
await sio.emit('my_response', {'data': 'Server generated event'})
async def index(request):
with open('app.html') as f:
return web.Response(text=f.read(), content_type='text/html')
@sio.event
async def my_event(sid, message):
await sio.emit('my_response', {'data': message['data']}, room=sid)
@sio.event
async def my_broadcast_event(sid, message):
await sio.emit('my_response', {'data': message['data']})
@sio.event
async def join(sid, message):
sio.enter_room(sid, message['room'])
await sio.emit('my_response', {'data': 'Entered room: ' + message['room']},
room=sid)
@sio.event
async def leave(sid, message):
sio.leave_room(sid, message['room'])
await sio.emit('my_response', {'data': 'Left room: ' + message['room']},
room=sid)
@sio.event
async def close_room(sid, message):
await sio.emit('my_response',
{'data': 'Room ' + message['room'] + ' is closing.'},
room=message['room'])
await sio.close_room(message['room'])
@sio.event
async def my_room_event(sid, message):
await sio.emit('my_response', {'data': message['data']},
room=message['room'])
@sio.event
async def disconnect_request(sid):
await sio.disconnect(sid)
@sio.event
async def connect(sid, environ):
await sio.emit('my_response', {'data': 'Connected', 'count': 0}, room=sid)
@sio.event
def disconnect(sid):
print('Client disconnected')
# app.router.add_static('/static', 'static')
# app.router.add_get('/', index)
async def init_app():
sio.start_background_task(background_task)
return app
if __name__ == '__main__':
web.run_app(init_app()) This application starts and connects to my local rabbitmq server. From the documentation I thought the
Did I missunderstood the documentation? When I publish messages to the rabibtmq, I don't see them being accepted by the server. I figure I need to register some sort of callback? But I don't find any information about that in the documentation. Further, I did not find any information on how to publish to the rabbitmq. Any help is highly appreciated. Thank you very much, |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 10 replies
-
Socket.IO creates its own queue and manages it fully. You should not write to the queue directly, the queue is used internally for communication of the processes running Socket.IO code. If you want to emit to your clients from a process that is not a server, use the documented approach. |
Beta Was this translation helpful? Give feedback.
-
Okay, thank you for your replay. I still haven't fully understood how it works. Here is my updated version: socketio server: import logging
import logging.handlers
import socketio
from aiohttp import web
logger = logging.getLogger("socket io server")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(name)s] [%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("Starting socket io...")
# create socketio server
sio = socketio.AsyncServer(async_mode='aiohttp', client_manager=socketio.AsyncAioPikaManager('amqp://guest:guest@localhost:5672//'), logger=logger)
app = web.Application()
sio.attach(app)
# define event handlers
@sio.event
def connect(sid, environ):
logging.info("Client connected: %s", sid)
# sid join room 'test'
sio.enter_room(sid, 'test')
sio.emit("message", "Hello from the server!", room=sid)
@sio.event
def disconnect(sid):
logging.info("Client disconnected: %s", sid)
async def init_app():
# sio.start_background_task(background_task)
return app
web.run_app(init_app()) rabbitmq consumer import logging
import logging.handlers
import socketio
import pika
sio = socketio.AsyncServer(async_mode='aiohttp', client_manager=socketio.AsyncAioPikaManager('amqp://guest:guest@localhost:5672//'))
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
logger = logging.getLogger("Rabbit MQ consumer")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(name)s] [%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare the exchange
channel.exchange_declare(exchange='super.test', exchange_type='fanout')
# Declare the queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind the queue to the exchange
channel.queue_bind(exchange='super.test', queue=queue_name)
# Define the callback function
def callback(ch, method, properties, body):
logger.info("Received message: %s", body)
sio.emit("message", body.decode("utf-8"))
logger.info("Message sent to socket io")
# Consume messages from the queue
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming() Am I correct, that the idea is, that the 2 processes communicate via RabbitMQ. So when I call in my rabbitmq consumer When I publish to the exchange What am I missing? |
Beta Was this translation helpful? Give feedback.
Hi,
wanted to come back to this and report that with the help of a friend finally got a working solution:
server.py