I have a search that collects txt files in the ConcurrentLinkedQueue queue, and then in several threads simultaneously checks the contents of these files for compliance with the specified string. In general, a multi-threaded search for a file containing some string. When one of the threads finds the given string, it stops all others. My own file system traversal is performed in a separate thread.
For a thread pool I use ExecutorService .
Here is such a Consumer :
public class ConsumerSearch implements ConsumerGetter<File>, Consumer<File> { private File result = null; @Override public void accept(File result) { this.result = result; } @Override public File getResult() { return result; } } Here is a general class that creates threads:
public class ParallelSearch implements Parallel { private AtomicReference<ConsumerSearch<File>> consumer; // останавливает все потоки (вызывает cancel в цикле по threads). private final Stopper stopper; // лист запущенных потоков. private final List<Future<File>> threads; // размер пула потоков. private final int amountThreads; // очередь в которой храняться файлы. private final Queue<File> container; private final ExecutorService service; public ParallelSearch(final int amountThreads, Consumer<File> consumer) { this.service = Executors.newFixedThreadPool(amountThreads); this.amountThreads = amountThreads; this.container = new LinkedTransferQueue<>(); this.consumer = new AtomicReference<>(consumer); this.threads = new ArrayList<>(amountThreads); this.stopper = new Stopper(); } @Override public AtomicReference<Consumer<File>> getConsumer() { return consumer; } @Override public void killConsumer() { this.consumer.set(null); } @Override public void startParallelSearch( final String pathToRoot, final String targetText) { this.searchByFileSystem(pathToRoot); this.searchByFilesContent(targetText); } // поток обходит файловую систему в поисках txt файлов и добавляет в очередь. private void searchByFileSystem(final String pathToRoot) { final Thread searchByFileSystem = new Thread(new Runnable() { @Override public void run() { new CollectingTXTFiles( pathToRoot, container) .collectTXTFromRoot(); } }); this.service.submit(searchByFileSystem); } // добавляем в пул потоки для поиска строки по содержимому txt файлов private void searchByFilesContent(final String targetText) { for (int i = 0; i < (this.amountThreads - 1); i++) { final Future<File> taskRead = this.service.submit(new Callable<File>() { @Override public File call() throws Exception { return new SearchByFileContent(container, stopper, ParallelSearch.this) .scanQueue(targetText); } }); // добавляем в потоки поиск контенту this.threads.add(taskRead); } } /** * Останавливает все потоки ссылка его экземпляр есть у всех потоков. * Если один из потоков найдет искомую строку, вызовет stopAllSearchingThread(). */ class Stopper { void stopAllSearchingThread() { for (Future future : threads) { future.cancel(true); } } } } Here are the threads themselves:
public class SearchByFileContent implements SearchByFile { private final Queue<File> paths; private final ParallelSearch.Stopper stopper; private final Parallel parallelSearch; public SearchByFileContent(final Queue<File> paths, final ParallelSearch.Stopper stopper, final Parallel parallelSearch) { this.stopper = stopper; this.paths = paths; this.parallelSearch = parallelSearch; } @Override public File scanQueue(final String target) { while (this.paths.size() != 0 && !Thread.currentThread().isInterrupted()) { final File next = this.paths.poll(); final File result = getFileWhichContains(target, next); if (result != null && !result.getName().equals("-1")) { if (parallelSearch.getConsumer() != null) { synchronized (parallelSearch.getConsumer()) { if (this.parallelSearch.getConsumer().get() != null) { this.parallelSearch.getConsumer().get().accept(result); this.parallelSearch.killConsumerReference(); this.stopper.stopAllSearchingThread(); return result; } } } } } return new File("-1"); } private File getFileWhichContains(final String target, final File file) { final StringBuilder sb = new StringBuilder(); if (file == null) return new File("-1"); try (BufferedReader reader = new BufferedReader( new FileReader(file)) ) { String line; while (Objects.nonNull(line = reader.readLine())) { sb.append(line); } if (new String(sb).contains(target)) { return file; } } catch (IOException e) { e.printStackTrace(); } return new File("-1"); } } Call it like this:
@Test public void whenThen() throws ExecutionException, InterruptedException { ConsumerSearch<File> consumer = new ConsumerSearch(); Parallel parallel = new ParallelSearch(5, consumer); parallel.startParallelSearch(dir.getAbsolutePath(), "test"); final File result = consumer.getResult(); Assert.assertThat(result.getName(), is("test2.txt")); } Sometimes it works, sometimes it crashes with NullPointerException in the penultimate line of the final File result = consumer.getResult() test final File result = consumer.getResult() . Although everything seems to be logical and should work. Help fix this problem.
this.searchByFileSystem(pathToRoot); this.searchByFilesContent(targetText);these two strings can run in parallel for you, and as far as I understand it,searchByFilesContentshould start strictly aftersearchByFileSystem. That is, insidesearchByFilesContentyou runthis.service.submit(searchByFileSystem);in theExecutorthis.service.submit(searchByFileSystem);and immediately, without waiting for execution, runthis.searchByFilesContent(targetText);. It seems to be wrong, you need to wait for the end of reading the file list. - iksuy