Issue
I am trying to hook my websocket endpoint up to RabbitMQ (aio-pika). The goal is to have a listener in that endpoint and, on every new message from the queue, pass the message to the browser client over websockets.
I tested the consumer in a standalone script with an asyncio loop. It works, since I followed the aio-pika documentation. (source: https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/2-work-queues.html, worker.py)
However, when I use it in a FastAPI websocket endpoint, I can't make it work. Somehow the listener:
await queue.consume(on_message)
is completely ignored.
This is my attempt (I put it all in one function so it's more readable):
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    print("Entering websockets")
    await manager.connect(websocket)
    print("got connection")
    # params
    queue_name = "task_events"
    routing_key = "user_id.task"
    con = "amqp://rabbitmq:rabbitmq@rabbit:5672/"
    connection = await connect(con)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    exchange = await channel.declare_exchange(
        "topic_logs",
        ExchangeType.TOPIC,
    )
    # Declaring queue
    queue = await channel.declare_queue(queue_name)
    # Binding the queue to the exchange
    await queue.bind(exchange, routing_key)

    async def on_message(message: IncomingMessage):
        async with message.process():
            # here the message will be passed over websockets to the browser client
            print("sent", message.body)

    try:
        ######### Not working as expected ###########
        # await does not block here, and the websocket handler finishes, as there is no loop
        await queue.consume(on_message)
        #############################################
        ################ This alternative code at least receives some messages #############
        # If I use this part, I at least get some messages when I trigger a backend task that publishes new messages to the queue.
        # It seems like the messages are somehow stuck: a new task releases all stuck messages, but not the new one.
        while True:
            await queue.consume(on_message)
            await asyncio.sleep(1)
        ################## one part #############
    except WebSocketDisconnect:
        manager.disconnect(websocket)
I am quite new to async in Python. I am not sure where the problem is, and I have not managed to implement an async consuming loop modeled on worker.py from aio-pika.
Solution
The solution was simple.
aio-pika's queue.consume is non-blocking even though we await it: it registers the callback and returns a consumer tag right away. So we start consuming like this:
consumer_tag = await queue.consume(on_message, no_ack=True)
and at the end of the connection we cancel the consumer:
await queue.cancel(consumer_tag)
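To make the non-blocking behaviour concrete, here is a minimal stdlib-only sketch. FakeQueue is a hypothetical stand-in written for this illustration, not the aio-pika API; it only mimics the "register a callback, get a tag back immediately" shape of consume and cancel. The names handler, demo and stay_open are likewise assumptions.

```python
import asyncio


class FakeQueue:
    """Hypothetical stand-in for a broker queue (NOT the aio-pika API)."""

    def __init__(self):
        self.callbacks = {}

    async def consume(self, callback):
        # Register the callback and return a tag at once; nothing waits here.
        tag = f"ctag-{len(self.callbacks)}"
        self.callbacks[tag] = callback
        return tag

    async def cancel(self, tag):
        self.callbacks.pop(tag, None)

    async def publish(self, body):
        # Deliver the body to every callback that is still registered.
        for callback in list(self.callbacks.values()):
            await callback(body)


async def handler(queue, received, stay_open=None):
    async def on_message(body):
        received.append(body)

    tag = await queue.consume(on_message)  # returns immediately
    if stay_open is not None:
        await stay_open.wait()  # something has to keep the handler alive
    await queue.cancel(tag)


async def demo():
    # Handler with nothing to wait on: it has finished before any message arrives.
    q1, got1 = FakeQueue(), []
    await handler(q1, got1)
    await q1.publish(b"lost")

    # Handler that waits on an event: it is still subscribed when the message arrives.
    q2, got2, closed = FakeQueue(), [], asyncio.Event()
    task = asyncio.create_task(handler(q2, got2, closed))
    await asyncio.sleep(0)  # give the handler a chance to subscribe
    await q2.publish(b"delivered")
    closed.set()
    await task
    return got1, got2
```

Running asyncio.run(demo()) shows the first handler receiving nothing and the second receiving its message, which mirrors the websocket endpoint exiting before any queue message could be forwarded.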
The core of the solution, for me, was to have something that blocks the coroutine (in the asyncio sense), so I put this code after the consume call:
while True:
    data = await websocket.receive_text()
    x = await manager.send_message(data, websocket)
I don't need this code for anything else, but it is useful because it waits for messages from the frontend websocket. If it is missing, the client connects only to be disconnected right away: the websocket endpoint runs to completion successfully, as there is nothing blocking it.
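Putting the pieces together, the pattern could be sketched as a small helper. This is only a sketch under assumptions: forward_queue_to_websocket is a hypothetical name, websocket is assumed to offer send_text() and receive_text() like FastAPI's WebSocket, and queue is assumed to offer consume() and cancel() like an aio-pika queue. Forwarding the decoded body as text is also an assumption about the payload.

```python
async def forward_queue_to_websocket(websocket, queue):
    """Relay queue messages to a websocket until the client goes away.

    Hypothetical helper: `websocket` and `queue` are duck-typed, see the
    assumptions stated in the text above.
    """

    async def on_message(message):
        # The consumer is started with no_ack=True, so the broker does not
        # expect an acknowledgement and none is sent here.
        await websocket.send_text(message.body.decode())

    # consume() only registers the callback and hands back a consumer tag.
    consumer_tag = await queue.consume(on_message, no_ack=True)
    try:
        while True:
            # Keeps the handler alive; raises once the client disconnects.
            await websocket.receive_text()
    finally:
        # Stop consuming however the loop ended.
        await queue.cancel(consumer_tag)
```

Inside the endpoint, the call would go where the try block is, with the existing except WebSocketDisconnect branch still calling manager.disconnect(websocket). The finally clause cancels the consumer first and then lets the exception propagate to that branch.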
Answered By - Martin