I run multi-threaded file parsing through the Executor. When trying to execute, an error occurs when trying to call a service from a multi-threaded task. In my opinion, the error occurs because the service starts from a multi-threaded environment. Is this true and how can this be fixed?
Task: Parse 10 files in three streams and write each batch to the database. Breaks when trying to call a service. (Simplified in the example.)
@Component public class MyExecutor {private final Logger logger = Logger.getLogger (this.getClass ());
//--> Number of parsing processes private static final int NTHREDS = 3; public void executor() throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(NTHREDS); // --> Parsing Tasks FileList fileList = new FileList(); int fileSize = fileList.getList().size(); for (int i = 1; i <= fileSize; i++) { Runnable worker = new MyRunnable(i); executor.execute(worker); logger.info("Parsing task " + i + " start"); } /** * This will make the executor accept no new threads * and finish all existing threads in the queue * Wait until all threads are finish */ executor.shutdown(); executor.awaitTermination(5000, TimeUnit.NANOSECONDS); logger.info("Finished all threads"); } // class MyRunnable
/** * MyRunnable will count the sum of the number from 1 to the parameter * countUntil and then write the result to the console. * MyRunnable is the task which will be performed */ public class MyRunnable implements Runnable { private final Logger logger = Logger.getLogger(this.getClass()); private final long countUntil; MyRunnable(long countUntil) { this.countUntil = countUntil; } @Override public void run() { FileReader fileReader = new FileReader(); FileList fileList = new FileList(); logger.info("Link " + countUntil + "= " + fileList.getList().get((int) countUntil - 1)); try { fileReader.mainReader(fileList.getList().get((int) countUntil - 1)); } catch (InterruptedException e) { e.printStackTrace(); } } } // Running task
public void mainReader(String list) throws InterruptedException { HashMap<String, Integer> listT = getList(list); System.out.println("SIZE OF LIST" + listT.size()); wordsService.addItemsList2(10); } public synchronized HashMap<String, Integer> getList(String fileName) { HashMap<String, Integer> listOfWords = new HashMap<>(); try (Stream<String> stream = Files.lines(Paths.get(fileName))) { stream.forEach(item -> { if (listOfWords.containsKey(item)) { int value = listOfWords.get(item).intValue(); listOfWords.put(item, value); } else { listOfWords.put(item, 1); } }); // stream.close(); } catch (IOException e) { System.out.println("EX"); e.printStackTrace(); logger.error("Error #3"); } finally { } return listOfWords ; } } // Service that starts
@Component @Transactional public class WordsServiceImpl implements WordsService { private final Logger logger = Logger.getLogger(this.getClass()); @Override public void addItemsList2 (int i) { System.out.println(i+" is OK"); } } // Exception that occurs when trying to call a service
Exception in thread "pool-2-thread-2" java.lang.NullPointerException at io.github.oleiva.services.impl.FileReader.mainReader(FileReader.java:63) at io.github.oleiva.core.MyRunnable.run(MyRunnable.java:32) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
wordsService.addItemsList2(10);? - Maksim