I have code like this:

    // Collection of buffers ready to be written
    List<List<int>> writeReadyList = new List<List<int>>();

    private void backgroundWorker1_DoWork(object sender, DoWorkEventArgs e)
    {
        // This thread's local buffer
        List<int> localBufer = new List<int>();
        Random random = new Random();
        while (!backgroundWorker1.CancellationPending)
        {
            // Fill the local buffer with values
            localBufer.Add(random.Next(0, 100));
            if (localBufer.Count > 10)
            {
                // Hand the buffer over to the write queue and reset it
                writeReadyList.Add(localBufer);
                localBufer = new List<int>();
            }
            Thread.Sleep(50);
        }
    }

    private void backgroundWorker2_DoWork(object sender, DoWorkEventArgs e)
    {
        while (!backgroundWorker2.CancellationPending)
        {
            // Collection of buffers that could not be written
            List<List<int>> failList = new List<List<int>>();
            foreach (List<int> currentBufer in writeReadyList)
            {
                try
                {
                    // Write the values from the current buffer to the database
                }
                catch
                {
                    failList.Add(currentBufer);
                }
            }
            writeReadyList = new List<List<int>>(failList);
            Thread.Sleep(450);
        }
    }

A short explanation of the code: the first thread reads some values and saves them in a local buffer. When a certain number of elements is reached, the local buffer is queued for writing and cleared. The second thread looks at what is in the write queue and actually writes the values to the database. A collection cannot be modified inside a foreach, so, to exclude everything that was written successfully, I build a list of the buffers that failed to be written.

As far as I understand, with this approach several options for incorrect work are possible.

  1. While the second thread is iterating over the collection with foreach, the first thread modifies it — do we get an exception?

  2. The second thread has iterated over the queue but has not yet replaced it with the list of failed write attempts. At this moment the first thread adds a buffer, but since the second thread knows nothing about it, that data will be lost.

How should work with the write queue be organized between these threads? It must be organized so that if the second thread is currently processing the queue, the first thread does not wait for it to finish with the resource, but keeps adding values to its local buffer. Which way should I dig?
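One direction to dig (a minimal sketch, not code from the question; the item count and the payload are made up for illustration): `ConcurrentQueue<T>` gives exactly this non-blocking handoff — `Enqueue` never makes the producer wait for the consumer, and the consumer drains with `TryDequeue`, so a buffer that failed to write could simply be re-enqueued instead of getting lost.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Shared queue of buffers: Enqueue/TryDequeue are thread-safe and lock-free,
// so the producer never blocks on the consumer.
var writeReady = new ConcurrentQueue<List<int>>();
int written = 0;

var producer = Task.Run(() =>
{
    var localBufer = new List<int>();
    for (int i = 0; i < 25; i++)
    {
        localBufer.Add(i);
        if (localBufer.Count > 10)
        {
            writeReady.Enqueue(localBufer); // hand the full buffer over
            localBufer = new List<int>();   // and start a fresh one
        }
    }
    if (localBufer.Count > 0)
        writeReady.Enqueue(localBufer);     // flush the tail
});

producer.Wait();

// Consumer: drain whatever is currently queued.
while (writeReady.TryDequeue(out var bufer))
{
    written += bufer.Count; // stand-in for the actual database write
}

Console.WriteLine("written=" + written); // all 25 produced values arrive
```

This only sketches the handoff; a real consumer would loop with a delay (or use a blocking collection, as suggested below in the comments) instead of draining once.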

UPD:

On VladD's advice, I used a mutex.

    Mutex writeReadyAccess = new Mutex();

    private void backgroundWorker1_DoWork(object sender, DoWorkEventArgs e)
    {
        if (localBufer.Count > 10)
            // Short timeout (WaitOne takes milliseconds), because the first thread
            // must not be stalled while the queue is being processed
            if (writeReadyAccess.WaitOne(1))
            {
                // If the second thread is not processing the queue,
                // add the local buffer to it and reset the buffer
                writeReadyAccess.ReleaseMutex();
            }
            // If the queue was being processed at the moment of the request,
            // keep writing to the local buffer
    }

    private void backgroundWorker2_DoWork(object sender, DoWorkEventArgs e)
    {
        // This thread may be delayed, so wait without a timeout
        if (writeReadyAccess.WaitOne())
        {
            // Process the queue, including rewriting it
            writeReadyAccess.ReleaseMutex();
        }
    }
  • Take System.Collections.Concurrent.BlockingCollection: write to this collection in one thread, and read it in another thread via GetConsumingEnumerable(). - Stack
  • If you use System.Collections.Concurrent.BlockingCollection, then in the first thread, when there is nothing left to push, call CompleteAdding() so that the second thread also stops waiting for elements. - Stack
  • Possible duplicate question: queue of tasks on WinRT - andreycha
  • "used mutexes" - this can be done more simply with TPL Dataflow - Stack

1 answer

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    class Dto
    {
        public int State;
        public int Index;
    }

    class Test
    {
        BlockingCollection<Dto> list = new BlockingCollection<Dto>();

        public void Run()
        {
            Task.Run(() => backgroundWorker2());
            Task.Run(() => backgroundWorker1());
        }

        void backgroundWorker1()
        {
            for (int i = 0; i < 3; i++)
                list.Add(new Dto() { Index = i });
        }

        void backgroundWorker2()
        {
            foreach (var v in list.GetConsumingEnumerable())
            {
                // save the data
                Console.WriteLine("index=" + v.Index + " state=" + v.State);
                // if saving failed, put the item back into list
                if (v.Index == 0 && v.State++ < 2)
                    list.Add(v);
                if (list.Count == 0)
                    break;
            }
            Console.WriteLine("completed");
        }
    }

    var t = new Test();
    t.Run();

Result

    index=0 state=0
    index=1 state=0
    index=2 state=0
    index=0 state=1   <- second write attempt
    index=0 state=2   <- one more
    completed

If you need to transfer data in chunks, define the collection accordingly, for example BlockingCollection<Dto[]>.
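A minimal sketch of that chunked variant (using `int[]` chunks instead of `Dto[]`, and made-up chunk sizes, to keep it self-contained):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// The producer hands over whole buffers (int[]) instead of single items,
// so the consumer receives one chunk per iteration.
var chunks = new BlockingCollection<int[]>();

var producer = Task.Run(() =>
{
    for (int c = 0; c < 3; c++)
        chunks.Add(Enumerable.Range(c * 10, 10).ToArray()); // one 10-element chunk
    chunks.CompleteAdding(); // no more chunks: lets the consumer's loop end
});

int total = 0;
foreach (var chunk in chunks.GetConsumingEnumerable())
{
    total += chunk.Length; // stand-in for writing the whole chunk to the database
}

producer.Wait();
Console.WriteLine("total=" + total); // all 3 chunks of 10 arrive
```

This mirrors the original design, where the first thread accumulates a local buffer and passes it on as a unit once it is full.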


UPDATE

Another version, in which backgroundWorker1 is slow:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    class Dto
    {
        public int Index;
    }

    class Test
    {
        BlockingCollection<Dto> list = new BlockingCollection<Dto>();

        public void Run()
        {
            Task.Run(() => backgroundWorker2());
            Task.Run(() => backgroundWorker1());
        }

        void backgroundWorker1()
        {
            for (int i = 0; i < 3; i++)
            {
                list.Add(new Dto() { Index = i });
                // Thread.Sleep(1000);
            }
            list.CompleteAdding();
        }

        void backgroundWorker2()
        {
            var retry = new Queue<Dto>();
            foreach (var v in list.GetConsumingEnumerable())
            {
                // save the data
                Console.WriteLine("index=" + v.Index);
                // if saving failed, enqueue the item for a retry
                if (v.Index == 0)
                    retry.Enqueue(v);
            }
            // retry the failed writes
            while (retry.Count > 0)
                Console.WriteLine("retry=" + retry.Dequeue().Index);
            Console.WriteLine("completed");
        }
    }

    var t = new Test();
    t.Run();

Result

    index=0
    index=1
    index=2
    retry=0   <- second write attempt
    completed

UPDATE

For pipelined data processing across threads there is a dataflow library, TPL Dataflow. A description with examples is available on MSDN, and there is a NuGet package, Microsoft TPL Dataflow.
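A minimal sketch of the same producer/consumer with TPL Dataflow (the current NuGet package is System.Threading.Tasks.Dataflow; the buffer contents here are made up for illustration): an `ActionBlock<T>` keeps its own input queue, so `Post` never blocks the producer, and the delegate runs on a thread-pool thread.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

int processed = 0;

// The block queues posted buffers internally and processes them
// one at a time on a thread-pool thread.
var writer = new ActionBlock<int[]>(bufer =>
{
    processed += bufer.Length; // stand-in for the database write
});

for (int i = 0; i < 3; i++)
    writer.Post(new[] { i, i, i }); // producer hands over a buffer and moves on

writer.Complete();         // no more input
writer.Completion.Wait();  // wait until everything queued has been processed

Console.WriteLine("processed=" + processed); // 3 buffers of 3 items each
```

Retries and batching can then be expressed by linking blocks (for example, a `BatchBlock` in front of the writer) instead of hand-rolled queues.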

  • (1) CompleteAdding? (2) You still do not synchronize list. - VladD
  • Well... ideone.com/WLhzBG // As for synchronizing list, I did not convince you, which is a pity. - VladD
  • Well, yes. The correctness of multi-threaded code must not depend on the order in which the scheduler picks the threads. The Sleep is there just to provoke the race condition and take the path where things are not so smooth. - VladD
  • One time in a million the scheduler will delay the execution of one of the threads, so the "it is slow" argument does not hold. Any thread can stop at any moment (for example, for JIT or garbage collection) for an arbitrarily long time. - VladD
  • Well, tomorrow it will be deployed on a server. Or a call to another function will be added inside the loop. Even without all that, the scheduler is not obliged to give a time quantum to a thread; nothing guarantees that. It can postpone activating a thread indefinitely. Rarely, one in a million cases or even less. Trust someone who has written (and debugged) a lot of multi-threaded code. - VladD