There is a task. Training, so that the issues of optimality of the decision fade into the background. There are 3 threads that need to be synchronized. The first thread collects information about files and folders. It works cyclically and recursively. Having found the information, it should stop and let threads 2 and 3 write it to a file and memory. When streams 2 and 3 have finished recording, stream 1 resumes operation and searches for the next file (folder). There should be 3 threads. Writing each found file in a new stream is not an option.
The questions are as follows:
1. Which mechanism is better to use for synchronization of threads: AutoResetEvent (its own for each of the two threads - see the code), ManualResetEvent or Semaphore.
2. How do I fix the code so that the threads start and stop when needed? I would be grateful for an explanation of why this is so, because with the Set (), Reset (), Wait () methods I played for a long time, but without success.
3. How to organize the stopping of secondary flows after the end of the primary work? I equated the null primary stream, but I think this is not the best idea.
4. And I would be grateful for an explanation of how to solve similar problems in normal, non-educational projects.
5. Is it necessary in this example to read information by secondary flows in the critical section? As I assume, the information in the reading process will not change, since the main thread waits for the completion of the secondary ones, and there is no need for a critical section.

Code:

private Queue<Nested3> collection; static int counter=100; object locked; Thread SearcherThread; Thread Handler1Thread; Thread Handler2Thread; static WaitHandle[] events; private class Nested3 { public int Val; public bool IsHandled; public Nested3(int val) { this.Val = val; IsHandled = false; } } public ARETest() { collection = new Queue<Nested3>(); events = new WaitHandle[] { new AutoResetEvent(false), new AutoResetEvent(false) }; SearcherThread = new Thread(Searcher); SearcherThread.Start(); Handler1Thread = new Thread(Handler1); Handler2Thread = new Thread(Handler2); locked = new object(); } private void Searcher() { // ΠžΡ‡Π΅Ρ€Π΅Π΄ΡŒ для Π΄Π²ΡƒΡ… Π·Π°Π΄Π°Ρ‡ Π² Π΄Π²ΡƒΡ… Ρ€Π°Π·Π½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠ°Ρ…. ThreadPool.QueueUserWorkItem(new WaitCallback(Handler1), events[0]); ThreadPool.QueueUserWorkItem(new WaitCallback(Handler2), events[1]); for (int i = 0; i < 3; i++) { Thread.Sleep(2); // имитация поиска Monitor.Enter(locked); // ΠΏΡ€ΠΈΠΌΠ΅Π½Π΅Π½ΠΈΠ΅ класса Monitor для создания критичСской сСкции обусловлСно условиСм Π·Π°Π΄Π°Ρ‡ΠΈ collection.Enqueue(new Nested3(++counter)); // НайдСн ΠΏΠ΅Ρ€Π²Ρ‹ΠΉ ΠΏΠ°ΠΊΠ΅Ρ‚ ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ для записи Π²Ρ‚ΠΎΡ€ΠΈΡ‡Π½Ρ‹ΠΌΠΈ ΠΏΠΎΡ‚ΠΊΠ°ΠΌΠΈ Monitor.Exit(locked); (events[0] as AutoResetEvent).Set(); // запуск Π²Ρ‚ΠΎΡ€ΠΈΡ‡Π½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² (events[1] as AutoResetEvent).Set(); (events[0] as AutoResetEvent).Reset(); // сброс Π² нСсигнальноС состояниС (events[1] as AutoResetEvent).Reset(); // ОТиданиС ΠΏΠΎΠΊΠ° всС Π·Π°Π΄Π°Ρ‡ΠΈ Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ. WaitHandle.WaitAll(events); Monitor.Enter(locked); // ΠžΡ‡ΠΈΡΡ‚ΠΊΠ° ΠΎΡ‡Π΅Ρ€Π΅Π΄ΠΈ ΠΎΡ‚ записанной ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ collection.Dequeue(); Monitor.Exit(locked); if (i<2) Searcher(); // РСкурсивный Π²Ρ‹Π·ΠΎΠ² } SearcherThread = null; // Запуск послСднСго ΠΏΡ€ΠΎΡ…ΠΎΠ΄Π° Ρ†ΠΈΠΊΠ»Π° Π²Ρ‚ΠΎΡ€ΠΈΡ‡Π½Ρ‹ΠΌΠΈ ΠΏΠΎΡ‚ΠΎΠΊΠ°ΠΌΠΈ (events[0] as AutoResetEvent).Set(); (events[1] as AutoResetEvent).Set(); } private void Handler1(object state) { var auto = (AutoResetEvent)state; do { auto.WaitOne(); // здСсь я Ρ…ΠΎΡ‡Ρƒ Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΠΎΡ‚ΠΎΠΊΠΈ ΠΆΠ΄Π°Π»ΠΈ сигнала ΠΎΡ‚ ΠΏΠ΅Ρ€Π²ΠΈΡ‡Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ° Thread.Sleep(15); // имитация записи Π² Ρ„Π°ΠΉΠ» auto.Set(); // Π—Π΄Π΅ΡΡŒ Π²Ρ‚ΠΎΡ€ΠΈΡ‡Π½Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ Π΄ΠΎΠ»ΠΆΠ΅Π½ Π΄Π°Ρ‚ΡŒ ΠΏΠ΅Ρ€Π²ΠΈΡ‡Π½ΠΎΠΌΡƒ сигнал, Ρ‡Ρ‚ΠΎ ΠΎΠ½ Π·Π°ΠΊΠΎΠ½Ρ‡ΠΈΠ» ΠΈΡ‚Π΅Ρ€Π°Ρ†ΠΈΡŽ } while (SearcherThread != null); } private void Handler2(Object state) // Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚ Π°Π½Π°Π»ΠΎΠ³ΠΈΡ‡Π½ΠΎ ΠΏΡ€Π΅Π΄Ρ‹Π΄ΡƒΡ‰Π΅ΠΌΡƒ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρƒ, Π½ΠΎ с Π΄Ρ€ΡƒΠ³ΠΎΠΉ Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ { var auto = (AutoResetEvent)state; do { auto.WaitOne(); // здСсь я Ρ…ΠΎΡ‡Ρƒ Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΠΎΡ‚ΠΎΠΊΠΈ ΠΆΠ΄Π°Π»ΠΈ сигнала ΠΎΡ‚ ΠΏΠ΅Ρ€Π²ΠΈΡ‡Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ° Thread.Sleep(15); // имитация записи Π² Ρ„Π°ΠΉΠ» auto.Set(); // Π—Π΄Π΅ΡΡŒ Π²Ρ‚ΠΎΡ€ΠΈΡ‡Π½Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ Π΄ΠΎΠ»ΠΆΠ΅Π½ Π΄Π°Ρ‚ΡŒ ΠΏΠ΅Ρ€Π²ΠΈΡ‡Π½ΠΎΠΌΡƒ сигнал, Ρ‡Ρ‚ΠΎ ΠΎΠ½ Π·Π°ΠΊΠΎΠ½Ρ‡ΠΈΠ» ΠΈΡ‚Π΅Ρ€Π°Ρ†ΠΈΡŽ } while (SearcherThread != null); } } 

    2 answers 2

    Do you really need direct work with streams here? To perform several tasks in parallel, you can use the high-level mark Parallel.Invoke .

     static int counter=100; private class Nested3 { public int Val; public bool IsHandled; public Nested3(int val) { this.Val = val; IsHandled = false; } } private void Searcher() { for(int i = 0; i < 3; i++) { Thread.Sleep(2); // имитация поиска Nested3 currentItem = new Nested3(++counter); Parallel.Invoke(() => Handler1(currentItem), () => Handler2(currentItem)); } } private void Handler1(Nested3 currentItem) { Thread.Sleep(15); // имитация записи Π² Ρ„Π°ΠΉΠ» } private void Handler2(Nested3 currentItem) { Thread.Sleep(15); // имитация записи Π² Ρ„Π°ΠΉΠ» } 

    If it is important to use WaitHandle , then you can do the following:

     private AutoResetEvent done = new AutoResetEvent(false); private void Searcher() { for(int i = 0; i < 3; i++) { Thread.Sleep(2); // имитация поиска Nested3 currentItem = new Nested3(++counter); ThreadPool.QueueUserWorkItem(_ => { Handler2(currentItem); done.Set(); }); // ΠΎΠ΄Π½ΠΎ ΠΈΠ· Π·Π°Π΄Π°Π½ΠΈΠΉ выполняСм Π² ΠΏΡƒΠ»Π΅ ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² Handler1(currentItem); // Π° Π΄Ρ€ΡƒΠ³ΠΎΠ΅ выполняСм Π² Ρ‚Π΅ΠΊΡƒΡ‰Π΅ΠΌ ΠΏΠΎΡ‚ΠΎΠΊΠ΅ done.WaitOne(); // ΠΆΠ΄Ρ‘ΠΌ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ задания, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ ΠΎΡ‚Π΄Π°Π»ΠΈ ΠΏΡƒΠ»Ρƒ ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² } } 
    • The use of WaitHandle derivatives is a condition of the problem. What you suggested is well suited for the solution, and most likely I will use it in projects, but the condition was the use of WaitHandle and its derivatives. - foxhound
    • @foxhound Completed the answer. - PetSerAl

    Thanks for the answer. This decision did not occur to me.
    In general, as a result, the Searcher () method looks like this.

     private void Searcher() { for (int i = 0; i < 3; i++) { Thread.Sleep(2); Monitor.Enter(locked); collection.Enqueue(new Nested3(++counter)); // Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΌΠ΅Π½ΠΈΡ‚ΡŒ Π½Π° Π΅Π΄ΠΈΠ½ΠΈΡ‡Π½Ρ‹ΠΉ ΠΎΠ±ΡŠΠ΅ΠΊΡ‚ класса Monitor.Exit(locked); ThreadPool.QueueUserWorkItem(_ => { Handler2(); (events[1] as AutoResetEvent).Set(); }); ThreadPool.QueueUserWorkItem(_ => { Handler1(); (events[0] as AutoResetEvent).Set(); }); WaitHandle.WaitAll(events); Console.WriteLine("ДодТался"); Monitor.Enter(locked); collection.Dequeue(); Monitor.Exit(locked); Console.WriteLine("Поиск ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π½"); Console.WriteLine(); if (counter<105) Searcher(); // рСкурсивыный Π²Ρ‹Π·ΠΎΠ² } }