I have a search that collects txt files into a ConcurrentLinkedQueue and then, in several threads at once, checks the contents of those files for a given string. In short, it is a multi-threaded search for a file containing some string. When one of the threads finds the string, it stops all the others. The file system traversal itself runs in a separate thread.

For the thread pool I use ExecutorService.

Here is the Consumer:

    public class ConsumerSearch implements ConsumerGetter<File>, Consumer<File> {
        private File result = null;

        @Override
        public void accept(File result) {
            this.result = result;
        }

        @Override
        public File getResult() {
            return result;
        }
    }

Here is a general class that creates threads:

    public class ParallelSearch implements Parallel {
        private AtomicReference<ConsumerSearch<File>> consumer;
        // Stops all threads (calls cancel in a loop over threads).
        private final Stopper stopper;
        // List of the running tasks.
        private final List<Future<File>> threads;
        // Size of the thread pool.
        private final int amountThreads;
        // Queue in which the files are stored.
        private final Queue<File> container;
        private final ExecutorService service;

        public ParallelSearch(final int amountThreads, Consumer<File> consumer) {
            this.service = Executors.newFixedThreadPool(amountThreads);
            this.amountThreads = amountThreads;
            this.container = new LinkedTransferQueue<>();
            this.consumer = new AtomicReference<>(consumer);
            this.threads = new ArrayList<>(amountThreads);
            this.stopper = new Stopper();
        }

        @Override
        public AtomicReference<Consumer<File>> getConsumer() {
            return consumer;
        }

        @Override
        public void killConsumer() {
            this.consumer.set(null);
        }

        @Override
        public void startParallelSearch(final String pathToRoot, final String targetText) {
            this.searchByFileSystem(pathToRoot);
            this.searchByFilesContent(targetText);
        }

        // A thread walks the file system looking for txt files and adds them to the queue.
        private void searchByFileSystem(final String pathToRoot) {
            final Thread searchByFileSystem = new Thread(new Runnable() {
                @Override
                public void run() {
                    new CollectingTXTFiles(pathToRoot, container).collectTXTFromRoot();
                }
            });
            this.service.submit(searchByFileSystem);
        }

        // Add tasks to the pool that search the contents of the txt files for the string.
        private void searchByFilesContent(final String targetText) {
            for (int i = 0; i < (this.amountThreads - 1); i++) {
                final Future<File> taskRead = this.service.submit(new Callable<File>() {
                    @Override
                    public File call() throws Exception {
                        return new SearchByFileContent(container, stopper, ParallelSearch.this)
                                .scanQueue(targetText);
                    }
                });
                // Add the content search task to the list of threads.
                this.threads.add(taskRead);
            }
        }

        /**
         * Stops all threads; every thread holds a reference to its instance.
         * If one of the threads finds the target string, it calls stopAllSearchingThread().
         */
        class Stopper {
            void stopAllSearchingThread() {
                for (Future<File> future : threads) {
                    future.cancel(true);
                }
            }
        }
    }

Here are the threads themselves:

    public class SearchByFileContent implements SearchByFile {
        private final Queue<File> paths;
        private final ParallelSearch.Stopper stopper;
        private final Parallel parallelSearch;

        public SearchByFileContent(final Queue<File> paths,
                                   final ParallelSearch.Stopper stopper,
                                   final Parallel parallelSearch) {
            this.stopper = stopper;
            this.paths = paths;
            this.parallelSearch = parallelSearch;
        }

        @Override
        public File scanQueue(final String target) {
            while (this.paths.size() != 0 && !Thread.currentThread().isInterrupted()) {
                final File next = this.paths.poll();
                final File result = getFileWhichContains(target, next);
                if (result != null && !result.getName().equals("-1")) {
                    if (parallelSearch.getConsumer() != null) {
                        synchronized (parallelSearch.getConsumer()) {
                            if (this.parallelSearch.getConsumer().get() != null) {
                                this.parallelSearch.getConsumer().get().accept(result);
                                this.parallelSearch.killConsumerReference();
                                this.stopper.stopAllSearchingThread();
                                return result;
                            }
                        }
                    }
                }
            }
            return new File("-1");
        }

        private File getFileWhichContains(final String target, final File file) {
            final StringBuilder sb = new StringBuilder();
            if (file == null) return new File("-1");
            try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
                String line;
                while (Objects.nonNull(line = reader.readLine())) {
                    sb.append(line);
                }
                if (new String(sb).contains(target)) {
                    return file;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return new File("-1");
        }
    }

Call it like this:

    @Test
    public void whenThen() throws ExecutionException, InterruptedException {
        ConsumerSearch<File> consumer = new ConsumerSearch();
        Parallel parallel = new ParallelSearch(5, consumer);
        parallel.startParallelSearch(dir.getAbsolutePath(), "test");
        final File result = consumer.getResult();
        Assert.assertThat(result.getName(), is("test2.txt"));
    }

Sometimes it works, and sometimes it fails with a NullPointerException at the penultimate line of the test, final File result = consumer.getResult(). Everything looks logical to me and should work. Please help me fix this problem.

  • On which line does the NPE occur? - Mikhail Vaysman
  • @Mikhail Vaysman in the test, on the penultimate line: final File result = consumer.getResult(); - Pavel
  • Please update the question itself, as the guidelines for discussions on Stack Overflow recommend, instead of posting comments. - Mikhail Vaysman
  • this.searchByFileSystem(pathToRoot); this.searchByFilesContent(targetText); these two lines can run in parallel in your code, and as far as I understand, searchByFilesContent should start strictly after searchByFileSystem has finished. That is, inside searchByFileSystem you hand the task to the Executor with this.service.submit(searchByFileSystem); and immediately, without waiting for it to complete, run this.searchByFilesContent(targetText);. That looks wrong: you need to wait until the file list has been read. - iksuy
  • @iksuy not really; the idea is that collecting files into the queue runs in parallel with scanning the files for the result. - Pavel
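
A note on the exchange above: running the collector and the scanners in parallel only works if a scanner does not quit when it finds the queue momentarily empty. The loop condition this.paths.size() != 0 in scanQueue ends a worker as soon as it sees an empty queue, which can happen before the collector has added anything. Below is a minimal sketch of one way to handle this, assuming a BlockingQueue and a completion flag set by the producer. QueueDrainDemo, drain, and producerDone are hypothetical names, not part of the code in the question.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: a consumer that keeps waiting while the producer is still working.
class QueueDrainDemo {

    // Processes items until the producer has signalled completion AND the queue is empty.
    static int drain(BlockingQueue<String> queue, AtomicBoolean producerDone)
            throws InterruptedException {
        int processed = 0;
        while (true) {
            // Read the flag BEFORE polling: if it was already set and the poll
            // still returns nothing, no further items can arrive.
            boolean done = producerDone.get();
            String next = queue.poll(50, TimeUnit.MILLISECONDS);
            if (next != null) {
                processed++; // placeholder for scanning the file
            } else if (done) {
                return processed;
            }
        }
    }

    // Starts a slow producer and drains the queue on the calling thread.
    static int run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean producerDone = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // the queue is empty when the consumer starts
                for (int i = 0; i < 3; i++) {
                    queue.put("file" + i + ".txt");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                producerDone.set(true); // set only after the last put
            }
        });
        producer.start();
        try {
            return drain(queue, producerDone);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("processed: " + run());
    }
}
```

The timed poll lets the worker notice both new items and the completion flag without spinning on an empty queue.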

1 answer

Apparently this happens because the file has not been found yet at the moment you call getResult(). When I wrote to you yesterday about the Consumer, I meant that you should do the processing in the accept method itself, when it is called. That is asynchronous processing of results, whereas you are trying to work synchronously.
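
Handling the result inside accept could look roughly like this. It is only a sketch: HandlingConsumer and AcceptDemo are hypothetical names, the worker thread merely simulates a search thread that has found the file, and the "business logic" is a placeholder that records the file name.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical consumer: everything that has to happen with the found file
// happens inside accept(), on the worker thread that found it.
class HandlingConsumer implements Consumer<File> {
    // Synchronized list, because accept() is called from another thread.
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void accept(File found) {
        // Placeholder for the real business logic.
        handled.add(found.getName());
    }

    List<String> handledNames() {
        return new ArrayList<>(handled);
    }
}

class AcceptDemo {
    // Simulates a search thread reporting its hit through the callback.
    static List<String> run() {
        HandlingConsumer consumer = new HandlingConsumer();
        Thread worker = new Thread(() -> consumer.accept(new File("test2.txt")));
        worker.start();
        try {
            worker.join(); // only so that the demo can inspect the outcome
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.handledNames();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Nothing here asks the consumer for a result before it exists; the caller simply supplies the code that should run once a file is found.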

If you do need a test, it can be modified as follows (a very bad solution, but it is enough for the test and it will work):

    int sec = 5; // Number of seconds you are willing to wait for the search result.
    while (sec != 0) {
        if (consumer.getResult() != null) break;
        Thread.sleep(1000);
        sec--;
    }
    assertNotNull(consumer.getResult());
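
If polling once a second feels too crude, the wait can be moved into the consumer with a CountDownLatch, so the test thread blocks only until accept is called or a timeout expires. This is a sketch under assumptions: AwaitingConsumer is a hypothetical stand-in for ConsumerSearch, and the worker thread in AwaitDemo simulates the search.

```java
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for ConsumerSearch: getResult waits for accept().
class AwaitingConsumer implements Consumer<File> {
    private final CountDownLatch found = new CountDownLatch(1);
    private volatile File result;

    @Override
    public void accept(File file) {
        this.result = file;  // store the file first...
        found.countDown();   // ...then release any waiting thread
    }

    // Returns the file, or null if nothing arrived within the timeout.
    File getResult(long timeout, TimeUnit unit) throws InterruptedException {
        return found.await(timeout, unit) ? result : null;
    }
}

class AwaitDemo {
    // Simulates a search that reports its hit after a short delay.
    static String run() {
        AwaitingConsumer consumer = new AwaitingConsumer();
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            consumer.accept(new File("test2.txt"));
        }).start();
        try {
            File file = consumer.getResult(5, TimeUnit.SECONDS);
            return file == null ? null : file.getName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a test, the assertion would then follow the timed getResult call, and a null return would mean the search did not finish in time.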
  • >> you do the processing in the accept method itself, when it is called << Processing in what sense? What do you mean by processing? - Pavel
  • I mean that you were looking for the file for some purpose. Once you have found it, you call accept on the Consumer, and right there you do with the file whatever your business logic requires. If you still want the thread that calls getResult() to wait until the file is found and then get control and the result back, the implementation will be different and more complicated. But I do not see the point of such a solution: blocking a thread is bad form. - Alexander Martyntsev
  • By my business logic I have to get a File object. But how do I avoid the NPE? - Pavel
  • You get it in accept. What stops you from working with it further right there, without using any getResult()? - Alexander Martyntsev
  • The only problem is that I need to test that it arrived. I need a test that shows the file was found, so that I can hand in the assignment, that's all. After that I still have to remove the bidirectional dependencies somehow. And if I cannot show that it works, then, how should I put it... I have not solved the task. - Pavel