There are about 1800 folders, in each txt-files from 1 to 2500 pcs. It is necessary to process everything. I divided all folders into ranges, 0..180, 181-360, etc. Ranges transmitted in 10 streams. Everything is working. My question is: how to create a queue of streams, for example, also 10, but transfer there not a range, but one folder, and in order to work, the thread takes the next one? How to avoid blocking at the same time, so that at the same time the freed threads do not take the same task?

example of my code

def writer(start, stop, num): v_id_count = 0 v_Count = 0 out = open('C:\\chunks\\part_'+str(num)+'.txt', 'w') for i in islice(dirs,start,stop): id = i v_id_count += 1 p = '{}\\{}'.format(path,i) files = [z for z in os.listdir(p)] for file in files: file_path = '{}\\{}'.format(p, file) f = open(file_path, 'r').read() soup = BeautifulSoup(f, 'html.parser') # cur = con.cursor() v_Count_loop = 0 try: for v in range(get_count(soup)): string = '{};{};{};{};{};{};{}'.format(id, get_status_date(soup), get_status(soup), get_z_name(soup),get_d_name(soup), get_method(soup, v), get_description(soup, v)) out.write(string + '\n') v_Count += 1 v_Count_loop += 1 print('Поток {}. Вставлено {} строк. Всего вставлено {}. Обработано id {}'.format(num, v_Count_loop, v_Count, v_inn_count)) except Exception as e: print(e) out.close() if __name__ == '__main__': path = 'C:\\html' dirs = [x for x in os.listdir(path)] maxpoint = len(dirs) params = [] #делим на диапазоны for i in range(1,11): if round(maxpoint/10 * (i-1)) == 0: st = round(maxpoint/10 * (i-1)) else: st = round(maxpoint/10 * (i-1)) + 1 ed = round(maxpoint/10*i) args = st, ed, i params.append(args) # print(params[0]) p1 = Process(target = writer, args=params[0]) p2 = Process(target = writer, args=params[1]) p3 = Process(target = writer, args=params[2]) p4 = Process(target = writer, args=params[3]) p5 = Process(target = writer, args=params[4]) p6 = Process(target = writer, args=params[5]) p7 = Process(target = writer, args=params[6]) p8 = Process(target = writer, args=params[7]) p9 = Process(target = writer, args=params[8]) p10 = Process(target = writer, args=params[9]) p1.start() p2.start() p3.start() p4.start() p5.start() p6.start() p7.start() p8.start() p9.start() p10.start() p1.join() p2.join() p3.join() p4.join() p5.join() p6.join() p7.join() p8.join() p9.join() p10.join() 

    1 answer 1

    work is in the process pool

    read / write files - in the main

     from os import path, walk from multiprocessing import Pool, Lock def files_from_dir(dir_: str) -> iter: '''генератор файлов''' for root, dirs, files in walk(dir_): for file in files: yield path.join(root, file) def file_worker(name: str) -> tuple: '''обработка файла(в потоке)''' with file_lock: return name, str(len(open(name).read())) def set_pool_lock(lock: Lock) -> None: global file_lock file_lock = lock if __name__ == '__main__': file_lock = Lock() # лок на чтение/запись файлов на диск with Pool(processes=10, initializer=set_pool_lock, initargs=[file_lock]) as pool: # обработка файлов из file_worker в пуле-процессов pool for name, text_len in pool.imap_unordered(file_worker, files_from_dir('TEMP')): with file_lock: # логирование результата работы file_worker with open('%s.log' % name, 'w') as log: log.write(text_len) 
    • Thank. Your previous example was clearer (And how are the locks implemented here? Or will there be none? - ss_beer
    • one
      There is no sense in parallel work with the disk, work (i / o) with the files occurs sequentially, in a separate process. The transfer of tasks to the worker-process occurs by means of multiprocessing .Pool, which takes care of everything. Those locks are not needed. - vadim vaduxa