The producer / consumer pattern is quite common in multithreaded programming. Its meaning is that one or more streams produce data, and in parallel, one or more streams consume it.

How to correctly implement this pattern in popular programming languages? The task itself is not trivial, because it involves synchronization between threads, and a potential race between several producers and consumers.


Reference.

The producing flow (or flows) is called “producer”, “supplier” or simply “producer”, consuming (s) - “consumer” or “consumer”.

The non-triviality of the problem lies in the fact that potentially both the creation of new data and their consumption can take a long time, and I would like the processing to proceed without any downtime, at the maximum possible speed.

Examples:

The generated data may represent a computationally intensive task . In this case, it is reasonable to have a single production thread, and several execution threads (for example, as many as there are processor cores in the system, if the processing bottleneck is computing).

Or, the production streams download data from the network, and at the end of the download, the executing threads parses the downloaded data. In this case, it is reasonable to have one manufacturer per site and, and limit the number of manufacturers, if the limit of the available network speed is reached.


This question is an adaptation of the research of the same name from Hashcode.

  • I wouldn’t even have to tell about the poison pill for completeness, as an alternative to a clear stop. - andreycha
  • @andreycha: Maybe you will write a separate answer? Because I don’t know how to scale the poison pill in case of multiple consumption. (And several producers, by the way, also.) - VladD

4 answers 4

C # implementation

For modern versions of the language (starting with C # 4.0), it makes sense not to write the implementation manually, but (following the advice of @Flammable), use the BlockingCollection class , which represents the necessary functionality.

For reading in consumer streams, we simply use loops in the sequence that GetConsumingEnumerable() gives. In the producer-streams we use Add , and in the end we do not forget CompleteAdding so that the consumer-streams can stop.

Example:

 class Program { static public void Main() { new Program().Run(); } BlockingCollection<string> q = new BlockingCollection<string>(); void Run() { var threads = new [] { new Thread(Consumer), new Thread(Consumer) }; foreach (var t in threads) t.Start(); string s; while ((s = Console.ReadLine()).Length != 0) q.Add(s); q.CompleteAdding(); // останавливаем foreach (var t in threads) t.Join(); } void Consumer() { foreach (var s in q.GetConsumingEnumerable()) { Console.WriteLine("Processing: {0}", s); Thread.Sleep(2000); Console.WriteLine("Processed: {0}", s); } } } 

BlockingCollection<T> allows you to limit the number of items , so an attempt to add an item to a crowded queue can also be blocked until the place is cleared.

Notice that GetConsumingEnumerable works correctly even when you have a lot of consumer calculators. It is not so obvious.


If you are working with the old version of C #, you will have to write the necessary functionality manually. You can use the built-in class Monitor (which is analogous to the mutex + condition variable from pthreads).

 public class ProducerConsumer<T> where T : class { object mutex = new object(); Queue<T> queue = new Queue<T>(); bool isDead = false; public void Enqueue(T task) { if (task == null) throw new ArgumentNullException("task"); lock (mutex) { if (isDead) throw new InvalidOperationException("Queue already stopped"); queue.Enqueue(task); Monitor.Pulse(mutex); } } public T Dequeue() { lock (mutex) { while (queue.Count == 0 && !isDead) Monitor.Wait(mutex); if (queue.Count == 0) return null; return queue.Dequeue(); } } public void Stop() { lock (mutex) { isDead = true; Monitor.PulseAll(mutex); } } } 

Usage (similar example):

 class Program { static public void Main() { new Program().Run(); } ProducerConsumer<string> q = new ProducerConsumer<string>(); void Run() { var threads = new [] { new Thread(Consumer), new Thread(Consumer) }; foreach (var t in threads) t.Start(); string s; while ((s = Console.ReadLine()).Length != 0) q.Enqueue(s); q.Stop(); foreach (var t in threads) t.Join(); } void Consumer() { while (true) { string s = q.Dequeue(); if (s == null) break; Console.WriteLine("Processing: {0}", s); Thread.Sleep(2000); Console.WriteLine("Processed: {0}", s); } } } 
  • one
    IMHO, the mention of BlockingCollection would be worth putting up to the self-written version - PashaPash
  • one
    @PashaPash: Reasonable. So do. - VladD
  • one
    "You can use the built-in Monitor class (which is an analogue of the condition variable from pthreads)." - The wording is not entirely accurate. The monitor is an analogue of the mutex + condition variable bundle. - Pavel Mayorov
  • And it would be good to make ProducerConsumer <T> implement IDisposable. - Pavel Mayorov
  • @PavelMayorov: Hmm. And what should Dispose do in this case? - VladD

C implementation with pthreads library

In C, in accordance with the spirit of the language, there are no built-in high-level synchronized collections. Probably the most popular and widely used library that implements multithreading is pthreads. With its help, the pattern can be implemented as follows:

 #include <pthread.h> // объявляем структуру данных для одного задания struct producer_consumer_queue_item { struct producer_consumer_queue_item *next; // здесь идут собственно данные. вы можете поменять этот кусок, // использовав структуру, более специфичную для вашей задачи void *data; }; // объявляем очередь с дополнительными структурами для синхронизации. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания. struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ int is_alive; // показывает, не закончила ли очередь свою работу }; 

Now we need procedures for adding and retrieving jobs from the queue.

 void enqueue (void *data, struct producer_consumer_queue *q) { // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&q->lock); // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } q->tail = p; // разрешаем доступ всем снова pthread_mutex_unlock(&q->lock); } void * dequeue(struct producer_consumer_queue *q) { // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&q->lock); while (!q->head && q->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&q->cond, &q->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = q->head; if (p) { // и удаляем его из очереди q->head = q->head->next; if (!q->head) q->tail = q->head; } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&q->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет free(p); return data; } 

Still need a procedure to initialize the queue:

 struct producer_consumer_queue * producer_consumer_queue_create() { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->is_alive = 1; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->cond, 0); return q; } 

And the procedure for closing the queue:

 void producer_consumer_queue_stop(struct producer_consumer_queue *q) { // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&q->lock); q->is_alive = 0; pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->lock); } 

Great, we have everything we need.

How to use it? Need to:

  • run several streams- "producers" and several "consumers"
  • come up with a data structure for the job

Example: (producer - main flow, consumers - 2 flows)

 // это поток-потребитель void * consumer_thread (void *arg) { struct producer_consumer_queue *q = (typeof(q))arg; for (;;) { void *data = dequeue(q); // это сигнал, что очередь окончена if (!data) break; // значит, пора закрывать поток char *str = (char *)data; // тут наша обработка данных printf ("consuming: %s\n", str); sleep(2); printf ("consumed: %s\n", str); free(str); } return 0; } int main () { pthread_t consumer_threads[2]; void *res = 0; char *in = NULL; size_t sz; // создадим очередь: struct producer_consumer_queue *q = producer_consumer_queue_create(); // и потоки-«потребители» pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q); pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q); // главный цикл // получаем данные с клавиатуры: while (getline(&in, &sz, stdin) > 0) { enqueue(in, q); in = NULL; } producer_consumer_queue_stop(q); if (pthread_join(consumer_threads[0], &res) || pthread_join(consumer_threads[1], &res)) perror("join"); return (long)res; } 

This is an implementation of a task with an "infinite" queue. In practice, it is sometimes (or almost always?) More useful to limit the size of the queue and thus balance the speed of manufacturers, sometimes putting them into a sleeping state, with the capabilities of consumers.

To do this, slightly change our producer_consumer_queue

 struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО int is_alive; // показывает, не закончила ли очередь свою работу int max, cnt, // максимальный размер очереди и число заданий в ней pqcnt; // количество производителей, ждущих свободного места в очереди }; 

pthread_cond_t condc for "falling asleep / waking up" the streams of manufacturers, their counter in the queue for sending a message and a couple of variables containing the maximum queue size and the current number of jobs in it.

Accordingly, the functions for setting the job to the queue ( enqueue ), retrieving it from the queue ( dequeue ), initializing the queue ( producer_consumer_queue_create ) and stopping it ( producer_consumer_queue_stop ) change:

 void enqueue (void *data, struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&aq->lock); // проверим не переполнена ли она if (q->max <= q->cnt) { q->pqcnt++; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти // будем ждать пока потребители ее слегка не опустошат while(q->max <= q->cnt & q->is_alive) pthread_cond_wait(&aq->condc, &aq->lock); q->pqcnt--; asm volatile ("" : : : "memory"); } // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&aq->condp); } q->tail = p; q->cnt++; asm volatile ("" : : : "memory"); // разрешаем доступ всем снова pthread_mutex_unlock(&aq->lock); } void * dequeue(struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&aq->lock); if (q->pqcnt && q->max > q->cnt) // в очереди есть место, а кто-то спит, разбудим их pthread_cond_broadcast(&aq->condc); while (!q->head && q->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&aq->condp, &aq->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = q->head; if (p) { // и удаляем его из очереди q->head = q->head->next; if (!q->head) q->tail = q->head; q->cnt--; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти // разбудим поставщиков в их очереди pthread_cond_broadcast(&aq->condc); } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&aq->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет free(p); return data; } struct producer_consumer_queue * producer_consumer_queue_create(int max) { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->is_alive = 1; q->max = max; q->cnt = 0; q->pqcnt = 0; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->condc, 0); pthread_cond_init(&q->condp, 0); return q; } // И процедура для закрытия очереди: void producer_consumer_queue_stop(struct producer_consumer_queue *aq) { volatile struct producer_consumer_queue *q = aq; // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&aq->lock); q->is_alive = 0; asm volatile ("" : : : "memory"); pthread_cond_broadcast(&aq->condc); pthread_cond_broadcast(&aq->condp); pthread_mutex_unlock(&aq->lock); } 

It also shows the memory barrier ( asm volatile ("" : : : "memory"); ), which prevents the compiler from reordering read-write operations from RAM.

This implementation does not ensure the "orderliness" of manufacturers waiting for their turn to send a message. Those. the flow producer, "falling asleep" first because of the lack of free space in the queue does not necessarily wake up first.

If this behavior does not suit us, then we will have to make some changes to our data, first of all by adding a queue of suppliers from the producer_queue_item structures (which will be part of the producer_consumer_queue structure.

We get the following data structures:

 // объявляем структуру данных для одного задания struct producer_consumer_queue_item { struct producer_consumer_queue_item *next; // здесь идут собственно данные. вы можете поменять этот кусок, // использовав структуру, более специфичную для вашей задачи void *data; }; // струкура данных для спящего (ждущего свободного места) потока-производителя struct producer_queue_item { struct producer_queue_item *next; struct producer_consumer_queue_item *item; // данные для которых нет места pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место #if DEBUG pid_t tid; // linux thread id for debug print int signaled; // индикатор "побудки" for debug print #endif }; // объявляем очередь данных с дополнительными структурами для синхронизации. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания. struct producer_consumer_queue { struct producer_consumer_queue_item *head, *tail; // head == tail == 0, если очередь пуста pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ int is_alive; // показывает, не закончила ли очередь свою работу int max, cnt; // максимальный размер очереди и число заданий в ней // очередь потоков-производителей, ждущих свободного места для своих данных struct producer_queue_item *pqhead, *pqtail; }; 

and implementation of the main functions:

 void enqueue (void *data, struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // упакуем задание в новую структуру struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p)); p->data = data; p->next = 0; // получим "эксклюзивный" доступ к очереди заданий pthread_mutex_lock(&q->lock); #if DEBUG printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data)); #endif // ... и добавим новое задание туда: if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать #if DEBUG if (vq->cnt < vq->max) { puts("========================"); print_queue(q, 0); puts("========================"); } #endif struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq)); pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят pq->next = 0; pq->item = p; // сохраним данные на время сна #if DEBUG pq->tid = gettid(); #endif // поместим себя в очередь спящих производителей if (vq->pqtail) vq->pqtail->next = pq; else vq->pqhead = pq; vq->pqtail = pq; asm volatile ("" : : : "memory"); // зафиксируем изменения очереди в памяти #if DEBUG int at = 0; // счетчик циклов пробуждения #endif do { // пойдем спать до появления свободного места в очереди данных #if DEBUG printf ("%ld prod cond wait (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at++, (char *)(p->data)); pq->signaled = 0; #endif pthread_cond_wait(&pq->cond, &q->lock); } while(vq->max <= vq->cnt && vq->is_alive); // проснулись и владеем очередью /* Вот тонкий момент. Порядок активизации потоков не определен, а нам надо соблюдать очередность данных. Поэтому переустановим локальные переменные из очереди, хотя это могут быть данные, положенные туда другим потоком. */ #if DEBUG if (pq != vq->pqhead) { printf ("BAAAD %ld (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at, (char *)(p->data)); print_queue(q, 0); if (vq->is_alive) exit(1); // совсем плохо, такого быть не должно else puts("CONTINUE"); } #endif pq = vq->pqhead; // в любом случае берем голову очереди производителей if ((vq->pqhead = pq->next) == 0) // и удаляем ее vq->pqtail = 0; asm volatile ("" : : : "memory"); p = pq->item; free(pq); #if DEBUG printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s", (long)gettid(), vq->cnt, at, (char *)(p->data)); #endif } // вот тут реально кладем data в очередь для потребителей if (vq->tail) vq->tail->next = p; else { vq->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } vq->tail = p; vq->cnt++; asm volatile ("" : : : "memory"); // сбросим изменения очереди в память // разрешаем доступ всем снова pthread_mutex_unlock(&q->lock); } #if DEBUG #define cond_signal_producer(q) ({ \ if ((q)->pqhead) { \ (q)->pqhead->signaled = 1; \ pthread_cond_signal(&(q)->pqhead->cond); \ } \ }) #else #define cond_signal_producer(q) ({ \ if ((q)->pqhead) \ pthread_cond_signal(&(q)->pqhead->cond); \ }) #endif void * dequeue(struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // получаем эксклюзивный доступ к очереди: pthread_mutex_lock(&q->lock); // если есть спящие производители, то разбудим первого cond_signal_producer(vq); while (!vq->head && vq->is_alive) { // очередь пуста, делать нечего, ждем... pthread_cond_wait(&q->cond, &q->lock); // wait разрешает доступ другим на время ожидания } // запоминаем текущий элемент или 0, если очередь умерла struct producer_consumer_queue_item *p = vq->head; if (p) { // и удаляем его из очереди vq->head = vq->head->next; if (!vq->head) vq->tail = vq->head; vq->cnt--; asm volatile ("" : : : "memory"); // сбросим изменения очереди в память // разбудим первого поставщика в их очереди cond_signal_producer(vq); } // возвращаем эксклюзивный доступ другим участникам pthread_mutex_unlock(&q->lock); // отдаём данные void *data = p ? p->data : 0; // 0 означает, что данных больше не будет // согласно 7.20.3.2/2, можно не проверять на 0 free(p); return data; } struct producer_consumer_queue * producer_consumer_queue_create(int max) { struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q)); q->head = q->tail = 0; q->pqhead = q->pqtail = 0; q->is_alive = 1; q->max = max; q->cnt = 0; pthread_mutex_init(&q->lock, 0); pthread_cond_init(&q->cond, 0); return q; } // И процедура для закрытия очереди: void producer_consumer_queue_stop(struct producer_consumer_queue *q) { volatile struct producer_consumer_queue *vq = q; // для обращения к разделяемым переменным необходим эксклюзивный доступ pthread_mutex_lock(&q->lock); vq->is_alive = 0; pthread_cond_broadcast(&q->cond); // разбудим потребителей volatile struct producer_queue_item *pq; for (pq = vq->pqhead; pq; pq = pq->next) { #if DEBUG pq->signaled = 1; asm volatile ("" : : : "memory"); #endif // будим каждого ждущего производителя pthread_cond_signal((pthread_cond_t *)&pq->cond); } pthread_mutex_unlock(&q->lock); } 

All three programs (pq1.c, pq2.c and pq3.c) along with the gettid() function are located at http://pastebin.com/E23r9DZk . For experiments, copy them into different files and compile, for example, gcc pq3.c -pthread gettid.o

    C # implementation, Dataflow library

    Another alternative is to use the Microsoft Dataflow Library, which is actually created in order to manage data flows. To use the code in the examples, you need to include the Microsoft.Tpl.Dataflow NuGet package. The BufferBlock<T> class is practically a ready-made producer / consumer, but, unlike the BlockingCollection<T> blocking interface, it has an async interface!

    To asynchronously add a job, the provider can use SendAsync . Asynchronous addition is necessary, since the queue may be of limited length, which means that the addition must wait until there is free space! At the end of the add you need to call Complete .

     async Task ProduceSingle(ITargetBlock<string> queue, int howmuch) { Random r = new Random(); while (howmuch-- > 0) { // эмулируем длительную работу по подготовке следующего задания // длительность выбираем случайно, чтобы задания приходили в // непредсказуемые моменты времени await Task.Delay(1000 * r.Next(1, 3)); var v = string.Format("automatic {0}", r.Next(1, 10)); await queue.SendAsync(v); } queue.Complete(); } 

    If you have several suppliers, you only need to close the queue when they all work out:

     async Task Produce1(ITargetBlock<string> queue, int howmuch) { Random r = new Random(); while (howmuch-- > 0) { await Task.Delay(1000 * r.Next(1, 3)); var v = string.Format("automatic {0}", r.Next(1, 10)); await queue.SendAsync(v); } } // функция Console.ReadLine() -- блокирующая, поэтому выполняем её асинхронно // (иначе она заблокирует вызывающий поток) // у Console нет async-интерфейса. Task<string> ReadConsole() { // блокирующую функцию выгружаем в thread pool return Task.Run(() => Console.ReadLine()); } async Task Produce2(ITargetBlock<string> queue) { string s; while ((s = await ReadConsole()).Length != 0) await queue.SendAsync("manual " + s); } async Task ProduceAll(ITargetBlock<string> queue) { var p1 = Produce1(queue, 20); var p2 = Produce2(queue); await Task.WhenAll(p1, p2); queue.Complete(); } 

    Now, the consumer. If there is only one consumer, everything is simple:

     async Task ConsumeSingle(ISourceBlock<string> queue) { while (await queue.OutputAvailableAsync()) Console.WriteLine(await queue.ReceiveAsync()); } 

    For the case of multiple consumers, use ReceiveAsync is incorrect, since the task may be taken from the queue by another consumer ! TryReceiveAsync also no TryReceiveAsync function, so after asynchronous finding that the queue is not empty, we use TryReceive :

     async Task ConsumeCooperative(IReceivableSourceBlock<string> queue, int number) { Random r = new Random(); while (await queue.OutputAvailableAsync()) { string v; // в этой точке данные могут быть уже уйти другому потребителю if (!queue.TryReceive(out v)) continue; // продолжаем ждать // цветной вывод и прочие плюшки // мне лень синхронизировать вывод на консоль, хотя конечно это разделяемый ресурс if (Console.CursorLeft != 0) Console.WriteLine(); var savedColor = Console.ForegroundColor; Console.ForegroundColor = (ConsoleColor)(number + 8); Console.WriteLine(string.Format("{0}[{1}]: {2}", new string(' ', number * 4), number, v)); Console.ForegroundColor = savedColor; // симулируем длительную обработку результата клиентом await Task.Delay(1000 * r.Next(1, 3)); } } Task ConsumeAll(IReceivableSourceBlock<string> queue) { var c1 = ConsumeCooperative(queue, 1); var c2 = ConsumeCooperative(queue, 2); return Task.WhenAll(c1, c2); } 

    Remained harness:

     class Program { static void Main(string[] args) { new Program().RunAll().Wait(); } async Task RunAll() { BufferBlock<string> queue = new BufferBlock<string>(); var p = ProduceAll(queue); var c = ConsumeAll(queue); await Task.WhenAll(p, c, queue.Completion); } // остальные методы } 

    In his article Async Producer / Consumer Queue using Dataflow, Stephen Cleary offers a different approach, more in the spirit of the Dataflow library. It incorporates symmetry between source blocks ( ISourceBlock<T> ), receiver blocks ( ITargetBlock<T> ), and converter blocks ( IPropagatorBlock<TInput, TOutput> ). В соответствии с этой идеологией, мы применяем для поставщика блок-приёмник ActionBlock<T> :

     Task Consume2(ISourceBlock<string> queue, int number) { var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }; var consumer = new ActionBlock<string>(v => ConsumeImpl2(v, number), consumerOptions); var linkOptions = new DataflowLinkOptions { PropagateCompletion = true }; queue.LinkTo(consumer, linkOptions); return consumer.Completion; } void ConsumeImpl2(string v, int number) { Console.WriteLine(string.Format("[{0}]: {1}", number, v)); Thread.Sleep(1500); } Task ConsumeAll2(ISourceBlock<string> queue) { var c1 = Consume2(queue, 1); var c2 = Consume2(queue, 2); return Task.WhenAll(c1, c2); } 

    Для чего нужно BoundedCapacity = 1 ? Дел в том, что по умолчанию ActionBlock<T> имеет «неограниченную» ёмкость, и таким образом за раз потребит все данные из очереди. Таким образом, если мы ввели ограничение на объём очереди

     queue = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 20 }); 

    то данные будут всё равно накапливаться в ActionBlock 'е. Чтобы роль хранилища исполнялась BufferBlock 'ом, а потребителя — ActionBlock , и нужно ограничить его объём. Заметьте также, что ограничение ёмкости ActionBlock 'а позволяет библиотеке Dataflow балансировать нагрузку, отправляя данные свободному блоку.

    Заметьте, что в этом случае то, в каком контексте (пул потоков? выделенный поток?) исполняется ActionBlock , управляется не через стандартный механизм async/await, а посредством TaskScheduler 'а в настройках .

    Подход со стыковкой блоков выполнения может быстро стать слишком тяжеловесным, поэтому я советовал бы использовать его лишь для достаточно сложных задач.

    • В цикле, основанном на await queue.OutputAvailableAsync() + TryReceive в случае малого потока данных от поставщиков все потребители будут пробуждаться одновременно - что негативно скажется на времени. По мне - так лучше использовать await ReceiveAsync и перехватывать исключение.Pavel Mayorov
    • @PavelMayorov: Надеюсь, что в C# доставят таки аналог Go channels с select'ом ( обговаривалось здесь ), тогда такие рассмотрения переместятся в плоскость ответственности разработчиков BCL.VladD
    • Там больше про паттерн-матчинг в операторе select писалось. Все-таки Go channels слишком высокоуровневые, чтобы вводить оператор для них на уровне языка.Pavel Mayorov
    • @PavelMayorov: Практически, Go channel — это и есть producer-consumer-очередь с async-интерфейсом. Довольно хорошо ложится на семантику .NET, как мне кажется.VladD
    • Вот только я, к примеру, за последний год ни разу не писал "чистый" producer-consumer, всюду были тонкости, которые go channels не покроют.Pavel Mayorov

    Реализация на C#, библиотека Dataflow ("инверсный" алгоритм)

    Отличие данного алгоритма от приведенного VladD "прямого" - в том, что используется очередь потребителей вместо очереди элементов.

    Это позволяет избавиться от цикла приема в потребителе - ценой появления знания списка потребителей поставщиком данных. Иными словами, в такой конфигурации потребители выходят не активными - а пассивными.

    Кроме того, сам алгоритм получается очень простым.

    Потребитель в таком алгоритме не имеет никакого алгоритма, это лишь класс (или интерфейс либо вовсе делегат), в котором можно вызвать метод:

     class Consumer { public void Consume(string str) { Console.WriteLine(str); } } 

    На стороне поставщика алгоритм немного сложнее:

     BufferBlock<Consumer> consumers = new BufferBlock<Consumer>(); public async void SendToConsumer(string str) { var consumer = await consumers.ReceiveAsync(); try { consumer.Consume(str); } finally { consumers.Post(consumer); } } async Task Produce(int howmuch) { Random r = new Random(); while (howmuch-- > 0) { // эмулируем длительную работу по подготовке следующего задания // длительность выбираем случайно, чтобы задания приходили в // непредсказуемые моменты времени await Task.Delay(1000 * r.Next(1, 3)); var v = string.Format("automatic {0}", r.Next(1, 10)); SendToConsumer(v); } } 

    К сожалению, это еще не все. Дело в том, что в таком виде любое исключение, возникшее в методе Consume, обрушит программу.

    Поэтому надо сделать одну из двух вещей:

    • добавить в метод контракт класса Consumer требование обязательной обработки исключений;

    • или же установить глобальный обработчик неперехваченных исключений.

    А еще лучше - воспользоваться сразу обоими вариантами.

    И еще один важный момент. Пассивность потребителя означает, в частности, что потребитель будет запущен в контексте синхронизации поставщика. Иногда это может быть нежелательным - в таком случае в код SendToConsumer следует добавить вызов Task.Run .

    Иногда же, напротив, производителю нужно дождаться полной обработки порции данных. В таком случае надо сменить возвращаемое значение у метода SendToConsumer с void на Task .

    • one
      Я не знаю шарп, поэтому не судите строго. Правильно ли я понял, что реальные потоки ОС (в которых выполняется функция Consume() ) будут создаваться вызовом SendToConsumer() , а их количеством (и завершением "лишних") будет управлять среда исполннения (интерпретатор(?)) шарпа?avp
    • @avp нет. Точнее, зависит от контекста синхронизации. При вызове из потока UI потребители будут в этом потоке и запускаться. При вызове изх потока без контекста синхронизации потребители будут запускаться в потоках пула.Pavel Mayorov
    • @avp спасибо за замечание, кстати.Pavel Mayorov
    • one
      @avp здесь - потоки вообще не создаются и не завершаются. await можно представить как подписку на событие завершения задачи.Pavel Mayorov
    • one
      Да, прибавится проблем с неработающими программами.avp