Issue
I need to implement the algorithm using asyncio with the following conditions (behaviour):
- check that the list of arguments is not empty; if it is empty, finish execution
- pop the next argument from the list, create a coroutine with this argument and schedule it for execution
- no more than 'async_level' coroutines may be executing at the same time
- when a coroutine finishes execution, go back to step 1
The tasks must not all be scheduled at once (as with asyncio.gather) but in parts: when a task finishes execution, a new one takes its place.
I tried to do it with asyncio.as_completed() but it doesn't actually work as expected:
async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]
tasks = {asyncio.create_task(job(param)) for param in params[0: async_level]}
params = iter(params[async_level:])
while True:
    # NOTE: This won't work, because you can't add a task to 'tasks' after
    # 'as_completed' has been invoked, so execution actually ends when the
    # last coroutine passed to 'as_completed' ends.
    for task in asyncio.as_completed(tasks):
        print(f"len(tasks) = {len(tasks)}")
        await task
        try:
            param = next(params)
            tasks.add(asyncio.create_task(job(param)))
        except StopIteration:
            print("StopIteration")
            break
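One way around the as_completed limitation is asyncio.wait with return_when=asyncio.FIRST_COMPLETED, which returns as soon as any task finishes and hands back the still-pending set, so the set can be topped up before waiting again. The following is only a minimal sketch under assumptions: run_limited, the stats dictionary and the short-sleeping job are hypothetical stand-ins, not the code from the question.

```python
import asyncio
from itertools import islice

# Demo-only bookkeeping (hypothetical) to observe how many jobs overlap.
stats = {"active": 0, "peak": 0}


async def job(param):
    # Hypothetical stand-in for the question's job(); sleeps briefly.
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01 * (param % 3))
    stats["active"] -= 1
    return param


async def run_limited(params, async_level=4):
    """Keep at most async_level tasks alive, refilling as tasks finish."""
    remaining = iter(params)
    running = set()
    results = []
    while True:
        # Top up the running set from the arguments not yet consumed.
        for param in islice(remaining, async_level - len(running)):
            running.add(asyncio.create_task(job(param)))
        if not running:
            break  # no arguments left and nothing in flight
        # Returns once at least one task is done; the rest stay pending.
        done, running = await asyncio.wait(
            running, return_when=asyncio.FIRST_COMPLETED
        )
        results.extend(task.result() for task in done)
    return results


if __name__ == "__main__":
    print(asyncio.run(run_limited(range(1, 11), async_level=4)))
```

Results arrive in completion order rather than argument order, and a task that raised would re-raise from task.result(); both would need handling in real code.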
I also tried to implement it using asyncio.BoundedSemaphore, but then the first two conditions are not met:
async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]

async def semaphore_job(name, _asyncio_semaphore):
    async with _asyncio_semaphore:
        await job(name)

asyncio_semaphore = asyncio.BoundedSemaphore(async_level)
jobs = []
# NOTE: This variant schedules all jobs at once, which is a significant
# drawback because the number of jobs can be overwhelming.
for param in params:
    jobs.append(asyncio.ensure_future(semaphore_job(param, asyncio_semaphore)))
await asyncio.gather(*jobs)
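A possible way to avoid creating one task per argument up front is a fixed pool of workers that share a single iterator: only async_level tasks ever exist, and each pulls its next argument when it becomes free. This is a rough sketch under assumptions; worker, run_with_workers and the short-sleeping job are hypothetical names introduced here for illustration.

```python
import asyncio


async def job(param):
    # Hypothetical stand-in for the question's job().
    await asyncio.sleep(0.01)
    return param


async def worker(params_iter, results):
    # next() on the shared iterator is synchronous, so two workers
    # never receive the same argument within one event loop.
    for param in params_iter:
        results.append(await job(param))


async def run_with_workers(params, async_level=4):
    """Run job() over params with exactly async_level worker tasks."""
    params_iter = iter(params)  # shared by all workers
    results = []
    workers = [
        asyncio.create_task(worker(params_iter, results))
        for _ in range(async_level)
    ]
    await asyncio.gather(*workers)
    return results


if __name__ == "__main__":
    print(asyncio.run(run_with_workers(range(1, 11), async_level=4)))
```

Because arguments are consumed lazily, params could also be a generator producing a very large sequence without all of it being scheduled at once.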
I would be grateful for any of your help.
Solution
It seems I found the solution myself:
import asyncio
from typing import Callable
from random import randrange
from asyncio import Semaphore, ensure_future, get_event_loop

async def job(name, time_range=10):
    timeout = randrange(time_range)
    print(f"Task '{name}' started with timeout {timeout}")
    await asyncio.sleep(timeout)
    print(f"Task '{name}' finished")
    return name

async def custom_executor(func: Callable, args: list, async_level: int = 4):
    """Asynchronously execute no more than 'async_level' calls of 'func', one per entry in 'args'."""
    loop = get_event_loop()
    sync = Semaphore()
    # A list (rather than a set) preserves order and keeps duplicate arguments.
    todo = list(args)
    doing = set()

    def _schedule_task():
        if todo:
            arg = todo.pop(0)
            # Tuples, lists and sets are unpacked into positional arguments.
            fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
            f = ensure_future(fr, loop=loop)
            f.add_done_callback(_on_completion)
            doing.add(f)

    def _on_completion(f):
        doing.remove(f)
        sync.release()
        _schedule_task()

    # Start the first batch; every completion then schedules the next task.
    for _ in range(min(async_level, len(todo))):
        _schedule_task()

    # Wake up on each completion and stop once nothing is running.
    while True:
        if not doing:
            break
        await sync.acquire()

async def main():
    await custom_executor(job, [(1, 3), 7, (8, 2), 12, 5])

if __name__ == '__main__':
    asyncio.run(main())
But if you know a better way, please share!
Answered By - Evgen