There is a table of sites, i.e. a table of URLs (`urls`).

The URLs are processed as follows: periodically, tasks with a desired execution time (`desired_time`) are created for each URL and executed via a `ScheduledThreadPool`. One thread (call it A) loops forever over the active URLs (those with `url.active == 1` in the database) and, when needed, either schedules a new task to be picked up by one of the worker threads (call them B), or closes an already scheduled task whose `desired_time` has expired without any B thread picking it up.
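The scheduling half of this setup can be sketched with the standard `ScheduledExecutorService` (a minimal sketch, not the asker's actual code; the class and method names are assumptions):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class UrlTaskScheduler {
    // The pool whose threads play the role of the B workers.
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);

    // Thread A calls this: schedule processing of a url at desiredTimeMillis (epoch millis).
    public ScheduledFuture<?> schedule(String url, long desiredTimeMillis) {
        long delay = Math.max(0, desiredTimeMillis - System.currentTimeMillis());
        return pool.schedule(() -> process(url), delay, TimeUnit.MILLISECONDS);
    }

    // Placeholder for the actual URL processing done by a B thread.
    private void process(String url) {
        // fetch, parse, save result ...
    }

    public void shutdown() {
        pool.shutdown();
    }
}
```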

The scheduled tasks wait in a `BlockingQueue` until one of the worker threads B picks them up.

A race is possible when B finishes processing a task just as its `desired_time` is about to expire. While B prepares the result for saving to the database (or does some post-processing), the `desired_time` finally runs out. The task has in fact been completed and should not be considered overdue, yet at this moment A may check the task in the database and see that its `desired_time` has expired. Since the result has not yet been written, the status of the actually completed task has not yet been changed to DONE, and A can close the task with status ADDLED; B then finishes the post-processing and saves the result to the database. The database ends up showing a completed task with a correct result, but with status ADDLED.
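The core of this race is a check-then-act on the task status. In memory, the transition can be made atomic with a compare-and-set, so whichever of A or B moves the status first wins and the other backs off (a minimal sketch with assumed names; the question keeps status in the database, where a conditional UPDATE would play the same role):

```java
import java.util.concurrent.atomic.AtomicReference;

enum Status { PENDING, DONE, ADDLED }

public class TaskStatus {
    final AtomicReference<Status> status = new AtomicReference<>(Status.PENDING);

    // Called by B when it finishes: succeeds only if A has not already closed the task.
    public boolean markDone() {
        return status.compareAndSet(Status.PENDING, Status.DONE);
    }

    // Called by A when desired_time expires: succeeds only if B has not already finished.
    public boolean markAddled() {
        return status.compareAndSet(Status.PENDING, Status.ADDLED);
    }
}
```

Whichever call returns `false` knows the other side already decided the outcome and must not overwrite it.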

There is also a thread C, which acts as a kind of callback for B: if an error occurs while B is processing a task that makes further processing impossible, B hands the task over to C so that it is closed with status ERROR. So altogether there are three unsynchronized threads.

It is clear that it is necessary to block the task for other threads when:

  • B has obtained the result and started post-processing, until the result is saved in the database
  • C is saving the task with status ERROR

There should be exactly as many monitors as there are URLs being processed, i.e. one monitor for all tasks of a given URL.

How can such locks be implemented?

  • Perhaps it is easier to take the closing of tasks away from A, add a "completion time" field, and decide at read time whether the task finished on time or not? Then A either puts a task in the queue or doesn't; B executes it and writes the result, or fails and hands the work back to C; C writes ERROR. - zRrr
  • That's an option. But it is not locking; collisions are still theoretically possible this way. - jonua
  • Why not add a "processed" flag to the task in the database? The thread takes the task and runs `UPDATE task SET task.locked=1 WHERE task.id=:id AND task.locked=0`, then checks how many rows the query updated. If 0, someone has already taken the task, so we skip it; if 1, success, and no one will touch the task while we are processing it. You just need to make sure there are no "hung" tasks. - Sanya_Zol
  • And why not update the status together with the result of the task? Then ADDLED would be replaced with a conditional COMPLETED, and you would eventually get the data. - andreycha
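Sanya_Zol's flag trick and andreycha's suggestion point at the same idea: make the status transition conditional in the database itself, so the row count tells you who won the race. A sketch of such a statement (table and column names are assumptions about the schema, modelled on the UPDATE quoted above):

```sql
-- B closes the task and saves the result in one atomic statement;
-- 0 rows updated means A already closed it (e.g. as ADDLED), so B discards its result.
UPDATE task
   SET status = 'DONE', result = :result
 WHERE id = :id
   AND status NOT IN ('DONE', 'ADDLED', 'ERROR');
```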

2 answers

I usually use a cache in which I keep objects for synchronization.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;

Ehcache idLockingCache = ...;
private Lock idLockingCacheLock = new ReentrantLock();

public Object getLock(String key) {
    Element element = idLockingCache.get(key);
    if (element == null) {
        idLockingCacheLock.lock();
        try {
            // double-check: another thread may have created the lock object meanwhile
            element = idLockingCache.get(key);
            if (element == null) {
                element = new Element(key, new Object());
                idLockingCache.put(element);
            }
        } finally {
            idLockingCacheLock.unlock();
        }
    }
    return element.getObjectValue();
}
```

Usage:

```java
// key in your case is either the url's id from the DB, or the URL itself
Object lock = getLock(key);
synchronized (lock) {
    // work
}
```

And synchronize accordingly in all places.
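If Ehcache is not already in the project, the same get-or-create-a-monitor-per-key pattern is available from the standard library: `ConcurrentHashMap.computeIfAbsent` performs the lookup and creation atomically, so no explicit double-checked locking is needed (a sketch with assumed names; unlike a cache, the map grows with the number of URLs unless entries are evicted):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KeyLocks {
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    // Atomically get-or-create the monitor object for a key (URL or its DB id).
    public Object getLock(String key) {
        return locks.computeIfAbsent(key, k -> new Object());
    }
}
```

Usage is the same `synchronized (getLock(key)) { ... }` pattern as above.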

P.S. For distributed locks I use Hazelcast `IMap.lock(key)` / `unlock(key)`.

    I do not understand why any locking is needed here at all. Just create an additional data structure, a `java.util.concurrent.ConcurrentHashMap<String, Status>`. The key is the `url` and the value is the status of the task processing that url. Now, when thread A takes a task, it sets the status to PROCESSED and does not touch it anymore, handing the task over to thread B. Thread B does the processing and, once the task is completed, simply saves the result to the database and removes the entry from the map. If an error occurs, thread C proceeds in the same way.
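A minimal sketch of that lifecycle (class and method names are assumptions; `putIfAbsent` additionally prevents the same url from being handed off twice):

```java
import java.util.concurrent.ConcurrentHashMap;

public class UrlRegistry {
    public enum Status { PROCESSED }

    private final ConcurrentHashMap<String, Status> tasks = new ConcurrentHashMap<>();

    // Thread A: mark the url as handed off to B; returns false if it is already in flight.
    public boolean handOff(String url) {
        return tasks.putIfAbsent(url, Status.PROCESSED) == null;
    }

    // Thread B (or C on error): save the result (or ERROR) to the DB, then clear the entry.
    public void finish(String url) {
        // ... save result or ERROR to the database ...
        tasks.remove(url);
    }
}
```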

    If you want to enforce the deadline strictly, i.e. not save the result when the task misses its scheduled time, there is the following option. Instead of a status, keep a reference to the `Worker` thread doing the processing. Now, when thread B tries to save the result, it must first check whether it has been marked addled; only if it has not is the result saved.

    The code will look like this:

```java
// workers is a shared ConcurrentHashMap<String, Worker> (used again in the check below)
private static class Worker extends Thread {
    private final AtomicBoolean isAddled = new AtomicBoolean(false);

    private void saveToDatabase(Result result) {
        if (!isAddled.get()) {
            // save logic
        }
        workers.remove(result.url);
    }
}
```

    and in thread A do the following check:

```java
@AllArgsConstructor
private static class Task {
    private final long desired_time; // epoch millis, so long rather than int
    private final String url;
}

private static boolean isReadyForProcessing(Task task) {
    // Atomically mark the worker as addled if the deadline has passed.
    Worker updatedWorker = workers.computeIfPresent(task.url, (url, worker) -> {
        worker.isAddled.set(System.currentTimeMillis() > task.desired_time);
        return worker;
    });
    // null means no worker is registered for this url, so it can be processed.
    return updatedWorker == null;
}
```

    It turned out a bit messy and confusing, but I hope the essence is clear.