Issue
Can someone please explain why the code below doesn't work, and how I can refactor it so that it does?
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def io_bound(device):
    sleep(5)
    return 5


async def sleepy_time(result):
    await asyncio.sleep(5)
    print(result)


async def main(loop):
    futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]
    for f in asyncio.as_completed(futures):
        result = await f
        task = asyncio.create_task(sleepy_time(result))
        await task


loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(
    max_workers=3,
)
try:
    loop.run_until_complete(main(loop))
finally:
    loop.close()
Inside the asyncio.as_completed loop, if I just print the result, it prints 3 results at a time, which is what I expected. However, when I await another task, I would expect asyncio to start that task and move on to the next future. It doesn't: it blocks until asyncio.sleep(5) has finished and only then moves on to the next future.
I've tested putting two tasks inside the as_completed stanza and they run concurrently.
How can I get the above code to run the second set of tasks concurrently?
Solution
To run the second set of tasks concurrently, collect them in a list as you create them, and only after the loop use asyncio.wait() to await them all.
async def main(loop):
    futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]
    tasks = []
    for f in asyncio.as_completed(futures):
        result = await f
        tasks.append(asyncio.create_task(sleepy_time(result)))
    await asyncio.wait(tasks)
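For anyone who wants to try the fix end to end, here is a minimal self-contained sketch of it. A few things in it are my own assumptions and not part of the original code: the delay is shortened to 0.2 seconds (the DELAY constant), io_bound returns its argument instead of 5 so the results can be told apart, sleepy_time appends to a list (seen) instead of printing, and asyncio.run() with asyncio.get_running_loop() replaces the manual loop handling.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.2  # assumption: shortened from the 5 seconds in the question


def io_bound(device):
    # Stand-in for blocking I/O; runs in a worker thread.
    time.sleep(DELAY)
    return device  # assumption: return the input so results are distinguishable


async def sleepy_time(result, seen):
    # Stand-in for the asynchronous follow-up work.
    await asyncio.sleep(DELAY)
    seen.append(result)


async def main():
    loop = asyncio.get_running_loop()
    seen = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]
        tasks = []
        for f in asyncio.as_completed(futures):
            result = await f
            # Schedule the follow-up work, but do not await it inside the loop.
            tasks.append(asyncio.create_task(sleepy_time(result, seen)))
        # Wait for all follow-up tasks once every future has been consumed.
        await asyncio.wait(tasks)
    return seen


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(main())
    print(sorted(results), f"{time.perf_counter() - start:.1f}s")
```

With these numbers the whole run should take roughly three delays (two batches of thread work plus the last follow-up sleep), whereas awaiting each task inside the loop would add one delay per result.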
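When you await something in an async function, execution of that function's body is suspended until the awaited object completes. That is why awaiting each task inside the loop runs the tasks one after another.
Meanwhile, await asyncio.sleep() also allows other tasks in the event loop to run.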
From the Python docs:
sleep() always suspends the current task, allowing other tasks to run.
And now asyncio.create_task():
Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.
That means the created task is going to be run by the event loop regardless of whether you await it or not.
You can test that out by switching
await asyncio.wait(tasks)
to
await asyncio.sleep(10)
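The same point can be shown in isolation. The sketch below is only an illustration: the names record, demo and log are hypothetical, and the delays are arbitrary. The tasks are created but never awaited directly; the coroutine awaits an unrelated asyncio.sleep(), and the event loop runs the tasks in the meantime.

```python
import asyncio


async def record(name, log):
    # Hypothetical follow-up work: wait briefly, then note that we ran.
    await asyncio.sleep(0.05)
    log.append(name)


async def demo():
    log = []
    # Keep references to the tasks so they are not garbage-collected early.
    tasks = [asyncio.create_task(record(i, log)) for i in range(3)]
    # Await something unrelated; the event loop runs the tasks meanwhile.
    await asyncio.sleep(0.2)
    return log, [t.done() for t in tasks]


if __name__ == "__main__":
    log, done = asyncio.run(demo())
    print(sorted(log), done)
```

Sleeping for a fixed time only works here because the delay is known to be long enough. In real code, awaiting the tasks themselves (for example with asyncio.wait()) is the safer choice.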
Also, if the event loop stops before a task has had a chance to run to completion, you get a "Task was destroyed but it is pending!" error message.
That can happen if the event loop never had a chance to run the tasks. For example, if in your async def main() you created tasks with create_task() but never awaited anything at all.
Answered By - ark-key