Input data: a vector of elements of constant size (circular buffer), plus an atomic number indicating the current position in this same vector.
const int MAXIMUM = 100; QVector<Data> _buffer(MAXIMUM); QAtomicInt _atomic_index; Further, a method that allows multi-stream data to be inserted into a vector:
void insert(const Data &data) { _buffer[_atomic_index++] = data; } The problem is obvious here, that after reaching the vector size limit, it is necessary to return to the zero index:
void insert(const Data &data) { _atomic_index.testAndSetOrdered(MAXIMUM,0); _buffer[_atomic_index++] = data; } This is not a solution, because If, for example, two or more streams simultaneously pass the test in the first line and the value of _atomic_index is not far from the vector boundary, then the output and the subsequent default will actually be guaranteed.
It turns out that in this situation, an atomic operation is required, consisting of a test for equality to the maximum, plus the index increment in it:
void insert(const Data &data) { mutex.lock(); if(index == MAXIMUM) index = 0; _buffer[index++] = data; mutex.unlock(); } But this option completely eliminates the profit when multi-threaded data insertion.
I tried to solve the problem myself, but there is no complete confidence in the correctness of the solution:
void insert(const Data &data) { int index = _atomic_index++; if(index >= MAXIMUM) { const int new_index = index - MAXIMUM; _atomic_index.testAndSetOrdered(index+1,new_index); index = new_index; } _buffer[index++] = data; } The point is to use a variable local and individual for each thread that contains the current index value:
int index = _atomic_index++; It is this variable that we check for exceeding the size of the vector:
if(index >= MAXIMUM) {...} And inside this condition, _atomic_index , in my opinion, should take the value of the most recent index increment operation. Am I right? Or, perhaps, there is another way to solve this problem?
Update
In order to focus only on the moment of checking the index to go beyond the boundaries of the vector size, we assume, by default, that the number of recording streams in this particular example is significantly (several times) less than the number of elements in the vector and, thus, we exclude from attention the possibility of rewriting the still not read (occupied) elements of the vector with newly inserted elements. For example:
- recording streams: 10
- reading threads: 20
- vector length: 100
Reading streams do not lag behind the recording, which means that all the newly filled cells will be read almost immediately. Or even radically: the recorded cells can be overwritten at least immediately, since the data for reading is not important.
Only the moment of resetting the counter value of the current index of the record with multi-threaded inserts is of interest, when, for example, all 10 threads simultaneously try to increment it by one and the current index is equal to 98 out of 100 possible elements. In this case, the correct processing should assign index 99 to one stream, while the remaining nine should be assigned from 0 to 8. At the same time, the value of the current index (in the code is _atomic_index ) should be equal to the last index of those assigned, i.e.also 8.
Update 2
In the strikethrough statement - an error. The current index should be equal to 9, i.e. one more from the last inserted, because the line:
int index = _atomic_index++; ... use post-increment.