I've been puzzling over this for several hours and can't figure out how to implement file processing with two threads.

Creating the threads:

    for (int i = 0; i < threadCount; i++) {
        threads.push_back(boost::thread(readFile));
    }

The readFile function:

    void readFile() {
        while (true) {
            fs::path filePath = getFilePath();
            fs::ifstream ifs(filePath);
            if (!ifs) break;
            if (!ifs.is_open()) {
                break;
            }
            // ...file processing...
        }
    }

The getFilePath function:

    fs::path getFilePath() {
        if (it != fs::directory_iterator() && it->path().extension() == ".txt") {
            fs::path itPath = it->path();
            ++it;
            return itPath;
        }
    }

That is, each thread I create immediately calls getFilePath and processes the returned file, repeating this as long as there are unprocessed files in the folder. With one thread everything works fine, but with two everything breaks.

Give a couple of examples of working code, or point out my mistakes.

UPD:

I achieved normal operation by changing getFilePath:

    string getFilePath() {
        lock_guard<mutex> lock(m);
        if (it != fs::directory_iterator() && it->path().extension() == ".txt") {
            string itPath = it->path().string();
            ++it;
            bHaveFiles = true;
            return itPath;
        } else {
            return "null";
        }
    }

But the question hasn't changed much. If I now create, say, 8 threads, then because of the mutex only one thread at a time gets a file to process, and the remaining 7 have to wait their turn. Is it really possible to organize truly asynchronous work?

  • "Bad" how? What exactly is not working? At a minimum, getFilePath() should be made atomic, say, using a mutex. And one thing isn't clear to me: how does this work for you at all? Where is the iterator stored? Is it a member of a class? - Harry
  • An error: "... caused a breakpoint to be triggered." If I use mutexes, the threads will wait for their turn, but I need independent execution, or something like that. - Vitali
  • You seem to have the usual, simplified producer-consumer problem. Here is how I would implement this task: the main thread collects the list of files and puts them into a large array, then the worker threads are created. Their logic is simple: get a file, process it. The "get file" function is protected by a mutex and takes the next element from the array; when the files run out, the thread exits. Since the file-retrieval function is fast, everything will work well. - KoVadim

2 answers

I'm posting a second answer because this one is devoted not to advice but to experimental verification.

After some experiments, I must admit that my assumption that multithreading would not speed up disk operations turned out to be wrong.

Here are the experiments.

To begin, I create 20,000 files of about 200 KB each:

    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>

    int main() {
        for (int i = 0; i < 20000; ++i) {
            char buf[200];
            sprintf(buf, "g:\\tmp\\test\\%06d.dat", i);
            FILE *f = fopen(buf, "wb");
            if (f) {
                for (int j = 0; j < 50000; ++j)
                    fwrite(&i, sizeof(i), 1, f);
                fwrite(buf, strlen(buf) + 1, 1, f);
                fclose(f);
            }
        }
    }

After that, the main program comes into play.

    #include <vector>
    #include <string>
    #include <iostream>
    #include <fstream>
    #include <iomanip>
    #include <thread>
    #include <mutex>
    #include <future>
    #include <io.h>
    #include <muTimer.hpp>
    using namespace std;

    mutex sumMtx;                 // Mutex protecting the accumulator
    unsigned long long sum = 0;   // Accumulator (sum of values read from the files)

    mutex filesMtx;               // Mutex protecting the file list
    vector<string> files;         // List of file names
    size_t filesIdx;              // Current index into the file list

    // Collect all file names in the directory
    vector<string> getDirFiles(string dirName)
    {
        vector<string> fileNames;
        _finddata_t info;
        intptr_t handle = _findfirst((dirName + "\\*.*").c_str(), &info);
        if (handle == -1) return fileNames;
        do {
            if (info.attrib & _A_SUBDIR) continue;
            fileNames.push_back(string(dirName) + "\\" + info.name);
        } while (_findnext(handle, &info) == 0);
        _findclose(handle);
        return fileNames;
    }

    // Process one file
    void handleOneFile(const char *name)
    {
        fstream f(name, ios::binary | ios::in | ios::out);
        if (f) {
            unsigned int v = 0;
            for (int j = 0; j < 10000; ++j)       // Read about 40K
                f.read((char *)&v, sizeof(v));
            time_t t;
            time(&t);
            f.seekp(f.tellg());
            f.write((char *)&t, sizeof(t));       // Write 4 bytes
            lock_guard<mutex> lk(sumMtx);         // Update the accumulator
            sum += v;
        }
    }

    // Thread main function
    void asyncHandle()
    {
        for (;;) {
            const char *name;
            {                                        // Take the next file from the list;
                lock_guard<mutex> lk(filesMtx);      // if all files have already been
                if (filesIdx >= files.size()) break; // taken - exit
                name = files[filesIdx++].c_str();
            }
            handleOneFile(name);                     // Process the file
        }
    }

    int main(int argc, const char *argv[])
    {
        files = getDirFiles("G:\\Tmp\\Test");   // Collect the file list
        filesIdx = 0;                           // Reset, plus a couple of passes
        asyncHandle();                          // to bring the caches into
        filesIdx = 0;                           // a stable state
        asyncHandle();

        for (int threadCount = 1; threadCount < 20; ++threadCount) // For various thread counts
        {
            sum = 0;                            // Reset the accumulator and the index
            filesIdx = 0;
            cout << "threadCount = " << setw(3) << threadCount << ": ";
            muTimer mu;                         // My timer for time measurement
            vector<future<void>> tasks;
            for (int i = 0; i < threadCount; ++i)   // Launch threadCount tasks
            {
                tasks.push_back(async(asyncHandle));
            }
            for (int i = 0; i < threadCount; ++i)   // Wait for completion
            {
                tasks[i].get();
            }
            cout << sum << " ";                 // Print the accumulated sum
        }                                       // (the timer reports the elapsed time)
    }

The result was as follows (I tried to shut down as many other tasks on the machine as possible so that nothing else touched the disk at that moment):

    threadCount =   1: 199990000 46108 ms
    threadCount =   2: 199990000 23587 ms
    threadCount =   3: 199990000  9802 ms
    threadCount =   4: 199990000  8409 ms
    threadCount =   5: 199990000  8492 ms
    threadCount =   6: 199990000  8575 ms
    threadCount =   7: 199990000  8332 ms
    threadCount =   8: 199990000  8507 ms
    threadCount =   9: 199990000  8585 ms
    threadCount =  10: 199990000  8254 ms
    threadCount =  11: 199990000  8326 ms
    threadCount =  12: 199990000  8218 ms
    threadCount =  13: 199990000  8784 ms
    threadCount =  14: 199990000  8359 ms
    threadCount =  15: 199990000  8528 ms
    threadCount =  16: 199990000  8481 ms
    threadCount =  17: 199990000  8531 ms
    threadCount =  18: 199990000  8772 ms
    threadCount =  19: 199990000  8525 ms

I compiled with VC++ 2015 x86, on a Windows 7 x64 machine with a quad-core processor, so the advice to launch no more threads than there are cores is generally justified :)

Update

When the code in main() is changed to this:

    vector<thread> tasks;
    for (int i = 0; i < threadCount; ++i) {
        tasks.push_back(thread(asyncHandle));
    }
    for (int i = 0; i < threadCount; ++i) {
        tasks[i].join();
    }

The results have not changed much:

    threadCount =   1: 199990000 41373 ms
    threadCount =   2: 199990000 24758 ms
    threadCount =   3: 199990000  9544 ms
    threadCount =   4: 199990000  8165 ms
    threadCount =   5: 199990000  7911 ms
    threadCount =   6: 199990000  7970 ms
    threadCount =   7: 199990000  7807 ms
    threadCount =   8: 199990000  7942 ms
    threadCount =   9: 199990000  8064 ms
    threadCount =  10: 199990000  7858 ms
    threadCount =  11: 199990000  8361 ms
    threadCount =  12: 199990000  8157 ms
    threadCount =  13: 199990000  8550 ms
    threadCount =  14: 199990000  8001 ms
    threadCount =  15: 199990000  8392 ms
    threadCount =  16: 199990000  8346 ms
    threadCount =  17: 199990000  8558 ms
    threadCount =  18: 199990000  8410 ms
    threadCount =  19: 199990000  8398 ms

But after changing asyncHandle to

    void asyncHandle(int start, int stop)
    {
        for (; start < stop; ++start) {
            handleOneFile(files[start].c_str());
        }
    }

and

    vector<thread> tasks;
    int count = (files.size() + 20) / threadCount;
    for (int i = 0; i < threadCount; ++i) {
        tasks.push_back(thread(asyncHandle, i * count,
                               std::min((size_t)(i + 1) * count, files.size())));
    }
    for (int i = 0; i < threadCount; ++i) {
        tasks[i].join();
    }

(i.e., each thread was given an approximately equal chunk of work up front), the results were:

    threadCount =   1: 199990000 57153 ms
    threadCount =   2: 199990000 27474 ms
    threadCount =   3: 199990000 16320 ms
    threadCount =   4: 199990000 13041 ms
    threadCount =   5: 199990000  8656 ms
    threadCount =   6: 199990000  8811 ms
    threadCount =   7: 199990000  8943 ms
    threadCount =   8: 199990000 10088 ms
    threadCount =   9: 199990000  9069 ms
    threadCount =  10: 199990000  8360 ms
    threadCount =  11: 199990000  8578 ms
    threadCount =  12: 199990000  8839 ms
    threadCount =  13: 199990000  8435 ms
    threadCount =  14: 199990000  8957 ms
    threadCount =  15: 199990000  8718 ms
    threadCount =  16: 199990000 10704 ms
    threadCount =  17: 199990000 10382 ms
    threadCount =  18: 199990000 10500 ms
    threadCount =  19: 199990000 11576 ms

That is, there is still a speedup as the number of threads grows, but not as large, and with small thread counts the results were clearly worse. I have no particular ideas why, except that the first threads may have managed to do a lot of work before the last ones even started, so they finished earlier and then sat waiting for the stragglers to complete their share. So it seems to me that the more uniform scheme, where threads take files from the collection one at a time, is the more efficient way to work.

And lastly, if the accumulation is done once after all the work is finished, rather than after each file:

    int handleOneFile(const char *name)
    {
        fstream f(name, ios::binary | ios::in | ios::out);
        if (f) {
            unsigned int v = 0;
            for (int j = 0; j < 10000; ++j)
                f.read((char *)&v, sizeof(v));
            time_t t;
            time(&t);
            f.seekp(f.tellg());
            f.write((char *)&t, sizeof(t));
            return v;
        }
        return 0;
    }

    void asyncHandle(int start, int stop)
    {
        long long s = 0;
        for (; start < stop; ++start) {
            s += handleOneFile(files[start].c_str());
        }
        lock_guard<mutex> lk(sumMtx);
        sum += s;
    }

It turned out like this:

    threadCount =   1: 199990000 38168 ms
    threadCount =   2: 199990000 23800 ms
    threadCount =   3: 199990000 10787 ms
    threadCount =   4: 199990000  9065 ms
    threadCount =   5: 199990000  8403 ms
    threadCount =   6: 199990000  8077 ms
    threadCount =   7: 199990000  8410 ms
    threadCount =   8: 199990000  9656 ms
    threadCount =   9: 199990000  8038 ms
    threadCount =  10: 199990000  8209 ms
    threadCount =  11: 199990000  8722 ms
    threadCount =  12: 199990000  8561 ms
    threadCount =  13: 199990000  8524 ms
    threadCount =  14: 199990000  8524 ms
    threadCount =  15: 199990000  8000 ms
    threadCount =  16: 199990000  8511 ms
    threadCount =  17: 199990000  8463 ms
    threadCount =  18: 199990000  9431 ms
    threadCount =  19: 199990000  8456 ms

Frankly, it is somewhat difficult to draw any unambiguous (far-reaching :)) conclusions or give recommendations... Mutexes clearly play a role; not a decisive one, but quite noticeable. Now I'll do one more experiment with async and wrap up.

Here is async again. The difference is that accumulation into the global variable is done not after each file but after all the work is completed; in addition, to further reduce the number of mutex acquisitions, I take two file names from the collection at once.

The result is not all that different. Why it is slower than the very first version for small thread counts is a mystery to me :(

    threadCount =   1: 199990000 44203 ms
    threadCount =   2: 199990000 39898 ms
    threadCount =   3: 199990000 23217 ms
    threadCount =   4: 199990000  7808 ms
    threadCount =   5: 199990000  8043 ms
    threadCount =   6: 199990000  7668 ms
    threadCount =   7: 199990000  8168 ms
    threadCount =   8: 199990000  7762 ms
    threadCount =   9: 199990000  7675 ms
    threadCount =  10: 199990000  8221 ms
    threadCount =  11: 199990000  7910 ms
    threadCount =  12: 199990000  8056 ms
    threadCount =  13: 199990000  7888 ms
    threadCount =  14: 199990000  8058 ms
    threadCount =  15: 199990000  7741 ms
    threadCount =  16: 199990000  8075 ms
    threadCount =  17: 199990000  7812 ms
    threadCount =  18: 199990000  8489 ms
    threadCount =  19: 199990000  8346 ms
  • If you look closely, once the number of "threads" exceeds 4, the result stops changing (within statistical error). And experience suggests that sooner or later it will start to degrade. But I put "threads" in quotes deliberately: in your example the "threads" are not entirely honest. According to the documentation, async may not create a thread at all. - KoVadim
  • Thank you so much for the answer. I tried several implementation options with threads and asynchronous calls. The total execution time varies minimally (within error). In my case everything comes down to the mutex in getFilePath - Vitali
  • If you are sure you are bottlenecked on these mutexes (though there shouldn't be much contention there), try taking not one file name at a time but a dozen at once. If performance grows, then there really is a problem. - KoVadim
  • @KoVadim Of course, I tracked the number of threads in the process. It grew as expected, so the threads really were created. My feeling, rather, is that at some point the OS serializes disk access. Yes, I also tried async(launch::async, ...) and the picture is the same. And what would count as "honest" threads for you? If I use thread, will those be "honest" threads? If so, I'll run a new experiment... - Harry
  • I think using plain std::thread with a controlled count would be a pretty good test. Even better, it is probably more correct to write the code so that after you have obtained the full file list, you divide it between the threads and let them process their shares, and they update the counter (accumulator) only when everything is processed. - KoVadim

My comment simply didn't fit, so forgive me for expressing some of my thoughts as an answer.

Disk operations take significantly longer than retrieving a single value from a container via an iterator, so waiting on the mutex will not stall the threads; it is not the main bottleneck.

As for mutexes: in any case, you need to somehow count the running threads, and you'll be using the same mutex for that (I was once given advice here on how to implement this).

I would collect the list of files in advance and then simply hand them out to the threads one at a time; you could even collect them all into a vector and hand out a reference or pointer to the name without copying at all. Such a short operation is unlikely to take long, but unsynchronized access can mess things up badly. Besides, building the list on the fly is bad for two reasons: first, it is itself a disk operation and can stall; second, the list of files may change while the program is running.

Also, I would not use raw threads. All the authorities say with one voice that it is better to use async; if they are telling the truth, its implementations work with a thread pool, which is significantly faster than creating and destroying threads. From my point of view it is even more important that async correctly handles an exception thrown in the task without crashing the program.

Another note: with a large number of files and a large number of threads, the limit on the number of simultaneously open files will start working against you.

But in general, I repeat: personally, I don't expect much benefit from parallelizing disk operations. Overlapping, say, computation with disk I/O is reasonable: a slow disk operation then does not force the computation to wait. But when several threads all demand something from the disk at the same time, I suspect the operating system will simply queue those requests one after another, and all the multithreading becomes self-deception...

The question you asked is an interesting one; it's worth sitting down and thinking it through...