I ran into a problem implementing the Producer / Consumer pattern with several consumers. I want a single Producer to send messages to a queue and many Workers to read from that queue. The main difficulty is that I don't know how to compose the classes properly.

In the main thread, I decided to leave only two calls:

    public static void main(String[] args) {
        CloudQueue sqs = new CloudQueue(5);
        for (int i = 0; i < 10; ++i) {
            sqs.send("Data");
        }
    }

Accordingly, the CloudQueue object performs some setup and creates one large queue through which the components will communicate. I realize this is heading toward a god object; I will try to eliminate that.

    class CloudQueue {
        CloudQueueService queue = new CloudQueueService();

        public CloudQueue(int queues) {
            // note: the queues argument is not used yet
        }

        public void send(String s) {
            queue.push(s);
        }
    }

CloudQueue, in turn, creates an instance of CloudQueueService, whose purpose is to implement the actual producer - queue - consumers communication:

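    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    class CloudQueueService {
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
        private final WorkerPool pool;

        public CloudQueueService() {
            // assign the field rather than a shadowing local variable
            this.pool = new WorkerPool(this.queue);
        }

        public void push(String s) {
            queue.add(s); // add() throws IllegalStateException when the queue is full
        }
    }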

CloudQueueService itself creates the queue, as well as a pool of workers that pull data from the queue and process it.

Here is the code for WorkerPool and Worker; there is nothing special to say about them.

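    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class WorkerPool {
        private final ExecutorService executor = Executors.newFixedThreadPool(5);

        public WorkerPool(BlockingQueue<String> q) {
            for (int i = 0; i < 5; i++) {
                executor.execute(new Worker(q));
            }
            // Do not wait for termination here: the workers block in take(),
            // so waiting in the constructor would never return.
        }

        // Interrupts the workers so that they leave their loops.
        public void shutdown() {
            executor.shutdownNow();
            System.out.println("Done");
        }
    }

    class Worker implements Runnable {
        private final BlockingQueue<String> queue;

        public Worker(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // keep consuming until the thread is interrupted
                while (true) {
                    System.out.println(Thread.currentThread().getName() + "->" + this.queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }
    }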

So, what problems have I run into?

  • I have only a vague idea of how to structure this solution. In the main thread I want to hide the system details as much as possible, because CloudQueue will handle fairly complex tasks in the future, so I would like the user to see only the send, get and initialization methods.

  • I most likely misunderstand how to work with threads, since the Producer has to receive data from somewhere. I can't figure out where the entry point should be.

    1 answer

    As I understand it, you want an implementation of the pattern in which the consumers are created up front (the classes implementing the consumer could be passed in when the producer-consumer object is constructed) and their number is fixed in advance. I suggest the following skeleton; you will need to adapt it to your goals.

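        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;

        public class Test {
            // volatile so that the worker threads see the update made by stopConsumers()
            private static volatile boolean runningFlag = true;
            private static final int CONSUMERS_NUMBER = 5;
            private static ExecutorService executor = Executors.newFixedThreadPool(CONSUMERS_NUMBER);
            private static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
            public final static Test INSTANCE = new Test();

            private Test() {
                startConsumers();
            }

            public static void startConsumers() {
                for (int worker = 0; worker < CONSUMERS_NUMBER; worker++) {
                    executor.submit(() -> {
                        try {
                            // keep going until a stop is requested and the queue is drained
                            while (runningFlag || !queue.isEmpty()) {
                                // poll with a timeout so the loop can re-check runningFlag
                                String job = queue.poll(100, TimeUnit.MILLISECONDS);
                                if (job == null) {
                                    continue;
                                }
                                Thread.sleep((int) (Math.random() * 1000)); // do some job
                                System.out.println("Job is done:" + job);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // restore the flag and exit
                        }
                    });
                }
            }

            public static void stopConsumers() {
                runningFlag = false;
                executor.shutdown(); // no new tasks; running workers finish on their own
            }

            public static void sendJob(String someJob) throws InterruptedException {
                queue.put(someJob); // or add(), etc.
            }

            public static void main(String[] args) throws InterruptedException {
                for (int i = 100; i > 0; i--) {
                    Test.sendJob("sf" + i);
                }
                Test.stopConsumers();
            }
        }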
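The question asks for an object that exposes only initialization, send and get. A minimal sketch of such a facade, written as an ordinary instance instead of static state, might look like the block below. Everything named here is an assumption made for illustration and does not come from the question or the answer: the class names JobQueue and JobQueueDemo, the STOP marker, the separate results queue, and the "done:" prefix that stands in for real work. It also swaps the running flag for a different shutdown technique, the "poison pill": close() enqueues one stop marker per worker, so every job sent earlier is taken from the queue before any worker exits.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical facade: hides the queue and the worker threads behind send/get/close.
class JobQueue implements AutoCloseable {
    // Stop marker ("poison pill"). A distinct String instance compared by identity,
    // so a real job whose text happens to be "STOP" is still processed normally.
    private static final String STOP = new String("STOP");

    private final int workers;
    private final BlockingQueue<String> jobs;
    private final BlockingQueue<String> results = new LinkedBlockingQueue<>();
    private final ExecutorService executor;

    JobQueue(int workers, int capacity) {
        this.workers = workers;
        this.jobs = new ArrayBlockingQueue<>(capacity);
        this.executor = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            executor.execute(this::workerLoop);
        }
    }

    private void workerLoop() {
        try {
            while (true) {
                String job = jobs.take();      // blocks until a job or a stop marker arrives
                if (job == STOP) {
                    return;                    // each worker consumes exactly one marker
                }
                results.put("done:" + job);    // placeholder for the real work
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }

    // Producer side: blocks while the job queue is full.
    void send(String job) throws InterruptedException {
        jobs.put(job);
    }

    // Returns the next finished result, waiting if none is ready yet.
    String get() throws InterruptedException {
        return results.take();
    }

    @Override
    public void close() {
        try {
            for (int i = 0; i < workers; i++) {
                jobs.put(STOP);                // queued behind all jobs sent so far
            }
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

class JobQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        try (JobQueue queue = new JobQueue(5, 10)) {
            for (int i = 0; i < 10; i++) {
                queue.send("Data" + i);
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(queue.get()); // completion order is not deterministic
            }
        } // close() delivers the stop markers and waits for the workers
    }
}
```

In this sketch the producer is simply whichever thread calls send(), here the main thread; the data could just as well come from a file, a socket or user input. Because the workers are started in the constructor and stopped in close(), the caller never touches the executor or the queue directly.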