Good day.

There is a code that simultaneously starts several asynchronous processes on the list. It is necessary to get a result from each process and update the status in the database. Code example:

from time import sleep from multiprocessing import Process, Value import subprocess def worker_email(keyword, func_result): subprocess.Popen(["python", "mongoworker.py", str(keyword)]) func_result.value = 1 return True keywords_list = ['apple', 'banana', 'orange', 'strawberry'] if __name__ == '__main__': for keyword in keywords_list: # Выполняю задачу func_result = Value('i', 0) p = Process(target=worker_email, args=(keyword,func_result)) p.start() # Обновляю статус задачи if func_result.value == 1: stream.update_one({'_id': doc['_id']}, {"$set": {"status": True}}, upsert=False) 

What is the problem: if you use p.join() , then everything works, but the processes are executed in turn. If not used, the processes are not closed, and the status is not updated. The working variant is to perform not the function code, but subprocess.Popen , but it looks like something obscene.

Actually, I will be glad to any advice :)

    1 answer 1

    I decided to transfer the connection to the database and check the result in the function-worker itself. Like that:

     def worker_email(keyword, task_id): # Соединяюсь с базой client = MongoClient('mongodb://localhost:27017/') db = client.admetric stream = db.stream sleep(10) print('Yo:' + keyword) # Обновляю статус задачи на "Выполнено" (если все ок) или не меняю статус и отправляю на повторое выполнение (если не ок) if True: stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False) # Отключаюсь от базы client.close() return True 

    UPD: More exploded version:

     def update_status(task_id, func_result): # Соединяюсь с базой client = MongoClient('mongodb://localhost:27017/') db = client.admetric stream = db.stream # Обновляю статус задачи на "Выполнено" (если все ок) или не меняю статус и отправляю на повторое выполнение (если не ок) if func_result: stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False) # Отключаюсь от базы client.close() def yo_func(keyword): sleep(10) print('Yo:' + keyword) return True def worker_email(keyword, task_id): update_status(task_id, yo_func(keyword))