I ran into a problem implementing a Producer / Consumer pattern with several Consumers: I want a single Producer to send messages to a queue, and many Workers to read from that queue. The main difficulty is that I have no idea how to compose the classes properly.
In the main thread, I decided to leave only two calls:

    public static void main(String[] args) {
        CloudQueue sqs = new CloudQueue(5);
        for (int i = 0; i < 10; ++i) {
            sqs.send("Data");
        }
    }

Accordingly, the CloudQueue object performs some setup operations and also creates one large queue through which the worker threads will communicate. I understand that this is heading toward a god object; I will try to avoid that.

    class CloudQueue {
        CloudQueueService queue = new CloudQueueService();

        public CloudQueue(int queues) {
        }

        public void send(String s) {
            queue.push(s);
        }
    }

CloudQueue, in turn, creates an instance of CloudQueueService, whose purpose is to implement the actual producer - queue - consumers communication.

    class CloudQueueService {
        private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
        private WorkerPool pool;

        public CloudQueueService() {
            WorkerPool pool = new WorkerPool(this.queue);
        }

        public void push(String s) {
            queue.add(s);
        }
    }

CloudQueueService itself creates the queue, as well as a pool of workers that are supposed to take data from the queue and process it.
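As a side note on push: `BlockingQueue` has several insertion methods that behave differently once the queue is full. The following is a minimal sketch, not part of my code: the class name `FullQueueDemo` is hypothetical, and the capacity of 1 is chosen only so that the queue fills up immediately. It shows that `add` throws `IllegalStateException` on a full queue while `offer` returns `false`; `put`, which is not called here, would block until a consumer frees a slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, only for illustrating insertion into a full queue.
class FullQueueDemo {

    // Returns true if add() threw IllegalStateException on a full queue.
    static boolean addThrowsWhenFull() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("first");            // fills the only slot
        try {
            q.add("second");       // no space left
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Returns the result of offer() on a full queue (false means "rejected").
    static boolean offerWhenFull() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("first");
        return q.offer("second");  // does not throw, does not block
    }
}
```

Which of the three fits depends on whether the Producer should fail, skip the message, or wait when the workers fall behind.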
Here is the code for WorkerPool and Worker; I have nothing particular to say about them.

    class WorkerPool {
        public WorkerPool(BlockingQueue<String> q) {
            ExecutorService executor = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                Runnable worker = new Worker(q);
                executor.execute(worker);
            }
            executor.shutdown();
            while (!executor.isTerminated()) {
            }
            System.out.println("Done");
        }
    }

    class Worker implements Runnable {
        private BlockingQueue<String> queue;

        public Worker(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "->" + this.queue.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

So, what problems have I run into?
First, I have only a weak idea of how to structure this solution. In the main thread, I want to hide the system details as much as possible, since CloudQueue will handle quite complex tasks in the future, so I would like the user to see only the send, get and initialization methods.

Second, I most likely misunderstand the principles of working with threads, since the Producer has to receive its data from somewhere, and I can't think of an entry point.
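One possible composition that touches both points can be sketched as follows. This is a minimal sketch under assumptions, not a definitive design: the class name `CloudQueueSketch`, the `handler` parameter, the `STOP` marker, the results queue behind `get`, the `shutdown` method and the timeouts are all hypothetical and do not come from the code above. In this sketch the thread that calls `send` acts as the Producer, so the entry point is simply whatever code has data to send (here, `main`). The constructor starts the workers and returns immediately instead of waiting for them to finish, and each worker keeps taking messages in a loop until it sees the stop marker.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical facade: callers only see the constructor, send, get and shutdown.
class CloudQueueSketch {
    // Stop marker, compared by identity so it can never equal a real message.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> inbox = new ArrayBlockingQueue<>(10);
    private final BlockingQueue<String> results = new LinkedBlockingQueue<>();
    private final ExecutorService executor;
    private final int workerCount;

    // handler is an assumed per-message processing step.
    // A handler that throws would end its worker; that case is not handled here.
    CloudQueueSketch(int workerCount, Function<String, String> handler) {
        this.workerCount = workerCount;
        this.executor = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            executor.execute(() -> {
                try {
                    // Keep consuming until the stop marker arrives.
                    for (String m = inbox.take(); m != STOP; m = inbox.take()) {
                        results.add(handler.apply(m));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                }
            });
        }
        // Returns right away; the workers are now blocked in take().
    }

    // Producer side: blocks while the inbox is full.
    void send(String message) throws InterruptedException {
        inbox.put(message);
    }

    // Waits up to timeoutMillis for one processed result; null on timeout.
    String get(long timeoutMillis) throws InterruptedException {
        return results.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Queues one stop marker per worker, then waits for the workers to finish.
    // Messages sent before shutdown are processed first because the inbox is FIFO.
    boolean shutdown() throws InterruptedException {
        for (int i = 0; i < workerCount; i++) {
            inbox.put(STOP);
        }
        executor.shutdown();
        return executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Usage would stay close to the original main: create `new CloudQueueSketch(5, s -> s.toUpperCase())`, call `send` in a loop, read processed values with `get`, and call `shutdown` at the end. Results may come back in a different order than the messages were sent, since several workers run in parallel.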