Faced the problem of implementing a Consumer / Producer pattern with several Consumers; I would like to implement the ability to send a message to a queue with a single Producer and the ability to read from a queue with many Workers. I would say that the only problem is that I don’t have any idea how to build the right composition of classes.
In the main thread, I decided to leave only two challenges:
public static void main(String[] args) { CloudQueue sqs = new CloudQueue(5); for(int i=0; i<10; ++i){ sqs.send("Data"); } }
Accordingly, the CloudQueue
object performs some setup
operations, and also creates a large queue in which the subroutines will communicate. I understand that everything goes to the creation of god-object
, I will try to eliminate it.
class CloudQueue{ CloudQueueService queue = new CloudQueueService(); public CloudQueue(int queues){ } public void send(String s){ queue.push(s); } }
CloudQueue
in turn, creates an instance of CloudQueueService
, the purpose of which is an attempt to implement the actual communication producer
- queue - consumers
;
class CloudQueueService{ private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10); private WorkerPool pool; public CloudQueueService(){ WorkerPool pool = new WorkerPool(this.queue); } public void push(String s){ queue.add(s); } }
CloudQueueService
already directly creating a queue, as well as a pool of workers that must directly pull data from the queue and work with them.
I give an example of code WorkersPool and Worker; especially something about them I can not say.
class WorkerPool{ public WorkerPool(BlockingQueue<String> q){ ExecutorService executor = Executors.newFixedThreadPool(5); for(int i=0; i<5; i++){ Runnable worker = new Worker(q); executor.execute(worker); } executor.shutdown(); while (!executor.isTerminated()){ } System.out.println("Done"); } } class Worker implements Runnable{ private BlockingQueue<String> queue; public Worker(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run(){ try{ System.out.println(Thread.currentThread().getName() + "->" + this.queue.take()); }catch (Exception e){ e.printStackTrace(); } } }
Accordingly, what problems have I encountered?
Weak idea how to organize the structure of this solution. In the main thread, I want to get rid of the system details as much as possible, since in the future
CloudQueue
will solve quite complex tasks, so I would like for the user to keep only thesend
,get
and initialization methods.Most likely the wrong understanding of the principles of working with threads, as the Producer should receive data from somewhere. I can’t think up a point of entry.