I will describe the application in full.

There is a group of threads - json handlers . they take json objects from the SynchronousQueue, which is called jsonQueue and process. There is a group of threads - generators of json-objects, which generates json and puts it in jsonQueue. To generate json generators, we need data that is loaded by a pool-loader. In the pool-loader, I submit a task (Runnable) that says - load me so much data for such an id. A json object consists of an id field and a value field. One id can correspond to a lot of value, but the json object itself contains only 1 id and 1 value.

Handlers work like this: while (true) {take an object from a jsonQueue and process it}

Generators work like this: while (true) {generate a json object and shove it in jsonQueue}

That is, for 1 iteration the flow generator generates 1 object in which there is 1 id and 1 value and shoves in jsonQueue.

In order that each time a flow generator tries to generate an object, it was not necessary to load a value from a database for a given id, I load several value => for each id value => for this id and use MultiMap . My MultiMap is a Map, where the keys are id, value is a collection of value that corresponds to id. After the initial data load, I run the generator threads. They work as follows: randomly choose a key from a set of keys, according to this key they receive a collection of values, take 1 value and generate json, which now consists of id and the selected value. Then the selected value is removed from the collection corresponding to our id. If the flow generator detects that the collection for id is empty, then it will submit the task to the pool loader - load me another value for this id

Here, say such a complete picture

  • The thing is that the idea itself is crazy at the root. Using MultiValueMap with LinkedBlockgingQueue is absurd. - cy6erGn0m
  • Why do you think so ? - Vladimir
  • Because you and MultiValueMap are misusing, and LinkedBlockingQueue. Read what they are for. - cy6erGn0m 1:08 pm
  • A MultiValueMap decorates another map, so I use it, key bind a collection with many values. It so happened that according to the logic of work as value, I need a queue functionality and not, say , Liszt. This was done for the convenience of submitting to update, it seemed to me that it is more convenient to pull an item out of the queue and in case it is null to add a task to update the queue for this key than to check the size of the List every time and = 0 to submit the same task Well, as you, from the point of view of experience, advise to implement such logic ? - Vladimir
  • You better describe the task in more detail, but not your data structures and streams (by task do you mean thread (thread)?) - avp

1 answer 1

In this case, you need to look in the direction of ExecutorCompletionService . With it, start the calculation of parts corresponding to the same id. And expect the completion of all parts. After this happens, start the scan to process all the results associated with this id.

  • that's the point that I don't have a task to process all the results simultaneously. I have a task that takes one of the results, takes the corresponding id, generates a json object and shoves it into the queue of these objects, then again takes one of the results and does the same if there are no results -> submit the task to load new results for it id - Vladimir
  • And who is working with someone in parallel? - avp
  • Map is used by 1. a group of generator threads - which drags id from it, value from the collection, removes this value from the collection, forms json, shoves jsonov into the queue. and starts the same process again. If the Collection for value is empty, submit the update to the pool (cat. It contains 1 stream), then try to generate json for another id (which has elements in the collection) 2. The pool that updates the queues for the specified id - Vladimir
  • The group of generators (as we found out in the question-answer process) does not require synchronization. The stream updating queues works synchronously with this group (either it or group). Actually, there remains only the moment of switching between the group and the stream by the renewer. Do you use wait / notify and some kind of counter? This place probably needs to be synchronized for the streams of the generator group. So ? - avp
  • Honestly, I still don’t understand who takes what and where it goes, and why there is a collection where the id corresponds, if there is already a queue. And it is not clear why the map is needed at all. Why task can not load part of the data for id, and then process it, without adding it to the map. - cy6erGn0m