Given: a fixed-size vector used as a circular buffer, plus an atomic integer that holds the current position in that vector.

 const int MAXIMUM = 100;
 QVector<Data> _buffer(MAXIMUM);
 QAtomicInt _atomic_index;

Next, a method that lets several threads insert data into the vector:

 void insert(const Data &data) { _buffer[_atomic_index++] = data; } 

The problem is obvious: once the index reaches the size of the vector, it has to return to zero:

 void insert(const Data &data) {
     _atomic_index.testAndSetOrdered(MAXIMUM, 0);
     _buffer[_atomic_index++] = data;
 }

This is not a solution: if, for example, two or more threads pass the test in the first line at the same time while _atomic_index is close to the end of the vector, an out-of-bounds access and the subsequent crash are practically guaranteed.
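To make the failure concrete, here is a minimal sketch that replays one unlucky interleaving of two threads by hand, in a single thread. It assumes std::atomic<int> as a stand-in for QAtomicInt, with compare_exchange_strong playing the role of testAndSetOrdered; the function name is hypothetical.

```cpp
#include <atomic>
#include <cassert>

constexpr int MAXIMUM = 100;

// Replays, step by step, what two threads A and B could do when only one
// free slot is left. Returns the slot index thread B ends up with.
// Assumption: std::atomic<int> stands in for QAtomicInt.
int replay_check_then_increment() {
    std::atomic<int> index{MAXIMUM - 1};   // one free slot left (99)

    // Step 1: both threads run the wrap test. 99 != 100, so nothing is reset.
    int expected_a = MAXIMUM;
    index.compare_exchange_strong(expected_a, 0);   // thread A: no effect
    int expected_b = MAXIMUM;
    index.compare_exchange_strong(expected_b, 0);   // thread B: no effect

    // Step 2: both threads take a slot with post-increment.
    int slot_a = index.fetch_add(1);   // 99, the last valid slot
    int slot_b = index.fetch_add(1);   // 100, one past the end

    assert(slot_a == MAXIMUM - 1);
    return slot_b;
}
```

The test and the increment are each atomic on their own, but nothing ties them together, so the second thread walks off the end of the vector.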

So this situation calls for a single atomic operation that combines the test for equality with the maximum and the index increment:

 void insert(const Data &data) {
     mutex.lock();
     if(index == MAXIMUM) index = 0;
     _buffer[index++] = data;
     mutex.unlock();
 }

But this option completely eliminates the benefit of multithreaded insertion.
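For reference, a self-contained sketch of the locked variant using only the standard library. It assumes std::mutex and std::lock_guard in place of the Qt classes and a placeholder Data type; LockedRing and run_writers are hypothetical names introduced just for this illustration.

```cpp
#include <array>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

constexpr int MAXIMUM = 100;

struct Data { int value = 0; };   // placeholder payload (assumption)

class LockedRing {
public:
    void insert(const Data &data) {
        // The wrap test, the increment and the write form one critical section.
        std::lock_guard<std::mutex> guard(_mutex);
        if (_index == MAXIMUM) _index = 0;
        _buffer[_index++] = data;
        ++_inserted;
    }
    int index() const {
        std::lock_guard<std::mutex> guard(_mutex);
        return _index;
    }
    long inserted() const {
        std::lock_guard<std::mutex> guard(_mutex);
        return _inserted;
    }
private:
    mutable std::mutex _mutex;
    std::array<Data, MAXIMUM> _buffer{};
    int _index = 0;
    long _inserted = 0;
};

// Hypothetical driver: starts `threads` writers doing `per_thread` inserts each
// and returns the total number of inserts the ring has seen.
long run_writers(LockedRing &ring, int threads, int per_thread) {
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&ring, per_thread] {
            for (int i = 0; i < per_thread; ++i) ring.insert(Data{i});
        });
    for (auto &th : pool) th.join();
    return ring.inserted();
}
```

This is correct but fully serialized: only one writer at a time is ever inside insert.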

I tried to solve the problem myself, but I am not fully confident that the solution is correct:

 void insert(const Data &data) {
     int index = _atomic_index++;
     if(index >= MAXIMUM) {
         const int new_index = index - MAXIMUM;
         _atomic_index.testAndSetOrdered(index+1,new_index);
         index = new_index;
     }
     _buffer[index++] = data;
 }

The idea is to use a local variable, private to each thread, that holds the current index value:

 int index = _atomic_index++; 

This is the variable that is checked against the size of the vector:

 if(index >= MAXIMUM) {...} 

Inside this condition, _atomic_index should, in my opinion, take the value produced by the most recent index increment. Am I right? Or is there perhaps another way to solve this problem?

Update

To focus only on the moment when the index is checked against the bounds of the vector, let us assume that in this particular example the number of writer threads is significantly (several times) smaller than the number of elements in the vector. This lets us leave aside the possibility of overwriting elements that have not been read yet with newly inserted ones. For example:

  • writer threads: 10
  • reader threads: 20
  • vector length: 100

The reader threads keep up with the writers, so every newly filled cell is read almost immediately. Or, more radically: written cells may be overwritten right away, because the data being read does not matter here.

The only point of interest is resetting the current write index under multithreaded inserts, when, for example, all 10 threads simultaneously try to increment it by one while the current index is 98 out of 100 possible elements. In this case, correct processing should assign index 99 to one thread, while the remaining nine should get 0 through 8. At the same time, the value of the current index (_atomic_index in the code) should be equal to the last index assigned, i.e. also 8.

Update 2

The last statement of the previous update (that the current index should end up equal to 8) is an error. The current index should be equal to 9, i.e. one more than the last index assigned, because the line:

 int index = _atomic_index++; 

... uses post-increment.
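The expected numbers can be reproduced with a technique that is different from the ones discussed in this thread: a counter that only ever grows, with the slot derived by a modulo. This is only a sketch under assumptions: a 64-bit unsigned std::atomic counter (so that wrap-around is not a practical concern), the counter holding 99 as the next slot to hand out, and hypothetical function names. It covers index assignment only, not coordination with readers.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

constexpr std::uint64_t MAXIMUM = 100;

// Monotonic ticket counter: it is never reset, so there is no reset to race on.
// Assumption: 64 bits are wide enough that the counter never wraps in practice.
std::atomic<std::uint64_t> ticket{0};

// Hands out the next slot; fetch_add returns the value before the increment,
// which mirrors the post-increment in the question.
std::uint64_t reserve_slot() {
    return ticket.fetch_add(1) % MAXIMUM;
}

// Replays the scenario from the update sequentially: slot 99 is next,
// and ten reservations arrive.
std::vector<std::uint64_t> replay_ten_reservations() {
    ticket.store(99);
    std::vector<std::uint64_t> slots;
    for (int i = 0; i < 10; ++i) slots.push_back(reserve_slot());
    return slots;
}
```

One reservation gets 99, the other nine get 0 through 8, and the counter modulo MAXIMUM ends at 9, one past the last slot assigned.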

1 answer

My answer is based on my own experience with Java; I have practically no experience with C++, but the semantics should be the same.

I would like to analyze the proposed example. Let's run a thought experiment with the following situation:

 void insert(const Data &data) {
     int index = _atomic_index++; // Thread B
     if(index >= MAXIMUM) {
         const int new_index = index - MAXIMUM;
         _atomic_index.testAndSetOrdered(index+1,new_index); // Thread A
         index = new_index;
     }
     _buffer[index++] = data;
 }

Suppose Thread A got through several statements faster than Thread B, and at the moment Thread A tries to reset the index, Thread B increments it. The reset then fails, and since the operation is never retried, _atomic_index can grow without bound if the situation keeps repeating. As soon as a thread reads the value MAXIMUM * 2 from _atomic_index, its index is computed as MAXIMUM, which leads to an attempt to write outside the expected range.
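The scenario can be replayed deterministically in a single thread. The sketch below assumes std::atomic<int> in place of QAtomicInt and simulates the worst case, where every reset attempt is preceded by another thread's increment; the function name is hypothetical.

```cpp
#include <atomic>
#include <cassert>

constexpr int MAXIMUM = 100;

// Simulates a chain of threads in which each thread's reset is always beaten
// by the next thread's increment. Returns the first out-of-range slot computed.
// Assumption: std::atomic<int> stands in for QAtomicInt.
int replay_failed_resets() {
    std::atomic<int> counter{MAXIMUM};      // the index has just hit the limit
    int previous = counter.fetch_add(1);    // first thread takes its raw index
    for (;;) {
        // The next thread increments before the previous one resets...
        int current = counter.fetch_add(1);
        // ...so the previous thread's compare-and-set sees an unexpected value.
        int expected = previous + 1;
        bool reset_done =
            counter.compare_exchange_strong(expected, previous - MAXIMUM);
        assert(!reset_done);                // the reset fails and is not retried
        int slot = previous - MAXIMUM;      // the slot the previous thread writes to
        if (slot >= MAXIMUM) return slot;   // first write past the end
        previous = current;
    }
}
```

After enough failed resets the single subtraction of MAXIMUM is no longer sufficient, and the computed slot equals MAXIMUM.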

The first fix that comes to mind is to use the modulo operator instead of a single subtraction of MAXIMUM, which makes this situation less likely. It will not save the day, though, because once the int inside _atomic_index overflows, the modulo returns an unexpected value.
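A small sketch of what the overflow does to the modulo. It assumes std::atomic<int>, for which the standard defines signed fetch_add to wrap around; whether QAtomicInt gives the same guarantee is not checked here. The function name is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <climits>

constexpr int MAXIMUM = 100;

// Pushes the counter past INT_MAX and computes a slot from the wrapped value.
int slot_after_overflow() {
    std::atomic<int> counter{INT_MAX};
    counter.fetch_add(1);           // wraps to INT_MIN (defined for std::atomic)
    int raw = counter.load();
    assert(raw == INT_MIN);
    // In C++ the remainder takes the sign of the dividend, so the result is
    // negative (-48 when int is 32 bits wide) and cannot be used as an index.
    return raw % MAXIMUM;
}
```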

One of the classic approaches in this situation is to hand-write a spinlock based on optimistic locking. What follows is pseudocode, since I know neither the actual method names in the library used nor the ++ syntax:

 void insert(const Data &data) {
     // all of this can easily be extracted into a separate method
     int current;
     int next;
     do {
         current = _atomic_index.get();
         next = (current + 1) % MAXIMUM;
     } while (!_atomic_index.compareAndSet(current, next));
     _buffer[current] = data;
 }
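For readers who want something compilable, the same loop can be sketched with the standard library. This assumes std::atomic<int> and compare_exchange_weak in place of the Java-style get and compareAndSet; reserve_slot, hits and run_writers are hypothetical names, and only the index reservation is shown, not the write into the buffer.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

constexpr int MAXIMUM = 100;

std::atomic<int> atomic_index{0};
std::array<std::atomic<int>, MAXIMUM> hits{};   // reservations seen per slot

// Optimistic loop: compute the next value, publish it only if nobody changed
// the index in the meantime, otherwise retry with the fresh value.
int reserve_slot() {
    int current = atomic_index.load();
    int next;
    do {
        next = (current + 1) % MAXIMUM;
        // On failure compare_exchange_weak stores the fresh value in `current`.
    } while (!atomic_index.compare_exchange_weak(current, next));
    return current;
}

// Hypothetical driver: returns true if every slot was reserved equally often.
bool run_writers(int threads, int per_thread) {
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([per_thread] {
            for (int i = 0; i < per_thread; ++i) hits[reserve_slot()].fetch_add(1);
        });
    for (auto &th : pool) th.join();
    const int expected = threads * per_thread / MAXIMUM;
    for (const auto &h : hits)
        if (h.load() != expected) return false;
    return true;
}
```

With 4 threads making 1000 reservations each, every one of the 100 slots should be handed out exactly 40 times, regardless of how the threads interleave.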

In this case, the following invariant holds:

  • Next value = (current value + 1) % MAXIMUM
  • The next value can replace the current one only if the current value is still valid (i.e. has not changed since it was read)

The algorithm has exactly two paths:

  • Set the next value of the index if it has not changed
  • Start a new iteration of the loop if the index changed between the two operations

With competing threads, it will look something like this:

  • Index: 1
  • Maximum: 3
  • T1: trying to set 2 instead of 1
  • T2: trying to set 2 instead of 1
  • T3: trying to set 2 instead of 1

(theatrical pause, magic behind the scenes)

  • T1: success, set 2
  • T2: failure, trying to set 0 instead of 2
  • T3: failure, trying to set 0 instead of 2

(theatrical pause, magic behind the scenes)

  • T1: done
  • T2: failure, trying to set 1 instead of 0
  • T3: success, set 0

(theatrical pause, magic behind the scenes)

  • T1: done
  • T2: success, set 1
  • T3: done

What happens if a thread "falls asleep" and the counter goes through a full cycle while it sleeps?

If I haven't messed anything up, nothing terrible. Let's walk through the algorithm:

  • Index: 1
  • Maximum: 4
  • T1: reads 1 and falls asleep
  • T2: reads 1 and updates to 2
  • T2: reads 2 and updates to 3
  • T2: reads 3 and updates to 0
  • T2: reads 0 and updates to 1
  • T2: reads 1
  • T1: wakes up and updates 1 to 2
  • T2: fails the update and starts a new iteration
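The trace above can be replayed step by step in one thread. A minimal sketch, assuming std::atomic<int> with compare_exchange_strong as the compare-and-set primitive; the function name is hypothetical.

```cpp
#include <atomic>
#include <cassert>

constexpr int MAXIMUM = 4;

// Replays the "sleeping thread" trace. Returns true if T1's late update
// succeeds and T2's competing update fails, as described in the list.
bool replay_sleeping_thread() {
    std::atomic<int> index{1};

    int t1_seen = index.load();             // T1 reads 1 and falls asleep

    // T2 completes a full lap: 1 -> 2 -> 3 -> 0 -> 1.
    for (int i = 0; i < MAXIMUM; ++i) {
        int cur = index.load();
        index.compare_exchange_strong(cur, (cur + 1) % MAXIMUM);
    }
    int t2_seen = index.load();             // T2 reads 1

    // T1 wakes up: the index is 1 again, so its update 1 -> 2 succeeds.
    bool t1_ok = index.compare_exchange_strong(t1_seen, 2);

    // T2 now tries 1 -> 2, but the index is already 2, so it has to retry.
    bool t2_ok = index.compare_exchange_strong(t2_seen, 2);

    return t1_ok && !t2_ok && t2_seen == 2;  // t2_seen was refreshed to 2
}
```

The compare-and-set cannot tell that a whole lap happened in between, but for the counter alone that is harmless: T1 simply receives the slot that is next in line at that moment.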

Is this approach safe? In this particular case, no.

This answer offers a solution to the problem of incrementing the counter correctly, but not to the algorithm as a whole.

  • Choosing a new index and actually storing the value are two separate operations, and the atomicity of the insert method is not guaranteed. The OS is free to suspend a thread right after it has "reserved" an index and wake it up after an arbitrary time, so data can be written to the same cell in the wrong order:
    • T1: reserved cell with index 1
    • T1: fell asleep
    • (the algorithm keeps running for a while, during which the index wraps around and starts a new lap)
    • T2: reserved cell with index 1
    • T2: put the data
    • T1: woke up
    • T1: put obsolete data
  • In addition, the index is incremented before the data appears in the cell. This means a reader can follow the index to a cell that still holds old data and treat it as current.
  • In addition, the algorithm as a whole cannot be considered working until the tail position is taken into account; otherwise the head may catch up with the tail, and in that case (!) MAXIMUM elements will be lost.
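The first point, a stale write landing on top of fresh data, can be replayed deterministically. This is a single-threaded sketch assuming std::atomic<int> and a plain int payload; the values 100 (stale) and 200 (fresh) and the function name are made up for the illustration.

```cpp
#include <array>
#include <atomic>
#include <cassert>

constexpr int MAXIMUM = 4;

// Replays the stale-write trace. Returns what ends up in the contested cell.
int replay_stale_write() {
    std::atomic<int> index{1};
    std::array<int, MAXIMUM> buffer{};

    // Same optimistic reservation loop as in the answer.
    auto reserve = [&index] {
        int current = index.load();
        while (!index.compare_exchange_weak(current, (current + 1) % MAXIMUM)) {
        }
        return current;
    };

    int t1_slot = reserve();                // T1 reserves cell 1, then sleeps

    // Other writers fill cells 2, 3 and 0 while the index wraps around.
    for (int i = 0; i < MAXIMUM - 1; ++i) buffer[reserve()] = 10 + i;

    int t2_slot = reserve();                // T2 reserves cell 1 again
    buffer[t2_slot] = 200;                  // T2 stores fresh data
    buffer[t1_slot] = 100;                  // T1 wakes up and stores stale data

    assert(t1_slot == t2_slot);
    return buffer[t2_slot];
}
```

The cell ends up holding the stale value even though every index reservation was atomic, which is why the counter alone does not make insert safe.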

If you want to find a good implementation of this algorithm, search Google for "lock-free circular buffer" or a similar phrase. Frankly, I cannot offhand think of implementations other than a linked list with length tracking (where it will be very hard to guarantee correctness without pessimistic locking) and plain pessimistic locking. The latter may not be such a bad option and may even perform better than the spinlock example in an environment with a large number of threads.

  • Exhaustive answer, thank you very much! - alexis031182
  • But won't there be problems if the index has not just changed, but has run a whole lap in the meantime? It will be treated as if it had not changed. - VladD
  • @VladD there is a section about that from the very beginning. In effect, it would be as if the call itself had been delayed by a whole lap. - etki
  • @Etki: Seems so, yes. And what happens if, before the line _buffer[current] = data; runs, the buffer goes around a full lap plus one element? - VladD
  • @VladD that is what the whole last section, after the statement about it being unsafe, is about - etki