This is a separate answer because it is devoted not to advice, but to experimental verification.
After some experiments, I must admit that my assumption that disk operations would gain nothing from multithreading turned out to be wrong.
Here are the experiments.
To begin, we create 20,000 files of about 200 KB each:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

int main()
{
    for(int i = 0; i < 20000; ++i)
    {
        char buf[200];
        sprintf(buf,"g:\\tmp\\test\\%06d.dat",i);
        FILE * f = fopen(buf,"wb");
        if (f)
        {
            for(int j = 0; j < 50000; ++j)
                fwrite(&i,sizeof(i),1,f);
            fwrite(buf,strlen(buf)+1,1,f);
            fclose(f);
        }
    }
}
After that, the main program comes into play.
#include <vector>
#include <string>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <thread>
#include <mutex>
#include <future>
#include <io.h>
#include <muTimer.hpp>

using namespace std;

mutex sumMtx;                    // Mutex protecting the accumulator
unsigned long long sum = 0;      // Accumulator (sum of the numbers read from the files)

mutex filesMtx;                  // Mutex protecting the file list
vector<string> files;            // List of file names
size_t filesIdx;                 // Current index into the list

vector<string> getDirFiles(string dirName)       // Collect all file names in a directory
{
    vector<string> fileNames;
    _finddata_t info;
    intptr_t handle = _findfirst((dirName + "\\*.*").c_str(),&info);
    if (handle == -1) return fileNames;
    do
    {
        if (info.attrib & _A_SUBDIR) continue;
        fileNames.push_back(string(dirName) + "\\" + info.name);
    } while(_findnext(handle,&info) == 0);
    _findclose(handle);
    return fileNames;
}

void handleOneFile(const char * name)            // Process one file
{
    fstream f(name,ios::binary|ios::in|ios::out);
    if (f)
    {
        unsigned int v = 0;
        for(int j = 0; j < 10000; ++j)           // Read about 40K
            f.read((char*)&v,sizeof(v));
        time_t t;
        time(&t);
        f.seekp(f.tellg());
        f.write((char*)&t,sizeof(t));            // Write a timestamp
        lock_guard<mutex> lk(sumMtx);            // Update the accumulator
        sum += v;
    }
}

void asyncHandle()                               // Thread main function
{
    for(;;)
    {
        const char * name;
        {                                        // Take the next file from the list;
            lock_guard<mutex> lk(filesMtx);      // if everything has already been
            if (filesIdx >= files.size()) break; // taken, exit
            name = files[filesIdx++].c_str();
        }
        handleOneFile(name);                     // Process the file
    }
}

int main(int argc, const char * argv[])
{
    files = getDirFiles("G:\\Tmp\\Test");        // Collect the file list
    filesIdx = 0;                                // Reset and make a couple of passes
    asyncHandle();                               // to bring the caches into
    filesIdx = 0;                                // a stable state
    asyncHandle();

    for(int threadCount = 1; threadCount < 20; ++threadCount)  // For different thread counts
    {
        sum = 0;                                 // Reset the accumulator and the index
        filesIdx = 0;
        cout << "threadCount = " << setw(3) << threadCount << ": ";
        muTimer mu;                              // My timer for the measurements
        vector<future<void>> tasks;
        for(int i = 0; i < threadCount; ++i)     // Launch threadCount tasks
        {
            tasks.push_back(async(asyncHandle));
        }
        for(int i = 0; i < threadCount; ++i)     // Wait for completion
        {
            tasks[i].get();
        }
        cout << sum << " ";                      // Print the accumulated sum
    }                                            // and the elapsed time
}
The result was as follows (I tried to stop as many other tasks on the machine as possible, so that nothing else would be hitting the disk at that moment):
threadCount = 1: 199990000 46108 ms
threadCount = 2: 199990000 23587 ms
threadCount = 3: 199990000 9802 ms
threadCount = 4: 199990000 8409 ms
threadCount = 5: 199990000 8492 ms
threadCount = 6: 199990000 8575 ms
threadCount = 7: 199990000 8332 ms
threadCount = 8: 199990000 8507 ms
threadCount = 9: 199990000 8585 ms
threadCount = 10: 199990000 8254 ms
threadCount = 11: 199990000 8326 ms
threadCount = 12: 199990000 8218 ms
threadCount = 13: 199990000 8784 ms
threadCount = 14: 199990000 8359 ms
threadCount = 15: 199990000 8528 ms
threadCount = 16: 199990000 8481 ms
threadCount = 17: 199990000 8531 ms
threadCount = 18: 199990000 8772 ms
threadCount = 19: 199990000 8525 ms
I compiled with VC++ 2015 for x86, on a Windows 7 x64 machine with a quad-core processor, so the advice to launch no more threads than there are cores seems generally justified :)
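(As an aside, if you do not want to hard-code the thread count, it could be derived from the number of cores. This is only a minimal sketch, not part of the original experiment; the helper name pickThreadCount is my own.)

#include <thread>
#include <algorithm>

unsigned pickThreadCount()
{
    // hardware_concurrency() may return 0 if the value cannot be determined,
    // so fall back to a single thread in that case.
    unsigned hc = std::thread::hardware_concurrency();
    return std::max(1u, hc);
}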
Update
After changing the code in main() to the following:
vector<thread> tasks;
for(int i = 0; i < threadCount; ++i)
{
    tasks.push_back(thread(asyncHandle));
}
for(int i = 0; i < threadCount; ++i)
{
    tasks[i].join();
}
the results did not change much:
threadCount = 1: 199990000 41373 ms
threadCount = 2: 199990000 24758 ms
threadCount = 3: 199990000 9544 ms
threadCount = 4: 199990000 8165 ms
threadCount = 5: 199990000 7911 ms
threadCount = 6: 199990000 7970 ms
threadCount = 7: 199990000 7807 ms
threadCount = 8: 199990000 7942 ms
threadCount = 9: 199990000 8064 ms
threadCount = 10: 199990000 7858 ms
threadCount = 11: 199990000 8361 ms
threadCount = 12: 199990000 8157 ms
threadCount = 13: 199990000 8550 ms
threadCount = 14: 199990000 8001 ms
threadCount = 15: 199990000 8392 ms
threadCount = 16: 199990000 8346 ms
threadCount = 17: 199990000 8558 ms
threadCount = 18: 199990000 8410 ms
threadCount = 19: 199990000 8398 ms
But after changing asyncHandle() to
void asyncHandle(int start, int stop)
{
    for(;start < stop; ++start)
    {
        handleOneFile(files[start].c_str());
    }
}
and the code that launches the threads to
vector<thread> tasks;
int count = (files.size()+20)/threadCount;
for(int i = 0; i < threadCount; ++i)
{
    tasks.push_back(thread(asyncHandle, i*count,
                           std::min((size_t)(i+1)*count, files.size())));
}
for(int i = 0; i < threadCount; ++i)
{
    tasks[i].join();
}
(i.e., each thread was given an approximately equal chunk of work up front), the results were as follows:
threadCount = 1: 199990000 57153 ms
threadCount = 2: 199990000 27474 ms
threadCount = 3: 199990000 16320 ms
threadCount = 4: 199990000 13041 ms
threadCount = 5: 199990000 8656 ms
threadCount = 6: 199990000 8811 ms
threadCount = 7: 199990000 8943 ms
threadCount = 8: 199990000 10088 ms
threadCount = 9: 199990000 9069 ms
threadCount = 10: 199990000 8360 ms
threadCount = 11: 199990000 8578 ms
threadCount = 12: 199990000 8839 ms
threadCount = 13: 199990000 8435 ms
threadCount = 14: 199990000 8957 ms
threadCount = 15: 199990000 8718 ms
threadCount = 16: 199990000 10704 ms
threadCount = 17: 199990000 10382 ms
threadCount = 18: 199990000 10500 ms
threadCount = 19: 199990000 11576 ms
That is, speedup with a growing number of threads did appear, but it was smaller, and with small thread counts the results were clearly worse. I have no particular explanation, other than that the threads launched first managed to do a fair amount of work before the later ones even started, so they finished first and then sat waiting for the last ones to complete their part. So it seems to me that a more uniform distribution of work, pulling files from the collection one at a time, is the more efficient approach.
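(For reference, the one-at-a-time pulling could also be done without filesMtx, using an atomic index. This is only a sketch under the assumption that the global files vector and handleOneFile() from the code above are reused and that files is not modified while the threads run; the names atomicIdx and asyncHandleAtomic are my own.)

#include <atomic>

std::atomic<size_t> atomicIdx{0};   // shared position in the files list

void asyncHandleAtomic()            // hypothetical lock-free variant of asyncHandle()
{
    for (;;)
    {
        // fetch_add hands each thread a unique index without taking a mutex
        size_t i = atomicIdx.fetch_add(1);
        if (i >= files.size())
            break;
        handleOneFile(files[i].c_str());
    }
}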
And the last thing: if the accumulation into the global variable is done once after all the work is finished, rather than for each file:
int handleOneFile(const char * name)
{
    fstream f(name,ios::binary|ios::in|ios::out);
    if (f)
    {
        unsigned int v = 0;
        for(int j = 0; j < 10000; ++j)
            f.read((char*)&v,sizeof(v));
        time_t t;
        time(&t);
        f.seekp(f.tellg());
        f.write((char*)&t,sizeof(t));
        return v;
    }
    return 0;
}

void asyncHandle(int start, int stop)
{
    long long s = 0;
    for(;start < stop; ++start)
    {
        s += handleOneFile(files[start].c_str());
    }
    lock_guard<mutex> lk(sumMtx);
    sum += s;
}
It turned out like this:
threadCount = 1: 199990000 38168 ms
threadCount = 2: 199990000 23800 ms
threadCount = 3: 199990000 10787 ms
threadCount = 4: 199990000 9065 ms
threadCount = 5: 199990000 8403 ms
threadCount = 6: 199990000 8077 ms
threadCount = 7: 199990000 8410 ms
threadCount = 8: 199990000 9656 ms
threadCount = 9: 199990000 8038 ms
threadCount = 10: 199990000 8209 ms
threadCount = 11: 199990000 8722 ms
threadCount = 12: 199990000 8561 ms
threadCount = 13: 199990000 8524 ms
threadCount = 14: 199990000 8524 ms
threadCount = 15: 199990000 8000 ms
threadCount = 16: 199990000 8511 ms
threadCount = 17: 199990000 8463 ms
threadCount = 18: 199990000 9431 ms
threadCount = 19: 199990000 8456 ms
Frankly, it is rather difficult to draw any unambiguous (far-reaching :)) conclusions or give firm recommendations... Mutexes clearly play a role, not a decisive one, but quite a noticeable one. Now I will do one more experiment with async and wrap this up.
Here is async again. The difference is that the accumulation into the global variable is done not after processing each file, but only after all the work has been completed; in addition, to further reduce the number of mutex acquisitions, I took two file names from the collection at a time.
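(The exact code for this variant was not shown, so the following is only a reconstruction sketch based on the description above. It assumes the int-returning handleOneFile() from the previous variant and the global files, filesIdx, filesMtx, sumMtx, and sum; the name asyncHandleBatched is my own.)

void asyncHandleBatched()
{
    long long s = 0;                            // thread-local accumulator
    for (;;)
    {
        const char * names[2] = { nullptr, nullptr };
        {
            lock_guard<mutex> lk(filesMtx);     // one lock hands out up to two names
            if (filesIdx >= files.size())
                break;
            names[0] = files[filesIdx++].c_str();
            if (filesIdx < files.size())
                names[1] = files[filesIdx++].c_str();
        }
        s += handleOneFile(names[0]);
        if (names[1])
            s += handleOneFile(names[1]);
    }
    lock_guard<mutex> lk(sumMtx);               // add to the global sum only once
    sum += s;
}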
The result is not dramatically different. Why it is slower than the very first version for small thread counts is a mystery to me :(
threadCount = 1: 199990000 44203 ms
threadCount = 2: 199990000 39898 ms
threadCount = 3: 199990000 23217 ms
threadCount = 4: 199990000 7808 ms
threadCount = 5: 199990000 8043 ms
threadCount = 6: 199990000 7668 ms
threadCount = 7: 199990000 8168 ms
threadCount = 8: 199990000 7762 ms
threadCount = 9: 199990000 7675 ms
threadCount = 10: 199990000 8221 ms
threadCount = 11: 199990000 7910 ms
threadCount = 12: 199990000 8056 ms
threadCount = 13: 199990000 7888 ms
threadCount = 14: 199990000 8058 ms
threadCount = 15: 199990000 7741 ms
threadCount = 16: 199990000 8075 ms
threadCount = 17: 199990000 7812 ms
threadCount = 18: 199990000 8489 ms
threadCount = 19: 199990000 8346 ms
getFilePath() should be made atomic, say, by using a mutex. And one thing is unclear to me: how does it work for you? Where is the iterator stored? Is it a member of the class? - Harry
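(For illustration only: a minimal sketch of what a mutex-protected getFilePath() might look like. The function in question is not shown here, so the shared list, index, and their names below are assumptions of mine, not the actual code being discussed.)

#include <mutex>
#include <string>
#include <vector>

std::mutex pathMtx;                 // guards the shared list and index
std::vector<std::string> paths;     // hypothetical shared list of file paths
size_t pathIdx = 0;                 // hypothetical shared position

// Returns the next path, or an empty string when the list is exhausted.
std::string getFilePath()
{
    std::lock_guard<std::mutex> lk(pathMtx);
    if (pathIdx >= paths.size())
        return std::string();
    return paths[pathIdx++];
}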