Issue
In Python, I would like to know how to wait for the first of either queue.get()
or event.wait()
.
At the moment, I am using asyncio.wait()
to achieve this, but this is producing a deprecation warning. I do not understand how I should alter my code so that it will be compatible with future versions of Python.
The following code is functional, however it gives the warning DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11.
import random
import asyncio
event = asyncio.Event()
queue = asyncio.Queue()
async def producer():
for i in range(5):
print(f"Putting {i}")
await queue.put(i)
await asyncio.sleep(random.random())
# Check if we should terminate
if event.is_set():
break
print("Producer done")
async def terminator():
await asyncio.sleep(random.random() * 5)
print("Terminating")
event.set()
print("Terminator done")
async def consumer():
while True:
print(f"Waiting on the result of either the queue or the event")
done, _ = await asyncio.wait(
[queue.get(), event.wait()],
return_when=asyncio.FIRST_COMPLETED
)
# Check if we should terminate
if event.is_set():
break
# Otherwise, we got a queue item
item = done.pop().result()
print(f"got {item}")
print("Consumer done")
async def main():
await asyncio.gather(producer(), terminator(), consumer())
asyncio.run(main())
Example output:
Putting 0
Waiting on the result of either the queue or the event
got 0
Waiting on the result of either the queue or the event
Putting 1
got 1
Waiting on the result of either the queue or the event
Terminating
Terminator done
Consumer done
Producer done
Solution
I made the following solution, please check and advise, I left many comments to explain what I do and what potential problem I found in your code:
import asyncio
async def producer(queue: asyncio.Queue, event: asyncio.Event) -> None:
for i in range(5):
print(f"Putting {i}")
await queue.put(i)
await asyncio.sleep(1)
if event.is_set(): # Check if we should terminate
break
print("Producer done")
async def terminator(event: asyncio.Event) -> None:
await asyncio.sleep(3)
print("Terminating")
event.set()
print("Terminator done")
async def consumer(queue: asyncio.Queue, event: asyncio.Event) -> None:
while True:
print(f"Waiting on the result of either the queue or the event")
# you should create Tasks, rather than just sending coroutines to asyncio.wait
# It was the reason of Warning message
task_queue_get = asyncio.create_task(queue.get())
task_event_get = asyncio.create_task(event.wait())
done_tasks, pending_tasks = await asyncio.wait(
[task_queue_get, task_event_get],
return_when=asyncio.FIRST_COMPLETED
)
# Check if we should terminate
if event.is_set():
[i.cancel() for i in pending_tasks] # cancel pending tasks
# return_exceptions=True - prevent raise of asyncio.CancelledError
await asyncio.gather(*pending_tasks, return_exceptions=True)
break
# Otherwise, we got a queue item
try:
res = [i.result() for i in done_tasks]
# Attention!
# only one task is expected, but can cause errors,
# if result has exception instead of normal result inside
item = res[0]
print(f"got {item}")
except Exception as ex:
print("Unexpected Exception!!!")
raise ex
[i.cancel() for i in pending_tasks] # cancel pending tasks
# return_exceptions=True - prevent raise of asyncio.CancelledError
await asyncio.gather(*pending_tasks, return_exceptions=True)
print("Consumer done")
async def main():
# event and queue should be created inside main,
# otherwise could cause "different loops" conflict
event = asyncio.Event()
queue = asyncio.Queue()
await asyncio.gather(
producer(queue=queue, event=event),
terminator(event=event),
consumer(queue=queue, event=event),
)
if __name__ == '__main__':
asyncio.run(main())
Answered By - Artiom Kozyrev
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.