When the following code works (the maximum test and a small example is sketched), after the block_func method completes, the stream in which the method was executed does not end.

def block_func(): i = 1 async def starter(): future = asyncio.get_event_loop().run_in_executor( executor=None, func=functools.partial(block_func) ) while True: await asyncio.sleep(1) asyncio.ensure_future(starter()) loop = asyncio.get_event_loop() loop.run_forever() 

It does not help even the option of cleaning tasks:

 def block_func(): i = 1 async def starter(): future = asyncio.get_event_loop().run_in_executor( executor=None, func=functools.partial(block_func) ) while True: await asyncio.sleep(1) async def cleaner(): while True: for task in asyncio.Task.all_tasks(): try: if task.exception() or task.done(): asyncio.Task._all_tasks.remove(task) except asyncio.futures.CancelledError: asyncio.Task._all_tasks.remove(task) except asyncio.futures.InvalidStateError: pass # :TODO: Чистим последний таск, иначе не возбуждаются ошибки из корутины task = None await asyncio.sleep(5) asyncio.ensure_future(starter()) asyncio.ensure_future(cleaner()) loop = asyncio.get_event_loop() loop.run_forever() 

I tried to substitute executor = concurrent.futures.ThreadPoolExecutor (), in this case it turns out to terminate the stream by calling the future result method to read the result of the block_func method in the stream.

But I don’t like this solution, I want to understand why the thread doesn’t end by itself when the execution of the method inside the stream ends.

  • What does it mean "the stream does not end" How do you define it? - Sergey Gornostaev
  • Through htop, I watch the threads generated by the main thread, and after working out the Korutina and deleting its Task, the thread will be suspended until the main program thread ends. The executore also increases the number of threads, but after processing they are not cleared from there. - xIdan
  • one
    This is logical, because the event loop uses the ThreadPoolExecutor, and he, as the name implies, reuses the threads, returning the spent ones to the pool. - Sergey Gornostaev
  • Yes indeed, the threads are being reused, thank you. This is very useful information. - xIdan

1 answer 1

When the flow is complete, all the data inside it is consumed by the GC. Including the answer.

You can make the thread terminate itself if you do not need an answer.

Just hang up an empty function on the callback:

 def cb(future): pass future.add_done_callback(cb) 

You can handle exceptions in the callback:

 def cb(future): exc = future.exception() if exc: logger.error(exc)