I started learning asynchronous development in Python 3 and faced the following dilemma:

Suppose I have several tasks, and inside each one I check whether it is going to take a long time. If so, I "postpone" it, handing control to the next coroutine in the event loop:

    import asyncio
    import time


    async def calculate(value):
        if value > 10000:
            # Yield control to the event loop before the heavy calculation.
            await asyncio.sleep(0.0001)
        data = value ** value
        print('Calculated {}'.format(value))
        return data


    async def main():
        # Wrap the coroutines in tasks: recent Python versions no longer
        # accept bare coroutines (or a loop argument) in asyncio.wait().
        task_1 = asyncio.ensure_future(calculate(98230))
        task_2 = asyncio.ensure_future(calculate(16780))
        task_3 = asyncio.ensure_future(calculate(656))
        task_4 = asyncio.ensure_future(calculate(1078))
        tasks_list = [task_1, task_2, task_3, task_4]
        finished, unfinished = await asyncio.wait(
            tasks_list, return_when=asyncio.ALL_COMPLETED)
        for unfinished_task in unfinished:
            unfinished_task.cancel()
        return finished


    if __name__ == '__main__':
        start = time.time()
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(main())
        loop.close()
        print('Exec time: ' + str(time.time() - start))

So the question is: how can I check whether the other tasks have finished, so that the current one can start its calculation? For example:

    async def calculate(value):
        if value > 10000:
            # other_tasks_done() does not exist; it stands for the check I am looking for.
            while not loop.other_tasks_done():
                await asyncio.sleep(0.0001)
        data = value ** value
        print('Calculated {}'.format(value))
        return data
  • I don't know whether this is just an example or not, but I'll note right away that asyncio is not intended for heavy calculations. By the way, await asyncio.sleep(0) also hands control to other coroutines. - andreymal
  • Thank you for your comment. As for the code, yes, it is more of an example, a kind of "imitation" of a slow response. - KOTJlETA
  • Use a semaphore. It will keep new tasks that share the same semaphore from starting. - eri

1 answer

It makes sense to queue tasks when they are not truly asynchronous and block some resource (network bandwidth, memory, interrupts, or disk).

If you take pure math such as data = value ** value, the task is synchronous and blocks the CPU. In that case it is better to hand the heavy tasks to an *Executor. With ordinary functions this is simple, but coroutines have to be run in a separate event loop.

    import asyncio
    import concurrent.futures


    def run(corofn, *args):
        # Run the coroutine function in its own event loop inside the worker thread.
        loop = asyncio.new_event_loop()
        try:
            coro = corofn(*args)
            return loop.run_until_complete(coro)
        finally:
            loop.close()


    async def calculate(value):
        ...


    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    tasks_list = []
    for value in values:
        tasks_list.append(loop.run_in_executor(executor, run, calculate, value))
    finished, unfinished = await asyncio.wait(tasks_list, ...)

In this example, run can be removed if calculate makes no asynchronous calls, in which case calculate should be an ordinary function rather than a coroutine. max_workers=3 means that at most three tasks are executed at a time.
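The case without asynchronous calls could look roughly like the sketch below. It assumes Python 3.7+ (for asyncio.run and asyncio.get_running_loop) and that calculate is a plain function. The input values and the use of asyncio.gather are illustrative choices, not part of the original code. Note that for purely CPU-bound work a ProcessPoolExecutor is often preferred, because threads in CPython share the GIL.

```python
import asyncio
import concurrent.futures


def calculate(value):
    # Plain synchronous function: nothing is awaited, so no run() wrapper is needed.
    return value ** value


async def main(values):
    loop = asyncio.get_running_loop()
    # max_workers=3: at most three calculations run at the same time.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [loop.run_in_executor(executor, calculate, v) for v in values]
        # gather() returns the results in the same order as the inputs.
        return await asyncio.gather(*futures)


if __name__ == '__main__':
    results = asyncio.run(main([656, 1078, 3]))
    print(results[2])  # 3 ** 3 == 27
```

The event loop stays responsive while the worker threads compute, since run_in_executor returns awaitable futures instead of blocking.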

Another approach is to hold tasks back before they run, which is what semaphores are for. This is better suited to network and other I/O work, but it also copes well with CPU-bound tasks. Memory has to be managed manually before the semaphore is released.

Create a semaphore: sema = asyncio.Semaphore(3)

In the function, add the acquire and release calls:

    async def calculate(sema, value):
        await sema.acquire()
        if value > 10000:
            await asyncio.sleep(0.0001)
        data = value ** value
        sema.release()
        print('Calculated {}'.format(value))
        return data

The remaining tasks will block until one of the first three finishes and releases the semaphore.
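As a minimal sketch of the same idea, the semaphore can also be used with "async with", which releases it even if the calculation raises. The state dictionary and the peak counter are hypothetical bookkeeping added only to show that no more than three tasks are inside the guarded section at once. The sketch assumes Python 3.7+ and creates the semaphore inside the running loop.

```python
import asyncio


async def calculate(sema, value, state):
    # "async with" acquires the semaphore and releases it on exit, even on error.
    async with sema:
        state['running'] += 1
        state['peak'] = max(state['peak'], state['running'])
        await asyncio.sleep(0.0001)  # yield so that other tasks can try to start
        data = value ** value
        state['running'] -= 1
    return data


async def main(values, limit=3):
    sema = asyncio.Semaphore(limit)
    # Hypothetical bookkeeping, for illustration only.
    state = {'running': 0, 'peak': 0}
    results = await asyncio.gather(
        *(calculate(sema, v, state) for v in values))
    return results, state['peak']


if __name__ == '__main__':
    results, peak = asyncio.run(main([1, 2, 3, 4, 5]))
    print(peak)  # never exceeds the semaphore limit of 3
```

Compared with explicit acquire and release calls, the context manager form avoids leaving the semaphore held when an exception interrupts the task.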

  • Thanks for the extended answer! - KOTJlETA