Your function here is worker(). You can parallelize by having each process read a part of the input file and write its processed result to a separate output file.
subprocess_run: runs the script 'sub.py' in separate new Python sessions; each process has its own GIL, so in theory this is the fastest option.
concurrent_run, multiprocessing_run: read the file in chunks; the larger lines_limit is, the more lines each process handles per pass and the shorter the total time (see the chunking sketch after this list).
single_run: reads the entire file single-threaded (for comparison).
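The chunking itself is just a list of (begin_line, end_line) ranges. A minimal sketch, not part of the measured code, of how those ranges could be built with range(); make_worker_args, total_lines and chunk_size are names I introduce here for illustration:

# Build one argument tuple per chunk of at most chunk_size lines.
def make_worker_args(rfile, wfile_template, total_lines, chunk_size, enc='utf-8'):
    return [
        (rfile, wfile_template % start, start, min(start + chunk_size, total_lines), enc)
        for start in range(0, total_lines, chunk_size)
    ]

# Example: 3_000_000 lines split into 1_500_000-line chunks -> 2 worker tuples.
print(make_worker_args('test2.txt', 'test2_out_%s.txt', 3_000_000, 1_500_000))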
import re
import subprocess
import multiprocessing
import concurrent.futures
from itertools import islice
import timeit


def worker(args):
    """Read rfile from line begin_line up to line end_line,
    write the processed result to wfile (each chunk gets its own file)."""
    rfile, wfile, begin_line, end_line, enc = args
    with open(rfile, encoding=enc, errors='ignore') as rf, \
            open(wfile, 'w', encoding=enc, errors='ignore') as wf:
        for line in islice(rf, begin_line, end_line):
            wf.write('%s\n' % re.sub(r'\s+', '', line.replace(' ', ';')))


def subprocess_run(file='sub.py'):
    """Run in 2 processes, each in a new Python session."""
    m = max_line // 2  # lines per process
    with subprocess.Popen(['python', file, str([file_read, file_write % 1, 0, m, enc])]), \
            subprocess.Popen(['python', file, str([file_read, file_write % 2, m, max_line, enc])]):
        pass


def multiprocessing_run():
    """Run via multiprocessing.Pool."""
    with multiprocessing.Pool(processes=2) as pool:
        pool.map(worker, worker_args)


def concurrent_run():
    """Run via concurrent.futures.ProcessPoolExecutor."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        pool.map(worker, worker_args)


def single_run():
    """Run single-threaded, the whole file at once (for comparison)."""
    worker([file_read, file_write % '_', 0, max_line, enc])


if __name__ == '__main__':
    # source data
    enc = 'utf-8'
    file_read = 'test2.txt'
    file_write = 'test2_out_%s.txt'
    start_line, stop_line = 0, 0
    lines_limit = 1500000                    # lines to process per pass
    max_line = len(list(open(file_read)))    # number of lines in the file

    # build the chunk arguments for multiprocessing_run / concurrent_run
    worker_args = []
    while stop_line < max_line:
        start_line = stop_line
        stop_line += lines_limit
        args = file_read, file_write % start_line, start_line, stop_line, enc
        worker_args.append(args)

    # run each variant and measure it
    for fn in [subprocess_run, single_run, concurrent_run, multiprocessing_run]:
        print(timeit.Timer(fn).repeat(1, 1), 'sec', fn.__name__)
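A side note: len(list(open(file_read))) materialises every line in memory just to count them. A minimal, hedged alternative (not part of the timings above; count_lines is a name introduced here) counts lines with a generator instead:

# Count lines without keeping them in memory; uses the same encoding/error
# handling as worker() so the count matches what islice() will see.
def count_lines(path, enc='utf-8'):
    with open(path, encoding=enc, errors='ignore') as f:
        return sum(1 for _ in f)

# max_line = count_lines(file_read, enc)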
The 'sub.py' file:
import sys
import re
from itertools import islice

rfile, wfile, begin_line, end_line, enc = eval(sys.argv[1])

with open(rfile, encoding=enc, errors='ignore') as rf, \
        open(wfile, 'w', encoding=enc, errors='ignore') as wf:
    for line in islice(rf, begin_line, end_line):
        wf.write('%s\n' % re.sub(r'\s+', '', line.replace(' ', ';')))
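One design note: the arguments reach sub.py as str(list) and are recovered with eval(sys.argv[1]). That works here, but eval on a command-line string is brittle and unsafe for untrusted input. A minimal sketch of the same hand-off via json; the example values are illustrative, not taken from the measured runs:

# Parent side: serialise the worker arguments to JSON before spawning sub.py.
import json
import subprocess

args = ['test2.txt', 'test2_out_1.txt', 0, 1500000, 'utf-8']   # example values
proc = subprocess.Popen(['python', 'sub.py', json.dumps(args)])
proc.wait()

# Child side (inside sub.py): parse the arguments back without eval.
#   import sys, json
#   rfile, wfile, begin_line, end_line, enc = json.loads(sys.argv[1])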
Timings; the source file 'test2.txt' is about 120 MB (~3,000,000 lines):
[16.207453404993398] sec subprocess_run
[29.336879417459787] sec single_run
[18.94762167044125] sec concurrent_run
[20.828409269737875] sec multiprocessing_run
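Since each chunk is written to its own file, it is worth checking that the concatenated chunk outputs match the single-run output. A minimal sketch, assuming the file names produced by the code above (test2_out_<start_line>.txt for the pool chunks, test2_out__.txt for single_run):

# Compare the concatenated chunk outputs against the single-run output.
chunk_files = ['test2_out_0.txt', 'test2_out_1500000.txt']   # assumed from the run above
single_file = 'test2_out__.txt'

def read_all(paths, enc='utf-8'):
    return ''.join(open(p, encoding=enc, errors='ignore').read() for p in paths)

assert read_all(chunk_files) == read_all([single_file]), 'parallel output differs from single run'
print('outputs match')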
…the \\tsclient disks / network are loaded with other work, or something else. 3. "Replace encoding" (from utf-8 to utf-8): it's not clear what you mean. Do you want to make sure the entire file can be decoded, for example that there are no lone surrogates? Or something different? 4. "Parallelizing the process" is either unnecessary or useless, depending on #2. – jfs