Hello! I'm trying to run two operations in parallel:

  1. Training a neural network in Keras. I use the model.fit() method from this library: https://keras.io/models/model/
  2. Reading the training data for this network from an SSD into RAM. The full dataset is 300 GB and does not fit into RAM; I have only 16 GB.

That is, I want it to work like this:

a) Load 5 GB of data into RAM.

b) The network starts training on this data.

c) In parallel with step b), while the network is training, another 5 GB of data is copied from disk into memory.

d) As soon as the network finishes training on the first chunk, it immediately starts training on the second chunk, which is already prepared. At the same time, loading of the third chunk starts. And so on.
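Steps a) to d) amount to a producer/consumer pipeline with a bounded queue. The following is only a minimal sketch of that idea, not the code from this question: `prefetch`, `load_chunk`, `num_chunks` and `depth` are hypothetical names, and `load_chunk(i)` stands for whatever reads chunk `i` from disk.

```python
import queue
import threading


def prefetch(load_chunk, num_chunks, depth=1):
    """Yield chunks in order while a background thread loads the next ones.

    load_chunk(i) is a hypothetical callable that reads chunk i from disk.
    depth bounds how many finished chunks may wait in the queue.
    """
    q = queue.Queue(maxsize=depth)
    done = object()  # sentinel that marks the end of the stream

    def worker():
        for i in range(num_chunks):
            q.put(load_chunk(i))  # blocks while the queue is full
        q.put(done)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        chunk = q.get()
        if chunk is done:
            return
        yield chunk
```

A training loop would then iterate with `for chunk in prefetch(load_chunk, num_chunks): ...`. Note that up to `depth + 2` chunks can be alive at once (one being trained on, `depth` queued, one loaded and waiting to be queued), so the chunk size has to be chosen with that in mind. A thread only overlaps with training while the loading code releases the GIL, which is something to measure rather than assume.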

I implemented this, but here is how it actually behaves. In the first step (a), reading from disk goes fine, at about 40 MB/s. I should explain right away that my data is read from one large HDF5 file (via h5py) and the rows are selected in random order, so the speed is nowhere near 3500 MB/s :) In other words, the processor slows down the whole reading process with its computations.

So at first everything is acceptably fast, but as soon as the network starts training at the same time, the read speed immediately drops to 13-15 MB/s. Worse, the network trains 1.5 times slower when the two operations run simultaneously. That is, the two processes seem to be competing for one resource, the CPU. As a result, the GPU used for training is not kept busy: instead of 85-90% load I see 50-60%. At the same time, the total CPU load does not rise above 25-30%. I will attach a screenshot of the System Monitor with the CPU load graphs below.

Here is the code, though not all of it. I removed about 200 lines that only concern the network itself and are irrelevant here. The most important parts are the data_gen_v2 module and the nn_model.model_fit method:

Main program

    import sys
    sys.path.append("./moduls")
    import data_gen_v2 as my_gen

    class MyModel():
        '''Class that builds the convolutional neural network'''
        def __init__(self, files, epoch_size, batch_size, Frame, cand_param, num_target_cat):
            '''Initialize our data'''
            self.train_file = files[0]
            self.test_file = files[1]
            self.epoch_size = epoch_size
            self.batch_size = batch_size
            self.Frame = Frame
            self.cand_param = cand_param
            self.num_target_cat = num_target_cat

        def make_model(self):
            # layer definitions and so on; in short, my network
            return nn

        def model_fit(self, nn, mem_stp, file_mod):
            for e in range(self.epoch_size):
                print("Epoch - {}".format(e+1))
                # Training
                print("Training")
                gen = my_gen.fast_gen(self.train_file, mem_stp)
                #all_hist = []
                for i in gen:
                    Data_X = i[0]
                    Data_Y = i[1]
                    history = nn.fit(Data_X, [Data_Y, Data_Y], initial_epoch = 0, epochs = 1, batch_size = self.batch_size)
                    #all_hist.append(history)
                #nn.save('./{}-{}'.format(file_mod, e+1))
                #all_hist -> json
                # Validation
                print("Validation")
                gen = my_gen.fast_gen(self.test_file, mem_stp)
                #all_eval = []
                for i in gen:
                    Data_X = i[0]
                    Data_Y = i[1]
                    eval_s = nn.evaluate(Data_X, [Data_Y, Data_Y])
                    #all_eval.append(eval_s)
                #all_eval -> json

    #nn - my compiled network
    #mem_stp - number of data rows to load into memory so that they take up 5Gb
    #file_mod - name of the file where the network weights are saved.
    nn_model = MyModel(files, epoch_size, batch_size, Frame, cand_param, num_target_cat)
    nn = nn_model.make_model()
    nn_model.model_fit(nn, mem_stp, file_mod)

Imported data_gen_v2 module

    import random
    import math
    import threading, queue
    import numpy as np
    import h5py

    def get_data(file_info, ind_s, ind_e, q):
        file = file_info[0]
        len_data = file_info[1]
        Fr_len = file_info[2]
        Data_ALL = file_info[3]
        data_ind_sh = file_info[4]
        if ind_e > len_data:
            ind_e = len_data
        mem_ind = data_ind_sh[ind_s:ind_e]
        len_step = len(mem_ind)
        for i in range(len(Fr_len) - 1):
            Data_ALL[i] = np.zeros((len_step, Fr_len[i][0], Fr_len[i][1]), dtype='float32')
        Data_ALL[-1] = np.zeros((len_step, Fr_len[-1][0]), dtype='float32')
        with h5py.File(file) as f:
            for i in range(len_step):
                Data_ALL[0][i] = f['data_X_0'][mem_ind[i]]
                Data_ALL[1][i] = f['data_X_1'][mem_ind[i]]
                Data_ALL[2][i] = f['data_X_2'][mem_ind[i]]
                Data_ALL[3][i] = f['data_X_3'][mem_ind[i]]
                Data_ALL[4][i] = f['data_X_4'][mem_ind[i]]
                Data_ALL[5][i] = f['data_Y'][mem_ind[i]]
        Data_X = Data_ALL[:5]
        Data_Y = Data_ALL[-1] * -1
        q.put([Data_X, Data_Y])

    def fast_gen(file, mem_stp):
        with h5py.File(file) as f:
            data_X_f = f['data_X_0']
            len_data = len(data_X_f)
            Fr_len = []
            for item in f:
                len_fr = f[item][0].shape
                Fr_len.append(len_fr)
        Data_ALL = [i for i in range(len(Fr_len))]
        data_ind_sh = [i for i in range(len_data)]
        random.shuffle(data_ind_sh)
        file_info = [file, len_data, Fr_len, Data_ALL, data_ind_sh]
        ind_s = 0
        ind_e = mem_stp
        q = queue.Queue()
        threading.Thread(target=get_data, args=(file_info, ind_s, ind_e, q)).start()
        last_step = int((math.ceil(len_data/mem_stp)-1)*mem_stp)
        for i in range(0, len_data, mem_stp):
            ind_s = i + mem_stp
            ind_e = ind_s + mem_stp
            Data_XY = q.get()
            if i != last_step:
                print("get_data started. Iteration - {}".format(i))
                threading.Thread(target=get_data, args=(file_info, ind_s, ind_e, q)).start()
            yield Data_XY

That is, as you can see, I use threading and queue for parallelization in the generator. Maybe this is not a suitable approach? Is it possible to strictly specify that the fit() method uses, say, cores 1 and 2, and the generator uses cores 3 and 4, so that the two processes do not interfere with each other? Maybe something can be configured differently through Python or the OS kernel; I have Ubuntu 16.04. I know that the scheduler and other things in the kernel can be tuned, but I don't know what would work better in my case :) Do I need a kernel tuned for higher latency, or the other way around? I tried tuning the kernel following this article ( https://habrahabr.ru/post/234653/ ), thinking my workload would be similar, except that I did not install the BFS and BFQ patches. In short, I don't know from which side to approach my problem. Please advise, knowledgeable people; I have racked my brain over this :)
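On Linux, the question about assigning cores can be tried directly from Python with `os.sched_setaffinity`. The sketch below is only an illustration under assumptions: `pin_to_cores` is a hypothetical helper, the core numbers are examples, and whether pinning actually removes the slowdown described above is untested.

```python
import os


def pin_to_cores(cores):
    """Restrict the caller to the given CPU cores (Linux only).

    Returns the set of cores in effect afterwards, or None when the
    platform does not provide os.sched_setaffinity.
    """
    if not hasattr(os, "sched_setaffinity"):
        return None
    allowed = os.sched_getaffinity(0)
    wanted = set(cores) & allowed
    if not wanted:
        wanted = allowed  # requested cores unavailable: keep the current set
    os.sched_setaffinity(0, wanted)
    return os.sched_getaffinity(0)
```

One possible use is to run the loader in its own process and call `pin_to_cores([2, 3])` there, while the training process calls `pin_to_cores([0, 1])` early on. On Linux the call affects the calling thread, and threads created afterwards inherit the mask, so it should be made before the training library starts its worker threads.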

Update of 5.10.17. Reply to @0andriy's comments.

I could not figure out the formatting syntax in the comments, so I am adding to the question instead (I could not get a line break, either with two spaces or otherwise, and entering code there was not feasible).

So, when I read data from the HDF5 file, I read it from 6 different datasets in that file:
Zero numpy array No. 1, row [n] = file['daset_1'][num],
Zero numpy array No. 2, row [n] = file['daset_2'][num],
...
Zero numpy array No. 6, row [n] = file['daset_6'][num],
The row number (num) is the same for all 6 datasets, but it is chosen randomly each time. Then we move on to the next num, and so on until we have read as many rows from the file as fit into RAM. The result is 6 numpy arrays of randomly selected rows from the file, with the rows consistent across arrays No. 1 to No. 6.
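The read pattern just described visits file rows in shuffled order. One variation that may be worth measuring is to visit the same rows in ascending order and write each one to its shuffled position, so the result is identical but the file is walked front to back. This is a sketch under assumptions: `read_rows_ascending` is a hypothetical helper, `dset` is anything indexable by an integer row number (an h5py dataset in the question), and no speedup is claimed.

```python
def read_rows_ascending(dset, mem_ind, out):
    """Fill out[k] with dset[mem_ind[k]], visiting rows in ascending order.

    dset    -- indexable by an integer row number
    mem_ind -- shuffled row numbers, as in the question
    out     -- preallocated container with len(mem_ind) slots
    """
    # Positions of mem_ind, ordered by the row number they refer to.
    order = sorted(range(len(mem_ind)), key=lambda k: mem_ind[k])
    for k in order:
        out[k] = dset[mem_ind[k]]  # ascending file rows, shuffled destination
    return out
```

Because the same `mem_ind` is used for all 6 datasets, the rows stay consistent between the arrays exactly as before.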
So, I started reading from the file through multiprocessing.Pool. I do it like this:

    from multiprocessing import Pool
    import numpy as np
    import h5py

    def file_read(args):
        file = args[0]
        mem_ind = args[1]
        name_dset = args[2]
        Data_ALL = args[3]
        args = []
        with h5py.File(file) as f:
            for num, item in enumerate(mem_ind):
                Data_ALL[num] = f[name_dset][item]  # Everything constantly hangs here
            print("Variable {} has been read from h5py".format(name_dset))
        return Data_ALL

    def make_pool(args):
        p = Pool(2)
        Data_ALL = p.map(file_read, args)
        p.close()
        p.terminate()
        p.join()
        return Data_ALL

    # Create empty numpy arrays to be filled from file later
    # len_step = 32000 - number of rows that fit into RAM
    Data_ALL = [0, 1, 2, 3, 4, 5]  # Just numbers that are then overwritten with zero numpy arrays.
    for i in range(5):
        Data_ALL[i] = np.zeros((len_step, 1000, 4), dtype='float32')
    Data_ALL[-1] = np.zeros((len_step, 1), dtype='float32')

    # Describe the datasets and the rows to select
    # mem_ind = [list of randomly chosen rows, its length equals len_step]
    arg_0 = [file, mem_ind, 'data_X_0', Data_ALL[0]]
    Data_ALL[0] = []  # Drop the zero array to save RAM
    arg_1 = [file, mem_ind, 'data_X_1', Data_ALL[1]]
    Data_ALL[1] = []
    arg_2 = [file, mem_ind, 'data_X_2', Data_ALL[2]]
    Data_ALL[2] = []
    arg_3 = [file, mem_ind, 'data_X_3', Data_ALL[3]]
    Data_ALL[3] = []
    arg_4 = [file, mem_ind, 'data_X_4', Data_ALL[4]]
    Data_ALL[4] = []
    arg_5 = [file, mem_ind, 'data_Y', Data_ALL[5]]
    Data_ALL = []
    args = [arg_0, arg_1, arg_2, arg_3, arg_4, arg_5]

    # Start reading from the file in 2 parallel processes.
    Data_ALL = make_pool(args)

At first everything works fine, but at a random moment (it is not clear what it depends on; it can happen after 50 iterations, for example) everything hangs at this point:

    with h5py.File(file) as f:
        for num, item in enumerate(mem_ind):
            Data_ALL[num] = f[name_dset][item]  # Everything constantly hangs here

Moreover, all the other datasets get read (data_X_1, data_X_2, data_X_3, data_X_4, data_Y), all except data_X_0. While data_X_0 is being read, everything hangs and goes no further: the main program loop waits for the data, but it never arrives, and I have to press Ctrl+C to exit the program and start it again. Restarting the program almost never helps, so I have to reboot the computer :) And even a reboot does not always cure it. As a result, you wait 2 hours while everything works and the network trains normally, and then BAM, everything has to be restarted :(

Now I will try to redo everything without multiprocessing.Pool, using mmap() instead. Maybe then the data will be read fast enough that I will not need to parallelize it at all.
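For reference, random row access through the standard-library `mmap` module could look roughly like the sketch below. It rests on an assumption that does not hold for the file in this question as it stands: the data would first have to be exported to a flat binary file of fixed-size little-endian float32 rows. `ROW_FLOATS`, `read_row` and `read_rows` are hypothetical names chosen for the illustration.

```python
import mmap
import struct

ROW_FLOATS = 4              # hypothetical row length, in float32 values
ROW_BYTES = ROW_FLOATS * 4  # a float32 value occupies 4 bytes


def read_row(mm, row):
    """Decode one fixed-size float32 row from a memory-mapped flat file."""
    start = row * ROW_BYTES
    return struct.unpack("<%df" % ROW_FLOATS, mm[start:start + ROW_BYTES])


def read_rows(path, rows):
    """Map the file read-only and return the requested rows as tuples."""
    with open(path, "rb") as fh:
        with mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            return [read_row(mm, r) for r in rows]
```

With a mapping, the operating system's page cache decides what stays in RAM, so rows that were touched recently are served from memory while the rest are fetched from the SSD on demand.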

  • "That is, the processor slows down the entire reading process with its calculations": this statement is most likely wrong. The slowest part of the system is I/O. - 0andriy
  • Furthermore, Python has a problem called the GIL, the global interpreter lock, which you are also running into. - 0andriy
  • As for the fast start, that is most likely the caches being filled, which is why everything "flies". You need to make sure that you open the file using mmap(). - 0andriy
  • @0andriy, thank you so much for answering. I only just saw your comment. When I posted the question I waited a couple of days for an answer, but nobody replied, so I thought there would be none :) In the meantime I tried to come up with something myself and arrived at something similar, but it works badly :( Now I will describe what I did and what the problem is; maybe you can tell me how to deal with it? - Peter Koltakov
  • Rather hid. Here is a recent article on Linux I/O that lets you look under the hood: scylladb.com/2017/10/05/io-access-methods-scylla - 0andriy
