Ladies and gentlemen

I'm looking for a class in C ++ that would implement a thread-safe blocking queue. By blocking, I mean:

  • when we try to read something from an empty queue, the thread blocks until someone else puts something into it;

I don't want to write this bike myself. In extreme cases, you can use mqueue, but I want it in C ++

Thank.

    1 answer 1

    Indeed, mqueue.h is somewhat complicated, perhaps.

    If interested, here is a simple example in C (C ++ is also compiled and works) for Linux (tested in ubuntu). You can simply remake under their data structure. If you place the queue and data in shared memory, you can use it for processes with a common ancestor.

    /* q1ps.c Программка делает очередь (linked list) из строк main кладет fgets() в очередь threads печатают из нее и немного спят */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #ifndef __cplusplus #define __USE_GNU #define __USE_UNIX98 #endif #include <pthread.h> #include <semaphore.h> #include <errno.h> #include <time.h> #ifndef __cplusplus extern pid_t gettid(void); #else extern "C" pid_t gettid(void); #endif #define fatal(msg) do { perror(msg); exit(-1); } while(0) struct qelem { struct qelem *next; char *str; }; struct queue { sem_t sem; pthread_mutex_t lock; struct qelem *head, *tail; }; struct queue * getqueue () { struct queue *q = (struct queue *)calloc(sizeof(*q),1); sem_init(&q->sem,0,0); pthread_mutexattr_t mattr; pthread_mutexattr_init(&mattr); pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK); pthread_mutex_init(&q->lock,&mattr); return q; } void enqueue (struct queue *q, char *str) { struct qelem *e = (struct qelem *)malloc(sizeof(*e)); e->str = strdup(str); e->next = NULL; pthread_mutex_lock(&q->lock); if (q->head == NULL) q->tail = q->head = e; else { q->tail->next = e; q->tail = e; } sem_post(&q->sem); pthread_mutex_unlock(&q->lock); } char * dequeue (struct queue *q) { struct qelem *e; sem_wait(&q->sem); pthread_mutex_lock(&q->lock); e = q->head; q->head = e->next; pthread_mutex_unlock(&q->lock); char *res = e->str; free(e); return res; } void * priq (void *a) { struct queue *q = (struct queue *)a; pid_t tid = gettid(); printf ("Thread %ld ready\n",(long)tid); for (;;) { char *str = dequeue(q); int sec = rand()%5; printf ("Thread %ld get [%s] and sleep %d sec\n", (long)tid,str,sec); free(str); sleep(sec); } } main () { struct queue *q = getqueue(); pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); int i; for (i = 0; i < 3; i++) { pthread_t th; if (pthread_create(&th,&attr,priq,(void *)q)) fatal("pth_create"); } char str[1000]; printf ("Press\n"); while (fgets(str,1000,stdin)) { str[strlen(str)-1] = 0; enqueue(q,str); printf ("Press\n"); } printf ("End\n"); exit (0); } 

    This is the function for the thread ID.

     /* gettid.c avp 2011 there are no gettid() in Linux libc so make it */ #include <sys/types.h> #include <sys/syscall.h> pid_t gettid() { return syscall(SYS_gettid); } 

    g ++ q1ps.c gettid.o -pthread

    If something is unclear - ask.