C implementation with pthreads library
In C, in accordance with the spirit of the language, there are no built-in high-level synchronized collections. Probably the most popular and widely used library that implements multithreading is pthreads. With its help, the pattern can be implemented as follows:
#include <pthread.h> // объявляем структуру данных для одного задания struct producer_consumer_queue_item { struct producer_consumer_queue_item *next; // здесь идут собственно данные. вы можете поменять этот кусок, // использовав структуру, более специфичную для вашей задачи void *data; }; // объявляем очередь с дополнительными структурами для синхронизации. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания. struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ int is_alive; // показывает, не закончила ли очередь свою работу };
Now we need procedures for adding and retrieving jobs from the queue.
void enqueue (void *data, struct producer_consumer_queue *q) { // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&q->lock); // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } q->tail = p; // разрешаем доступ всем снова pthread_mutex_unlock(&q->lock); } void * dequeue(struct producer_consumer_queue *q) { // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&q->lock); while (!q->head && q->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&q->cond, &q->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = q->head; if (p) { // и удаляем его из очереди q->head = q->head->next; if (!q->head) q->tail = q->head; } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&q->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет free(p); return data; }
Still need a procedure to initialize the queue:
struct producer_consumer_queue * producer_consumer_queue_create() { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->is_alive = 1; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->cond, 0); return q; }
And the procedure for closing the queue:
void producer_consumer_queue_stop(struct producer_consumer_queue *q) { // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&q->lock); q->is_alive = 0; pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->lock); }
Great, we have everything we need.
How to use it? Need to:
- run several streams- "producers" and several "consumers"
- come up with a data structure for the job
Example: (producer - main flow, consumers - 2 flows)
// это поток-потребитель void * consumer_thread (void *arg) { struct producer_consumer_queue *q = (typeof(q))arg; for (;;) { void *data = dequeue(q); // это сигнал, что очередь окончена if (!data) break; // значит, пора закрывать поток char *str = (char *)data; // тут наша обработка данных printf ("consuming: %s\n", str); sleep(2); printf ("consumed: %s\n", str); free(str); } return 0; } int main () { pthread_t consumer_threads[2]; void *res = 0; char *in = NULL; size_t sz; // создадим очередь: struct producer_consumer_queue *q = producer_consumer_queue_create(); // и потоки-«потребители» pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q); pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q); // главный цикл // получаем данные с клавиатуры: while (getline(&in, &sz, stdin) > 0) { enqueue(in, q); in = NULL; } producer_consumer_queue_stop(q); if (pthread_join(consumer_threads[0], &res) || pthread_join(consumer_threads[1], &res)) perror("join"); return (long)res; }
This is an implementation of a task with an "infinite" queue. In practice, it is sometimes (or almost always?) More useful to limit the size of the queue and thus balance the speed of manufacturers, sometimes putting them into a sleeping state, with the capabilities of consumers.
To do this, slightly change our producer_consumer_queue
struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО int is_alive; // показывает, не закончила ли очередь свою работу int max, cnt, // максимальный размер очереди и число заданий в ней pqcnt; // количество производителей, ждущих свободного места в очереди };
pthread_cond_t condc for "falling asleep / waking up" the streams of manufacturers, their counter in the queue for sending a message and a couple of variables containing the maximum queue size and the current number of jobs in it.
Accordingly, the functions for setting the job to the queue ( enqueue ), retrieving it from the queue ( dequeue ), initializing the queue ( producer_consumer_queue_create ) and stopping it ( producer_consumer_queue_stop ) change:
void enqueue (void *data, struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&aq->lock); // проверим не переполнена ли она if (q->max <= q->cnt) { q->pqcnt++; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти // будем ждать пока потребители ее слегка не опустошат while(q->max <= q->cnt & q->is_alive) pthread_cond_wait(&aq->condc, &aq->lock); q->pqcnt--; asm volatile ("" : : : "memory"); } // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&aq->condp); } q->tail = p; q->cnt++; asm volatile ("" : : : "memory"); // разрешаем доступ всем снова pthread_mutex_unlock(&aq->lock); } void * dequeue(struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&aq->lock); if (q->pqcnt && q->max > q->cnt) // в очереди есть место, а кто-то спит, разбудим их pthread_cond_broadcast(&aq->condc); while (!q->head && q->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&aq->condp, &aq->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = q->head; if (p) { // и удаляем его из очереди q->head = q->head->next; if (!q->head) q->tail = q->head; q->cnt--; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти // разбудим поставщиков в их очереди pthread_cond_broadcast(&aq->condc); } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&aq->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет free(p); return data; } struct producer_consumer_queue * producer_consumer_queue_create(int max) { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->is_alive = 1; q->max = max; q->cnt = 0; q->pqcnt = 0; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->condc, 0); pthread_cond_init(&q->condp, 0); return q; } // И процедура для закрытия очереди: void producer_consumer_queue_stop(struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&aq->lock); q->is_alive = 0; asm volatile ("" : : : "memory"); pthread_cond_broadcast(&aq->condc); pthread_cond_broadcast(&aq->condp); pthread_mutex_unlock(&aq->lock); }
It also shows the memory barrier ( asm volatile ("" : : : "memory"); ), which prevents the compiler from reordering read-write operations from RAM.
This implementation does not ensure the "orderliness" of manufacturers waiting for their turn to send a message. Those. the flow producer, "falling asleep" first because of the lack of free space in the queue does not necessarily wake up first.
If this behavior does not suit us, then we will have to make some changes to our data, first of all by adding a queue of suppliers from the producer_queue_item structures (which will be part of the producer_consumer_queue structure.
We get the following data structures:
// объявляем структуру данных для одного задания struct producer_consumer_queue_item { struct producer_consumer_queue_item *next; // здесь идут собственно данные. вы можете поменять этот кусок, // использовав структуру, более специфичную для вашей задачи void *data; }; // струкура данных для спящего (ждущего свободного места) потока-производителя struct producer_queue_item { struct producer_queue_item *next; struct producer_consumer_queue_item *item; // данные для которых нет места pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место #if DEBUG pid_t tid; // linux thread id for debug print int signaled; // индикатор "побудки" for debug print #endif }; // объявляем очередь данных с дополнительными структурами для синхронизации. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания. struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ int is_alive; // показывает, не закончила ли очередь свою работу int max, cnt; // максимальный размер очереди и число заданий в ней // очередь потоков-производителей, ждущих свободного места для своих данных struct producer_queue_item *pqhead, *pqtail; };
and implementation of the main functions:
void enqueue (void *data, struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&q->lock); #if DEBUG printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data)); #endif // ... и добавим новое задание туда: if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать #if DEBUG if (vq->cnt < vq->max) { puts("========================"); print_queue(q, 0); puts("========================"); } #endif struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq)); pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят pq->next = 0; pq->item = p; // сохраним данные на время сна #if DEBUG pq->tid = gettid(); #endif // поместим себя в очередь спящих производителей if (vq->pqtail) vq->pqtail->next = pq; else vq->pqhead = pq; vq->pqtail = pq; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти #if DEBUG int at = 0; // счетчик циклов пробуждения #endif do { // пойдем спать до появления свободного места в очереди данных #if DEBUG printf ("%ld prod cond wait (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at++, (char *)(p->data)); pq->signaled = 0; #endif pthread_cond_wait(&pq->cond, &q->lock); } while(vq->max <= vq->cnt && vq->is_alive); // проснулись и владеем очередью /* Вот тонкий момент. Порядок активизации потоков не определен, а нам надо соблюдать очередность данных. Поэтому переустановим локальные переменные из очереди, хотя это могут быть данные, положенные туда другим потоком. */ #if DEBUG if (pq != vq->pqhead) { printf ("BAAAD %ld (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at, (char *)(p->data)); print_queue(q, 0); if (vq->is_alive) exit(1); // совсем плохо, такого быть не должно else puts("CONTINUE"); } #endif pq = vq->pqhead; // в любом случае берем голову очереди производителей if ((vq->pqhead = pq->next) == 0) // и удаляем ее vq->pqtail = 0; asm volatile ("" : : : "memory"); p = pq->item; free(pq); #if DEBUG printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at, (char *)(p->data)); #endif } // вот тут реально кладем data в очередь для потребителей if (vq->tail) vq->tail->next = p; else { vq->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } vq->tail = p; vq->cnt++; asm volatile ("" : : : "memory"); // сбросим изменения очереди в память // разрешаем доступ всем снова pthread_mutex_unlock(&q->lock); } #if DEBUG #define cond_signal_producer(q) ({ \ if ((q)->pqhead) { \ (q)->pqhead->signaled = 1; \ pthread_cond_signal(&(q)->pqhead->cond); \ } \ }) #else #define cond_signal_producer(q) ({ \ if ((q)->pqhead) \ pthread_cond_signal(&(q)->pqhead->cond); \ }) #endif void * dequeue(struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&q->lock); // если есть спящие производители, то разбудим первого cond_signal_producer(vq); while (!vq->head && vq->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&q->cond, &q->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = vq->head; if (p) { // и удаляем его из очереди vq->head = vq->head->next; if (!vq->head) vq->tail = vq->head; vq->cnt--; asm volatile ("" : : : "memory"); // сбросим изменения очереди в память // разбудим первого поставщика в их очереди cond_signal_producer(vq); } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&q->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет // согласно 7.20.3.2/2, можно не проверять на 0 free(p); return data; } struct producer_consumer_queue * producer_consumer_queue_create(int max) { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->pqhead = q->pqtail = 0; q->is_alive = 1; q->max = max; q->cnt = 0; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->cond, 0); return q; } // И процедура для закрытия очереди: void producer_consumer_queue_stop(struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&q->lock); vq->is_alive = 0; pthread_cond_broadcast(&q->cond); // разбудим потребителей volatile struct producer_queue_item *pq; for (pq = vq->pqhead; pq; pq = pq->next) { #if DEBUG pq->signaled = 1; asm volatile ("" : : : "memory"); #endif // будим каждого ждущего производителя pthread_cond_signal((pthread_cond_t *)&pq->cond); } pthread_mutex_unlock(&q->lock); }
All three programs (pq1.c, pq2.c and pq3.c) along with the gettid() function are located at http://pastebin.com/E23r9DZk . For experiments, copy them into different files and compile, for example, gcc pq3.c -pthread gettid.o