I'll start with the background.

So, I need an example of a thread-safe "one writer, one reader" C++ queue. I searched around on different sites and it seemed I had dug up something interesting: here it is, this code.

But then something in it bothered me. More precisely, there was something I did not understand, so I started searching through references such as MSDN and cppreference and through various blogs, but to no avail. So I turned here, namely here.

And now I have some doubts about that code. I would have changed a few things myself; mainly, I do not understand the purpose of the alloc_node() method there. No, I don't need the logic of the algorithm explained, it seems obvious enough and I understood it. It just seems to me that the two extra pointers, first and tail_copy, are unnecessary, that this method is not needed, and that the "writer" should be rewritten. The point is that, apparently, the nodes already consumed by the reader keep hanging in memory until they are either overwritten or destroyed by the destructor. That would make sense if they ever needed to be looked at again, but in that case, I think, a doubly linked list, a ring, or at least some keyed container would be required, whereas in that code each node only has a next pointer, and that is not enough. Introducing a special sentinel that always marks the end of the queue and then walking through it does not look very good to me either.

I do not claim to be an expert, I am a novice, so don't be too hard on me, but it seems to me that the following would be better (I haven't figured out the barriers yet, if they are even needed here). This is not a question, more thinking out loud; maybe it needs an answer, maybe it will be useful to someone (if it works at all):

    ~spsc_queue()
    {
        if (load_consume(&tail) == nullptr)
            return;                              // if everything has already been deleted
        if (load_consume(&tail->next) == nullptr) {
            delete tail;                         // only the initial node from the constructor is left
            return;
        }
        do {
            node* next = tail->next;
            delete tail;
            store_release(&tail, next);
        } while (load_consume(&tail) != nullptr);
    }

    // the other thread (the reader) must be guaranteed to see
    // the value written into the node before the node itself
    // becomes visible to it, i.e. before it is linked into the queue
    void enqueue(T v)
    {
        node* n = new node;
        n->next = nullptr;
        n->value = v;
        if (load_consume(&head) != nullptr) {    // just in case
            store_release(&head->next, n);       // here the node gets linked into the queue
        }
        head = n;
    }

    // everything already read is deleted, except for the empty
    // initial node created by the constructor
    bool dequeue(T& v)
    {
        if (load_consume(&tail->next) != nullptr) {
            v = tail->next->value;
            node* next = tail->next->next;
            delete tail->next;
            store_release(&tail->next, next);
            return true;
        }
        return false;
    }

The writer uses only head, the reader only tail. To be honest, I have not even checked how it works; I have only read the articles from scrutator, and besides, there was no time.
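For what it's worth, here is roughly how I picture the same one-writer / one-reader idea written with plain std::atomic and explicit acquire / release ordering instead of the load_consume / store_release helpers. This is only my own sketch, untested just like the code above, and whether consume ordering could be used instead of acquire is exactly the part I have not figured out:

    #include <atomic>

    template<typename T>
    class spsc_queue
    {
        struct node
        {
            std::atomic<node*> next;
            T value;
        };

        std::atomic<node*> head;   // touched only by the writer
        std::atomic<node*> tail;   // touched only by the reader

    public:
        spsc_queue()
        {
            node* stub = new node;                           // empty initial link
            stub->next.store(nullptr, std::memory_order_relaxed);
            head.store(stub, std::memory_order_relaxed);
            tail.store(stub, std::memory_order_relaxed);
        }

        ~spsc_queue()                                        // no concurrent access any more
        {
            node* n = tail.load(std::memory_order_relaxed);
            while (n) {
                node* next = n->next.load(std::memory_order_relaxed);
                delete n;
                n = next;
            }
        }

        void enqueue(T v)                                    // writer thread only
        {
            node* n = new node;
            n->next.store(nullptr, std::memory_order_relaxed);
            n->value = v;
            node* h = head.load(std::memory_order_relaxed);
            h->next.store(n, std::memory_order_release);     // value is published before the link
            head.store(n, std::memory_order_relaxed);
        }

        bool dequeue(T& v)                                   // reader thread only
        {
            node* t = tail.load(std::memory_order_relaxed);
            node* next = t->next.load(std::memory_order_acquire);
            if (next == nullptr)
                return false;                                // queue is empty
            v = next->value;
            tail.store(next, std::memory_order_relaxed);
            delete t;                                        // old, already consumed link
            return true;
        }
    };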

Well, finally, the long-awaited question: most likely that code from the Intel site still has some flaws or subtleties that are not obvious to me. I would like to know what else might be lurking there, so that I can take it into account. Or, please, link to sample code without blemishes, with the barriers implemented and without depending on the platform. Or post it here. Ideally it should also work on x64. Thanks.

  • Like this, for example (C++): gist - Vladimir Gamalyan
  • I think that boost, locks, and "many readers, many writers" are overkill for my case after all, but thanks anyway. - brenoritvrezorkre

2 Answers

Are you sure you don't want to use a trivial pipe as the queue?

Here is a simple program based on the producer / consumer pattern:

    // this is the consumer thread
    void *consumer_thread(void *arg)
    {
        struct producer_consumer_queue *q = (typeof(q))arg;
        long long sum = 0, nc = 0, ns = 0;
        for (;;) {
            void *data = dequeue(q);
            // this is the signal that the queue is finished,
            if (!data)
                break;          // so it is time to shut the thread down
            char *str = (char *)data;
            // here goes our data processing
            ns++;
            // puts(str);
            int i;
            for (i = 0; str[i]; i++)
                sum += str[i];
            nc += (i - 1);
            free(str);
        }
        printf("consumed %lld strings %lld chars (%lld)\n", ns, nc, sum);
        return 0;
    }

    int main()
    {
        pthread_t consumer_threads[2];
        void *res = 0;
        char *in = NULL;
        // create the queue:
        struct producer_consumer_queue *q = producer_consumer_queue_create();
        // and the "consumer" threads
        // pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
        pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
        // main loop
        int i, n = 100000;
        for (i = 0; i < n; i++) {
            int l = (rand() % 100 + 1) * 50;
            in = (char *)malloc(l--);
            in[l--] = 0;
            while (l >= 0)
                in[l--] = rand() % ('~' - ' ') + ' ';
            enqueue(in, q);
            in = NULL;
        }
        producer_consumer_queue_stop(q);
        if (pthread_join(consumer_threads[0], &res))
            perror("join");
        return (long)res;
    }

It creates and fills strings in one thread, and counts the number of bytes, sums them up, and frees the memory in another. The string addresses are passed through a mutex-based queue.

It runs in 5.5 sec.
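The mutex-based queue itself (producer_consumer_queue_create / enqueue / dequeue / producer_consumer_queue_stop) is not shown here. Roughly, it is just a singly linked list protected by a pthread mutex plus a condition variable, something along these lines (a sketch with names of my own choosing, not the exact code that was timed):

    #include <pthread.h>
    #include <stdlib.h>

    struct pcq_node { void *data; struct pcq_node *next; };

    struct producer_consumer_queue {
        struct pcq_node *head, *tail;
        pthread_mutex_t lock;
        pthread_cond_t  nonempty;
    };

    struct producer_consumer_queue *producer_consumer_queue_create()
    {
        struct producer_consumer_queue *q =
            (struct producer_consumer_queue *)calloc(1, sizeof(*q));
        pthread_mutex_init(&q->lock, 0);
        pthread_cond_init(&q->nonempty, 0);
        return q;
    }

    void enqueue(void *data, struct producer_consumer_queue *q)
    {
        struct pcq_node *n = (struct pcq_node *)malloc(sizeof(*n));
        n->data = data;
        n->next = 0;
        pthread_mutex_lock(&q->lock);
        if (q->tail) q->tail->next = n; else q->head = n;
        q->tail = n;
        pthread_cond_signal(&q->nonempty);
        pthread_mutex_unlock(&q->lock);
    }

    void *dequeue(struct producer_consumer_queue *q)
    {
        pthread_mutex_lock(&q->lock);
        while (!q->head)
            pthread_cond_wait(&q->nonempty, &q->lock);
        struct pcq_node *n = q->head;
        q->head = n->next;
        if (!q->head) q->tail = 0;
        pthread_mutex_unlock(&q->lock);
        void *data = n->data;
        free(n);
        return data;
    }

    // "end of queue" is signalled by pushing a NULL pointer
    void producer_consumer_queue_stop(struct producer_consumer_queue *q)
    {
        enqueue(0, q);
    }

The NULL pointer pushed by producer_consumer_queue_stop is exactly what the consumer thread above checks for to know when to exit.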

The same thing with the queue implemented over a pipe:

    struct pipe { int p[2]; };

    void enqueue(void *data, struct pipe *p)
    {
        if (write(p->p[1], &data, sizeof(data)) != sizeof(data))
            close(p->p[1]);
    }

    void *dequeue(struct pipe *p)
    {
        void *data;
        if (read(p->p[0], &data, sizeof(data)) != sizeof(data))
            data = 0;
        return data;
    }

    struct pipe *producer_consumer_queue_create()
    {
        struct pipe *p = (__typeof__(p))malloc(sizeof(*p));
        if (pipe(p->p))
            p = 0;
        return p;
    }

    void producer_consumer_queue_stop(struct pipe *q)
    {
        void *d = 0;
        if (write(q->p[1], &d, sizeof(d)) == sizeof(d))
            close(q->p[1]);
    }

    // this is the consumer thread
    void *consumer_thread(void *arg)
    {
        struct pipe *q = (typeof(q))arg;
        long long sum = 0, nc = 0, ns = 0;
        for (;;) {
            void *data = dequeue(q);
            // this is the signal that the queue is finished,
            if (!data)
                break;          // so it is time to shut the thread down
            char *str = (char *)data;
            // here goes our data processing
            ns++;
            // puts(str);
            int i;
            for (i = 0; str[i]; i++)
                sum += str[i];
            nc += (i - 1);
            free(str);
        }
        close(q->p[0]);
        printf("consumed %lld strings %lld chars (%lld)\n", ns, nc, sum);
        return 0;
    }

    int main()
    {
        pthread_t consumer_threads[2];
        void *res = 0;
        char *in = NULL;
        // create the queue:
        struct pipe *q = producer_consumer_queue_create();
        // and the "consumer" threads
        // pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
        pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
        // main loop
        int i, n = 100000;
        for (i = 0; i < n; i++) {
            int l = (rand() % 100 + 1) * 50;
            in = (char *)malloc(l--);
            in[l--] = 0;
            while (l >= 0)
                in[l--] = rand() % ('~' - ' ') + ' ';
            enqueue(in, q);
            in = NULL;
        }
        producer_consumer_queue_stop(q);
        if (pthread_join(consumer_threads[0], &res))
            perror("join");
        return (long)res;
    }

it already runs in 5.3 sec (at least no worse, and much simpler).

If you are not afraid of an extra level of buffering (FILE *), then with minor modifications like

    struct pipe { FILE *p[2]; };

    struct pipe *producer_consumer_queue_create()
    {
        struct pipe *p = (__typeof__(p))malloc(sizeof(*p));
        int pp[2];
        if (pipe(pp))
            p = 0;
        else {
            p->p[0] = fdopen(pp[0], "r");
            p->p[1] = fdopen(pp[1], "w");
        }
        return p;
    }

the execution time for the same work drops to 4.8 sec. Yes, the responsiveness of such a system is lower.
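The enqueue / dequeue / stop functions would then go through the stdio buffers, roughly like this (only a sketch of what those "minor modifications" might look like, using the struct pipe with FILE * members from above):

    void enqueue(void *data, struct pipe *p)
    {
        if (fwrite(&data, sizeof(data), 1, p->p[1]) != 1)
            fclose(p->p[1]);
    }

    void *dequeue(struct pipe *p)
    {
        void *data;
        if (fread(&data, sizeof(data), 1, p->p[0]) != 1)
            data = 0;
        return data;
    }

    void producer_consumer_queue_stop(struct pipe *q)
    {
        void *d = 0;
        if (fwrite(&d, sizeof(d), 1, q->p[1]) == 1) {
            fflush(q->p[1]);    // push the tail of the buffer to the reader
            fclose(q->p[1]);
        }
    }

The pointers now sit in the writer's stdio buffer until it fills up or is flushed, which is exactly why throughput goes up while reactivity goes down.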

For comparison, a single-threaded version with all the malloc / free calls

    int main()
    {
        char *in = NULL;
        long long sum = 0, nc = 0, ns = 0;
        // main loop
        int i, n = 100000;
        for (i = 0; i < n; i++) {
            int l = (rand() % 100 + 1) * 50;
            in = (char *)malloc(l--);
            in[l--] = 0;
            while (l >= 0)
                in[l--] = rand() % ('~' - ' ') + ' ';
            // enqueue(in, q);
            ns++;
            // puts(str);
            int ii;
            for (ii = 0; in[ii]; ii++)
                sum += in[ii];
            nc += (ii - 1);
            free(in);
            in = NULL;
        }
        printf("consumed %lld strings %lld chars (%lld)\n", ns, nc, sum);
        return 0;
    }

runs in 2.7 sec.

P.S.
What thoughts does all this suggest?

    I want to warn you right away that I have not analyzed the code completely: analyzing multithreaded data structures takes time, and lock-free ones even more so. Still, I will answer your question: the alloc_node() function is needed in that code because, besides improving performance by reusing previously allocated nodes, it also advances the first_ pointer, which, as I understand it, is needed to reclaim the nodes that have already been consumed.
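    As far as I remember, the idea there is roughly the following (a reconstruction from memory, not a quote of that code): first_ and tail_copy_ delimit the chain of nodes the consumer has already passed, and alloc_node() reuses those nodes instead of calling new every time, falling back to the allocator only when the chain is empty:

        node* alloc_node()
        {
            // first_ .. tail_copy_ are nodes the consumer has already passed,
            // so the producer can safely reuse them
            if (first_ != tail_copy_) {
                node* n = first_;
                first_ = first_->next;
                return n;
            }
            // refresh the cached consumer position and try again
            tail_copy_ = load_consume(&tail_);
            if (first_ != tail_copy_) {
                node* n = first_;
                first_ = first_->next;
                return n;
            }
            // nothing to reuse, allocate a fresh node
            return new node;
        }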

    Indeed, the tail_copy_ variable is not needed at class level; it could be made local, and the code would only benefit from that. In general, I don't like the code: it is sloppy and raises a lot of questions. Nevertheless, I have repeatedly come across Dmitry Vyukov's writing on various resources, and personally I have no doubts about his competence when it comes to writing multithreaded data structures and algorithms. Of course, that does not mean he writes beautiful C++ code; at least 7 years ago his code was quite ugly.

    As for your code: as I said, it’s quite difficult to analyze such a code and it’s unlikely that anyone will do this - I definitely won’t begin, so I’ll have to stuff my own bumps here. Believe me, the correctness of the algorithm can take hours, or even days, for people who are professionally involved in this. As far as I know, there are no such professionals on this resource. There are no them in the English part, at least they are not there all the time and it is difficult to get on them.