There is such a piece of code:

BlockingQueue<String> queue = new ArrayBlockingQueue(links.size()); for (String link : links) { queue.add(link); } ExecutorService executorService = Executors.newCachedThreadPool(); int threadsNumber = Integer.valueOf(arguments.get("-n")); for (int i = 0; i < threadsNumber; i++) { executorService.submit(new Consumer(queue, arguments.get("-o"), Integer.valueOf(arguments.get("-l")))); } while (!queue.isEmpty()) { Thread.sleep(1000); } executorService.shutdownNow(); System.out.println("Все файлы были успешно скачены"); 

The last lines mean I'm going to close the thread pool when all the messages in the queue have been read. Everything is fine, but there is a problem at the moment when one of the threads takes the last message from the queue. The condition !queue.isEmpty() stops executing and the program terminates before this message is processed.

How can I make the program complete only after processing all messages in the queue?

Consumer run method

 @Override public void run() { try { while (true) { if (!queue.isEmpty()) { String stringURL = queue.take(); String inputFileName = Paths.get(stringURL).getFileName().toString(); String outputFileName = new File(new File(storageFilesDirectory), inputFileName).toString(); Downloader.download(stringURL, outputFileName, speedLimit); System.out.println(String.format("Файл %s был успешно скачен", inputFileName)); } Thread.sleep(500); } } catch (InterruptedException e) { } catch (FileCouldNotBeDownloaded e) { System.out.println(String.format("Произошла ошибка скачивания файла.")); } } 

I have only two threads in the pool, and the messages in the queue can be unlimited.

  • 3
    executorService.awaitTermination do not consider? - Senior Pomidor
  • I do not understand how he can help me .... I completed the question. - faoxis
  • It is easier to pool each link individually, and awaitTermination , and to limit the number of threads, use fixedThreadPool . - zRrr
  • @zRrr could you give me an example? While it is not very clear how to look like this. - faoxis

3 answers 3

If you need to perform N pre-known tasks in K threads, then it is simpler to use the Executor with a fixed number of threads and feed it with N Runnable performing one task:

 final int NUMBER_OF_WORKERS = 2; List<String> urls = IntStream.range( 0, 20 ).mapToObj( String::valueOf ).collect( Collectors.toList() ); ExecutorService executor = Executors.newFixedThreadPool( NUMBER_OF_WORKERS ); for ( String url : urls ) { executor.submit( () -> { // new SingleUrlDownloadTask( url ) etc... System.out.printf( "Worker [%s] downloading url: %s%n", Thread.currentThread().getName(), url ); try { Thread.sleep( 300 + ThreadLocalRandom.current().nextInt(400) ); } catch (Exception e) {} System.out.printf( "Worker [%s] completed url: %s%n", Thread.currentThread().getName(), url ); }); } System.out.println( "All tasks queued." ); executor.shutdown(); // пул перестает принимать новые задачи, // уже поставленные в очередь задачи будут выполнены рано или поздно executor.awaitTermination( Long.MAX_VALUE, TimeUnit.NANOSECONDS ); System.out.println( "All tasks done."); 
  • Wow primerchik. :) I just want to get a Java Junior . :) I picked it for a long time, but it pumped well. Thank! - faoxis

Normally, thread pools are completed only when the application itself is finished: the application author knows that he has some background processing of tasks, a pool of database listeners, so you can create them once and not worry about them - in the worst case, the threads will simply stand idle a small amount of RAM. If you leave the thread pool itself alive until the application is fully completed, this will not affect the operation of the application itself, but it will be easier to work with it, because you do not have to bear in mind its life cycle.

In your case, you are not looking for the moment when the thread pool completes, but some synchronizer that will tell you that all consumers have completed. The easiest way is to start CountDownLatch, initializing it with the help of the queue size, and removing one for each processing of the entity from the queue - in this case, you can wait for it to finish with the help of it. You can do with lower-level primitives (I have never worked with them, so I can be mistaken):

 public class Consumer implements Runnable { private final BlockingQueue<String> queue; private final Object synchronizer; public Consumer(BlockingQueue<String> queue, Object synchronizer) { this.queue = queue; this.synchronizer = synchronizer; } public void run() { String value; while ((value = queue.poll(0, TimeUnit.Milliseconds)) !== null) { // обработка } // в эту область код попадет только тогда, когда в очереди кончатся элементы synchronizer.notify(); } } // код главного потока for (Consumer consumer : consumers) { executor.submit(consumer); } synchronizer.wait(); 
  • Then it can be even simpler: public static volatile AtomicInteger handledCounter in Consumer . :) - faoxis
  • @faoxis you can’t wait based on AtomicX without a spinup, and this, in general, is not welcome anywhere. And volatil is not needed here, atomik and so atomik. - etki
  • Yes, indeed, volatile extra. I do not understand what prevents your example from occurring as follows: at the end, say, 2 threads remain. One takes the last item, the second frees the wait ( synchronizer.notify() ). Then all the threads are released and the program ends, despite the fact that the first thread is still processing the message from the queue. - faoxis
  • @faoxis yes, it is a fakap. better with CountDownLatch. - etki
  • and where I will fasten CountDownLatch? If I make a static, then how does this approach differ from what I have suggested above. And if not static, how will adjacent objects know about each other? - faoxis

Use java.util.concurrent.ThreadPoolExecutor , it is more flexible to use.

An example for your task:

 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 1_000_000; i++) poolExecutor.execute(()->{ //задача выполняемая в другом потоке }); while (poolExecutor.getActiveCount() != 0) LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); poolExecutor.shutdownNow(); 
  • I have k streams that need to download n files. n can be much larger than k. I don't need the handler to die after work. - faoxis
  • @faoxis they will not die until the thread has a job it will be active. And the getActiveCount method will return a nonzero number - Artem Konovalov