I have a text file larger than 20 GB. I need to change its encoding and replace the separator so that it can be loaded into a database. Errors appear during conversion and I don't know how to handle them; the character codes differ:

    Error: 'charmap' codec can't encode character '\xb9' in position 67: character maps to <undefined>, in line 0 str =
    Error: 'charmap' codec can't encode character '\x98' in position 197: character maps to <undefined>, in line 0 str =

My second question: is it possible to parallelize the process somehow, perhaps by writing to different files? In a single thread the file does not get converted within a working day. But then the question is how to split the source file. I have read about threads but have not tried them yet. The source script:

    import codecs
    import time
    import re

    file = codecs.open('\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER_000000.txt', 'r', 'utf-8')  # added
    out = open('\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER.txt', 'w')
    n = 0
    print('Start :'.format(), time.strftime('%c'))
    for i in file:
        try:
            out.write(re.sub('\s+', '', i.replace(' ', ';')) + '\n')
        except Exception as e:
            print('Error: {}, in line {} str = '.format(e, n, i))
            pass
    out.close()
    print('end :'.format(), time.strftime('%c'))

  • I do not see anything in the code that specifies the encoding you are converting to - andreymal
  • It's right at the start: codecs.open(file, 'utf-8') - ss_beer
  • That reads the file as utf-8; which encoding is the file written with? - andreymal
  • @andreymal ah, sorry, also utf-8; there should be an .encode('utf-8') before the replace - ss_beer
  • Try to limit yourself to one problem per question. You have several here: 1. "'charmap' codec can't encode". 2. Loading a 20 GB text file into the database within a working day (20 GB / 8 hours ≈ 6 Mbit/s) is a very moderate speed. Without details it is impossible to tell whether the \\tsclient disks or network are busy with other work, or whether something else is going on. 3. "Changing the encoding" (from utf-8 to utf-8; it is not clear what you mean). Perhaps you want to make sure that the entire file can be decoded, for example that there are no lone surrogates? Or something else? 4. "Parallelizing the process" is either unnecessary or useless, depending on #2 - jfs
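
On the first problem raised in these comments: the script in the question opens the output with open(..., 'w') and no encoding, so Python uses the platform default. On a Windows machine with a legacy code page (cp1251 is an assumption here, the question does not say which one), characters such as '\xb9' have no mapping, which produces exactly this kind of 'charmap' error. A minimal sketch that names both encodings explicitly; the function names convert and can_encode are illustrative, not from the question:

```python
import re


def can_encode(ch, encoding):
    """Return True if the character can be written in the given encoding."""
    try:
        ch.encode(encoding)
        return True
    except UnicodeEncodeError:
        return False


def convert(src_path, dst_path, src_enc='utf-8', dst_enc='utf-8'):
    """Copy src to dst line by line, turning spaces into ';'.

    Both encodings are passed explicitly, so the result does not
    depend on the platform default encoding.
    """
    count = 0
    with open(src_path, 'r', encoding=src_enc) as src, \
            open(dst_path, 'w', encoding=dst_enc, newline='\n') as dst:
        for line in src:
            # same transformation as in the question
            dst.write(re.sub(r'\s+', '', line.replace(' ', ';')) + '\n')
            count += 1
    return count
```

Calling can_encode('\xb9', 'cp1251') returns False while can_encode('\xb9', 'utf-8') returns True, which is why writing the output as utf-8 avoids the error. If the database really needs a legacy encoding, open() also accepts errors='replace' or errors='ignore', at the cost of losing those characters.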

2 answers

Your processing function is worker(). You can parallelize by having each process read its own part of the file and write the processed result to a separate file.

subprocess_run: runs the script sub.py in new Python sessions, i.e. each process has its own GIL; in theory this is the fastest option.

concurrent_run, multiprocessing_run: read the file in parts; the larger lines_limit is, the more data each process handles per pass and the shorter the total time.

single_run: reads the entire file in a single thread (for comparison).

    import re
    import subprocess
    import multiprocessing
    import concurrent.futures
    from itertools import islice
    import timeit


    def worker(args):
        """
        Read rfile from line begin_line up to line end_line and
        write the result to wfile (each call writes to its own file).
        """
        rfile, wfile, begin_line, end_line, enc = args
        with open(rfile, encoding=enc, errors='ignore') as rf, \
                open(wfile, 'w', encoding=enc, errors='ignore') as wf:
            for line in islice(rf, begin_line, end_line):
                wf.write('%s\n' % re.sub(r'\s+', '', line.replace(' ', ';')))


    def subprocess_run(file='sub.py'):
        """Run two processes, each in a new Python session."""
        m = max_line // 2  # lines handled by each process
        with subprocess.Popen(['python', file, str([file_read, file_write % 1, 0, m, enc])]), \
                subprocess.Popen(['python', file, str([file_read, file_write % 2, m, max_line, enc])]):
            pass


    def multiprocessing_run():
        """Run via multiprocessing."""
        with multiprocessing.Pool(processes=2) as pool:
            pool.map(worker, worker_args)


    def concurrent_run():
        """Run via ProcessPoolExecutor."""
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
            pool.map(worker, worker_args)


    def single_run():
        """Run in a single thread."""
        worker([file_read, file_write % '_', 0, max_line, enc])


    if __name__ == '__main__':
        # input data
        enc = 'utf-8'
        file_read = 'test2.txt'
        file_write = 'test2_out_%s.txt'
        lines_limit = 1500000  # lines to process per pass

        # Count the lines without loading the file into memory; the explicit
        # encoding avoids a UnicodeDecodeError from the platform default.
        with open(file_read, encoding=enc, errors='ignore') as f:
            max_line = sum(1 for _ in f)

        # arguments for multiprocessing_run and concurrent_run
        worker_args = []
        for start_line in range(0, max_line, lines_limit):
            stop_line = start_line + lines_limit
            worker_args.append((file_read, file_write % start_line, start_line, stop_line, enc))

        # run each variant and measure its performance
        for fn in [subprocess_run, single_run, concurrent_run, multiprocessing_run]:
            print(timeit.Timer(fn).repeat(1, 1), 'sec', fn.__name__)


The sub.py file:

    import ast
    import sys
    import re
    from itertools import islice

    # The arguments arrive as the repr of a list in sys.argv[1];
    # literal_eval parses it without executing arbitrary code.
    rfile, wfile, begin_line, end_line, enc = ast.literal_eval(sys.argv[1])
    with open(rfile, encoding=enc, errors='ignore') as rf, \
            open(wfile, 'w', encoding=enc, errors='ignore') as wf:
        for line in islice(rf, begin_line, end_line):
            wf.write('%s\n' % re.sub(r'\s+', '', line.replace(' ', ';')))


Timings for the source file 'test2.txt' (120 MB, about 3,000,000 lines):

    [16.207453404993398] sec subprocess_run
    [29.336879417459787] sec single_run
    [18.94762167044125] sec concurrent_run
    [20.828409269737875] sec multiprocessing_run

  • Thanks. When I run it, I get an error on the line that computes max_line: UnicodeDecodeError: 'charmap' codec can't decode byte 0x98 - ss_beer
  • Understood. So to get the total number of lines we still have to read the whole 25 GB file, right? Is there a way to simply read until the end? - ss_beer
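
On the last comment: counting lines is not strictly required to split the work. One alternative, sketched here under the assumption that the file is UTF-8 with '\n' line endings, is to split by byte offsets and move each boundary forward to the next line break. Each process then seeks straight to its part instead of skipping lines with islice. The names chunk_offsets, convert_chunk and run_parallel are hypothetical and not part of the answer above:

```python
import multiprocessing
import os
import re


def chunk_offsets(path, n_chunks):
    """Return (start, end) byte ranges that begin and end on line boundaries."""
    size = os.path.getsize(path)
    bounds = [0]
    with open(path, 'rb') as f:
        for i in range(1, n_chunks):
            f.seek(size * i // n_chunks)
            f.readline()  # move forward to the start of the next line
            pos = f.tell()
            if bounds[-1] < pos < size:
                bounds.append(pos)
    bounds.append(size)
    return list(zip(bounds[:-1], bounds[1:]))


def convert_chunk(args):
    """Convert one byte range of src and write it to its own output file."""
    src, dst, start, end, enc = args
    with open(src, 'rb') as rf, \
            open(dst, 'w', encoding=enc, newline='\n') as wf:
        rf.seek(start)
        pos = start
        while pos < end:
            raw = rf.readline()
            if not raw:
                break
            pos += len(raw)
            line = raw.decode(enc, errors='replace')
            wf.write(re.sub(r'\s+', '', line.replace(' ', ';')) + '\n')
    return dst


def run_parallel(src, dst_pattern, n_chunks=4, enc='utf-8'):
    """Convert src in n_chunks processes; returns the output file names in order."""
    jobs = [(src, dst_pattern % i, start, end, enc)
            for i, (start, end) in enumerate(chunk_offsets(src, n_chunks))]
    with multiprocessing.Pool(processes=n_chunks) as pool:
        return pool.map(convert_chunk, jobs)
```

On Windows, run_parallel must be called from under an if __name__ == '__main__': guard. Splitting on b'\n' is safe for UTF-8 because that byte never occurs inside a multi-byte character. Whether this is actually faster depends on whether the disk or network, rather than the CPU, is the bottleneck.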

Try this code:

    import re
    import time
    import codecs
    import multiprocessing as mt
    from queue import Empty as queue_Empty

    INP_FILENAME = '\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER_000000.txt'
    OUT_FILENAME = '\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER.txt'
    ERR_FILENAME = '\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER_err.txt'
    NUM_THREAD = 8  # number of worker processes


    def read_line(filename, inp_queue):
        """Read the source file and queue (index, line) pairs."""
        with codecs.open(filename, 'r', 'utf-8') as f:
            for idx, line in enumerate(f):
                #DEBUG print((idx, line))
                inp_queue.put((idx, line))


    def processed_line(inp_queue, out_queue):
        """Convert queued lines; on failure pass the (index, line) pair on."""
        while True:
            idx, line = inp_queue.get()
            #DEBUG print((idx, line))
            try:
                out_queue.put(re.sub(r'\s+', '', line.replace(' ', ';')) + '\n')
            except Exception:
                out_queue.put((idx, line))


    def write_line(out_filename, out_queue, err_filename):
        """Write converted lines; log failed (index, line) pairs separately.

        With several workers the order of the output lines is not guaranteed.
        """
        with open(out_filename, 'w', encoding='utf-8') as f, \
                open(err_filename, 'w', encoding='utf-8') as err_f:
            while True:
                try:
                    data = out_queue.get(timeout=10)
                except queue_Empty:
                    break  # nothing arrived for 10 s: treat the input as exhausted
                #DEBUG print(data)
                if isinstance(data, tuple):
                    err_f.write(str(data) + '\n')
                else:
                    f.write(data)


    def main():
        inp_queue = mt.Queue(10000)
        out_queue = mt.Queue(10000)
        lprocess = []
        lprocess.append(mt.Process(target=read_line, args=(INP_FILENAME, inp_queue), daemon=True))
        for _ in range(NUM_THREAD):
            lprocess.append(mt.Process(target=processed_line, args=(inp_queue, out_queue), daemon=True))
        lprocess.append(mt.Process(target=write_line, args=(OUT_FILENAME, out_queue, ERR_FILENAME), daemon=True))
        for process in lprocess:
            process.start()
        lprocess.pop(0).join()   # wait for the reader
        print('end read')
        lprocess.pop(-1).join()  # wait for the writer
        print('end :', time.strftime('%c'))


    if __name__ == '__main__':
        main()


P.S. Judging by the paths, you are reading a file that is located on another computer, and this can be very slow. It is better to move either the script or the file so that both are on the same machine.
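
To check whether the remote path really is the bottleneck, one option is to measure the raw sequential read speed before trying any parallelism. The comment under the question estimates that 20 GB in 8 hours is only about 0.7 MB/s, so a result in that range would point at the network rather than at Python. A rough sketch; read_throughput and its default sizes are illustrative assumptions:

```python
import time


def read_throughput(path, max_bytes=256 * 1024 * 1024, block=1024 * 1024):
    """Return the sequential read speed in MB/s over at most max_bytes."""
    done = 0
    t0 = time.perf_counter()
    with open(path, 'rb') as f:
        while done < max_bytes:
            data = f.read(block)
            if not data:
                break
            done += len(data)
    elapsed = time.perf_counter() - t0
    if elapsed <= 0:
        return float('inf')
    return done / (1024 * 1024) / elapsed
```

Running it once on the \\tsclient path and once on a local copy of part of the file gives a direct comparison. Note that a repeated run on the same file may be served from the OS cache and look faster than the real device.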

  • Thanks. When I run it, I get the error: for idx, line in enumerate(): TypeError: Required argument 'iterable' (pos 1) not found. Can it work without an argument? - ss_beer
  • I added (f); now I get queue.Empty - ss_beer
  • I do not know what to say. Try uncommenting the #DEBUG lines - Andrio Skur
  • @ss_beer I do not think this can work any faster. For each line there are 3 SEPARATE mt.Process()!!! That is, for a million rows a total of 3 million Process objects will be created. In fact, this works slower than the single-threaded version. - vadim vaduxa
  • @vadimvaduxa Um, what? Did you read the code? It creates 1 process that reads the file into the inp_queue queue (read_line), 8 processes that move data from inp_queue to out_queue (processed_line), and 1 process that writes to the file (write_line). Where are "3 separate mt.Process" created per line? - Andrio Skur