I wrote a parser.

To send requests I used async + aiohttp.

The responses from the server are stored in a list.

Then a thread pool is used to parse the responses from the server:

Code:

    results = []
    async with aiohttp.ClientSession() as client:
        coroutines_category = [request(client, url) for url in url_list_category]
        completed, pending = await asyncio.wait(coroutines_category)
        for item in completed:
            results.append(item.result())

    with ThreadPoolExecutor(count_thread) as executor:
        for _ in executor.map(parsing_category, results):
            pass

Question:

How can I make it so that as soon as an element lands in the list, it is immediately picked up and processed by the thread pool?

Update:

    results = []
    async with aiohttp.ClientSession() as client:
        coroutines_main_list_category = [parsing_main_list_category(client, url) for url in url_page_category]
        completed, pending = await asyncio.wait(coroutines_main_list_category)
        #
        coroutines_category = [request(client, url) for url in url_list_category]
        completed, pending = await asyncio.wait(coroutines_category)
        for item in completed:
            results.append(item.result())

    with ThreadPoolExecutor(count_thread) as executor:
        for _ in executor.map(parsing_category, results):
            pass

    results = []
    async with aiohttp.ClientSession() as client:
        coroutines_main_list_product = [parsing_main_list_product(client, url) for url in url_page_product]
        await asyncio.wait(coroutines_main_list_product)

        coroutines_list_product = [request(client, url) for url in url_list_product]
        completed, pending = await asyncio.wait(coroutines_list_product)
        for item in completed:
            results.append(item.result())

    with ThreadPoolExecutor(count_thread) as executor:
        for _ in executor.map(parsing_list_product, results):
            pass

    results = []
    async with aiohttp.ClientSession() as client:
        coroutines_product = [request(client, url) for url in url_product]
        completed, pending = await asyncio.wait(coroutines_product)
        for item in completed:
            results.append(item.result())

    with ThreadPoolExecutor(count_thread) as executor:
        for _ in executor.map(parsing_product, results):
            pass

In this part:

    coroutines_main_list_category = [parsing_main_list_category(client, url) for url in url_page_category]
    completed, pending = await asyncio.wait(coroutines_main_list_category)

url_list_category is filled.

In this part:

    coroutines_main_list_product = [parsing_main_list_product(client, url) for url in url_page_product]
    await asyncio.wait(coroutines_main_list_product)

url_list_product is filled.

And I run this method like this:

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(start_main())
    finally:
        event_loop.close()
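On Python 3.7 and newer the same startup can be written with asyncio.run, which creates, runs and closes the event loop for you; a minimal equivalent, assuming start_main is a coroutine function:

    import asyncio

    # Creates a fresh event loop, runs start_main() to completion, then closes the loop
    asyncio.run(start_main())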

Update 2:

    Traceback (most recent call last):
      File "D:/Git/aquapolis.py", line 401, in <module>
        loop.run_until_complete(main(url_page_category))
      File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 568, in run_until_complete
        return future.result()
      File "D:/Git/aquapolis.py", line 335, in main
        await crawl(initial_future, client, pool)
      File "D:/Git/aquapolis.py", line 320, in crawl
        parse_future = loop.run_in_executor(pool, parse, (await request_future).result())
    AttributeError: 'str' object has no attribute 'result'
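This error is consistent with request returning the page text itself: awaiting the item that asyncio.as_completed yields already gives you request's return value, a str, so there is no .result() left to call. A tiny runnable illustration, with a placeholder request standing in for the asker's:

    import asyncio

    async def request(client, url):
        # Placeholder standing in for the asker's request(): returns the markup as a str
        await asyncio.sleep(0)
        return '<html>...</html>'

    async def demo():
        for request_future in asyncio.as_completed([request(None, u) for u in ('a', 'b')]):
            page_text = await request_future
            # page_text is already the str returned by request();
            # calling .result() on it raises exactly this AttributeError
            print(type(page_text))

    asyncio.get_event_loop().run_until_complete(demo())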

Update 3:

Tried `parse_future = loop.run_in_executor(pool, parse, await request_future)`. It throws:

    future: <Task finished coro=<crawl() done, defined at D:/Git/aquapolis.py:315> exception=TypeError("'NoneType' object is not iterable")>
    Traceback (most recent call last):
      File "D:/Git/aquapolis.py", line 319, in crawl
        for request_future in asyncio.as_completed([request(client, url) for url in urls]):
    TypeError: 'NoneType' object is not iterable

I tried:

    parse_future = loop.run_in_executor(pool, parse, (await request_future).result)

    Traceback (most recent call last):
      File "D:/Git/aquapolis.py", line 401, in <module>
        loop.run_until_complete(main(url_page_category))
      File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 568, in run_until_complete
        return future.result()
      File "D:/Git/aquapolis.py", line 335, in main
        await crawl(initial_future, client, pool)
      File "D:/Git/aquapolis.py", line 320, in crawl
        parse_future = loop.run_in_executor(pool, parse, (await request_future).result )
    AttributeError: 'str' object has no attribute 'result'

I tried:

    parse_future = loop.run_in_executor(pool, parse, await request_future.result())


    future: <Task finished coro=<request() done, defined at D:/Git/aquapolis.py:61> exception=RuntimeError('Session is closed')>
    Traceback (most recent call last):
      File "D:/Git/aquapolis.py", line 64, in request
        async with client.get(url, headers=headers) as r:
      File "C:\ProgramData\Anaconda3\lib\site-packages\aiohttp\client.py", line 855, in __aenter__
        self._resp = await self._coro
      File "C:\ProgramData\Anaconda3\lib\site-packages\aiohttp\client.py", line 254, in _request
        raise RuntimeError('Session is closed')
    RuntimeError: Session is closed

    Traceback (most recent call last):
      File "D:/Git/aquapolis.py", line 401, in <module>
        loop.run_until_complete(main(url_page_category))
      File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 568, in run_until_complete
        return future.result()
      File "D:/Git/aquapolis.py", line 335, in main
        await crawl(initial_future, client, pool)
      File "D:/Git/aquapolis.py", line 320, in crawl
        parse_future = loop.run_in_executor(pool, parse, await request_future.result())
    AttributeError: 'coroutine' object has no attribute 'result'
    sys:1: RuntimeWarning: coroutine 'as_completed.<locals>._wait_for_one' was never awaited
  • It would be better to read the documentation for your version of Python carefully than to try the options at random. The first variant is correct; it's just that in your crawl a None arrives instead of a list of links. The problem should be looked for in the parse function. - Hivemaster
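For illustration, the fix Hivemaster hints at is to make parse always return a list and never None; a minimal sketch of that shape, where all the page-type helpers are hypothetical placeholders rather than the asker's real logic:

    def parse(page_text):
        urls = []
        if is_category_page(page_text):                       # hypothetical helper
            urls.extend(extract_category_links(page_text))    # hypothetical helper
        elif is_product_list_page(page_text):                 # hypothetical helper
            urls.extend(extract_product_links(page_text))     # hypothetical helper
        else:
            save_product_data(page_text)  # hypothetical helper; a leaf page yields no new links
        # Always return a list (possibly empty) so crawl() can iterate over it
        return urls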

1 answer

    with concurrent.futures.ThreadPoolExecutor(max_workers=count_thread) as pool:
        async with aiohttp.ClientSession() as client:
            for f in asyncio.as_completed([request(client, url) for url in url_list_category]):
                loop.run_in_executor(pool, parsing_category, f.result())
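Note that each item asyncio.as_completed yields must itself be awaited before its value can be used; the question's later updates run into exactly this. A self-contained sketch of the same idea, with placeholder request and parsing_category implementations standing in for the asker's:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    import aiohttp

    async def request(client, url):
        # Placeholder fetch: returns the page markup as a string
        async with client.get(url) as r:
            return await r.text()

    def parsing_category(page_text):
        # Placeholder for the CPU-bound parsing that runs in the thread pool
        print(len(page_text))

    async def main(urls, count_thread=4):
        loop = asyncio.get_event_loop()
        with ThreadPoolExecutor(max_workers=count_thread) as pool:
            async with aiohttp.ClientSession() as client:
                parse_futures = []
                for f in asyncio.as_completed([request(client, url) for url in urls]):
                    page_text = await f  # the next finished response, as soon as it is ready
                    # Hand parsing to the thread pool without waiting for the remaining downloads
                    parse_futures.append(loop.run_in_executor(pool, parsing_category, page_text))
                # Wait for the parsers so the session and the pool are not torn down early
                await asyncio.gather(*parse_futures)

    asyncio.get_event_loop().run_until_complete(main(['https://example.com']))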

UPDATE: A more complex example

    import asyncio
    import os
    from concurrent.futures import ThreadPoolExecutor
    from contextlib import suppress

    import aiohttp


    def parse(page_text):
        urls = []
        # Determine from the markup which type of page we are on
        if ...:
            # and extract the corresponding links from it
            ...
        elif ...:
            ...
        else:
            # or the required data, if this is a leaf page of the crawl,
            # and do something with them
            ...
        return urls


    async def crawl(future, client, pool):
        futures = []
        # Get the links out of the future
        urls = await future
        # Fetch the page markup for every link
        for request_future in asyncio.as_completed([request(client, url) for url in urls]):
            # Hand the markup parsing over to the thread pool
            parse_future = loop.run_in_executor(pool, parse, (await request_future).result())
            # Recursively call ourselves to parse the new batch of links
            futures.append(asyncio.ensure_future(crawl(parse_future, client, pool)))
        # This is only needed in order to know
        # when to stop the event loop
        if futures:
            await asyncio.wait(futures)


    async def main(root_urls):
        loop = asyncio.get_event_loop()
        # Create a thread pool sized to the number of CPUs
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
            # Create a client session
            async with aiohttp.ClientSession() as client:
                # Create the root future
                initial_future = loop.create_future()
                # Put into it the links we start crawling from
                initial_future.set_result(root_urls)
                # Pass this future to the crawling coroutine
                # together with the thread pool and the client session
                await crawl(initial_future, client, pool)


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main(['http://www.com']))
        except KeyboardInterrupt:
            for task in asyncio.Task.all_tasks():
                task.cancel()
                with suppress(asyncio.CancelledError):
                    loop.run_until_complete(task)
        finally:
            loop.close()
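The example relies on the asker's request coroutine, which is never shown. Judging by the "Session is closed" traceback above, it looks roughly like this; returning r.text() is an assumption, consistent with the str results seen in the earlier updates:

    headers = {'User-Agent': '...'}  # placeholder; the asker's real headers are not shown

    async def request(client, url):
        # Fetch one URL within the shared client session and return the page markup
        async with client.get(url, headers=headers) as r:
            return await r.text()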
  • Could you add a comment explaining what the last line means? And if possible, the last two. - danilshik
  • as_completed wraps the coroutines in futures and yields them one by one as they complete, and run_in_executor submits the function for execution in the thread pool. All the calls are asynchronous; nothing blocks anywhere (see the sketch after these comments). - Sergey Gornostaev
  • And could you suggest a more complex example? I've added code below. - danilshik
  • @danilshik I will. Though this is not the first time I've told you this, and for some reason you don't accept my answers. - Sergey Gornostaev
  • Yes, it's complicated... - danilshik
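To make that comment concrete, here is a tiny self-contained demo with made-up delays: as_completed yields results in completion order ('fast' before 'slow'), while run_in_executor pushes the blocking parse into a worker thread so the event loop is never blocked:

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor

    async def fake_request(name, delay):
        # Stands in for an aiohttp request; finishes after `delay` seconds
        await asyncio.sleep(delay)
        return name

    def blocking_parse(name):
        # Stands in for CPU-bound parsing; runs in a worker thread
        time.sleep(0.1)
        print('parsed', name)

    async def main():
        loop = asyncio.get_event_loop()
        coros = [fake_request('slow', 0.3), fake_request('fast', 0.1)]
        with ThreadPoolExecutor(max_workers=2) as pool:
            parse_futures = []
            for f in asyncio.as_completed(coros):
                result = await f  # 'fast' arrives first even though it was listed second
                parse_futures.append(loop.run_in_executor(pool, blocking_parse, result))
            await asyncio.gather(*parse_futures)

    asyncio.get_event_loop().run_until_complete(main())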