A controller has an addItem() method that saves an item; a Subscriber is subscribed to each new item:

    public void addItem(String item) {
        someService.save(item);
        PublishSubject<String> subject = PublishSubject.create();
        subject.subscribeOn(Schedulers.trampoline()).subscribe(simpleObserver);
        subject.onNext(item);
        log.info("Save item - " + item);
    }

and there is a SimpleObserver:

    public class SimpleObserver extends Subscriber<String> {
        @Override
        public void onCompleted() {
            log.info("onCompleted...");
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("onError Subscriber...");
        }

        @Override
        public void onNext(String s) {
            log.info("After subscribe " + s);
        }
    }

With sequential calls

    addItem("Item1");
    addItem("Item2");
    addItem("Item3");
    addItem("Item4");

the result is:

    After subscribe Item1
    Save item - Item1
    After subscribe Item2
    Save item - Item2
    After subscribe Item3
    Save item - Item3
    After subscribe Item4
    Save item - Item4

How can I make all the someService.save(item) calls run first, in the main thread, and then have all of simpleObserver's onNext() calls run sequentially in a separate thread pool? That is, the result should be:

    // all saves run in the main thread
    Save item - Item1
    Save item - Item2
    Save item - Item3
    Save item - Item4
    // all "After subscribe" calls run sequentially in one separate thread pool
    After subscribe Item1
    After subscribe Item2
    After subscribe Item3
    After subscribe Item4

Closed as unclear by Yuriy SPb, aleksandr barakin, user194374, Grundy, D-side on 11 Jul '16 at 12:21.


  • I don't know how you use this code, but why not make all the onNext calls one after another from a separate method at the right time? That is, first subscribe everyone who needs it, do the rest of the work, and then call a separate method that goes through the onNext calls. You don't expect some magic code to figure out on its own that addItem1 has been called for the last time and that it can now do something, do you? - Yuriy SPb
  • Actually, each onNext() has to be called separately, because addItem() is called in a REST controller, so I need to figure out how to put each item in a queue and process it in a single thread pool - DenStreet
  • Well, that is more or less what I am suggesting: call them separately. Collect the data for them somewhere and call onNext with all of it in one go. - Yuriy SPb
  • Unfortunately, we need each onNext() to be called separately. That is, calling one method afterwards with a series of onNext() calls does not suit us. - DenStreet
  • In that case your question becomes completely unclear, and most likely a solution simply does not exist, because there is no way to know when you have finished calling addItem and can start calling onNext. It is impossible in principle. - Yuriy SPb

1 answer

I will answer my own question. I needed to use an ExecutorService, so I added this field:

    private ExecutorService executor = Executors.newScheduledThreadPool(
            1,
            new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

then the method will look like this:

    public void addItem1(String item) {
        someService.save(item);
        PublishSubject<String> subject = PublishSubject.create();
        // observeOn moves the onNext delivery to the executor's thread
        subject.observeOn(Schedulers.from(executor)).subscribe(simpleObserver);
        subject.onNext(item);
        log.info("Save item - " + item + " " + Thread.currentThread());
    }

and the result:

    Save item - Item1 Thread[main,5,main]
    Save item - Item2 Thread[main,5,main]
    Save item - Item3 Thread[main,5,main]
    Save item - Item4 Thread[main,5,main]
    After subscribe Item1 Thread[SubscribeOn-0,5,main]
    After subscribe Item2 Thread[SubscribeOn-0,5,main]
    After subscribe Item3 Thread[SubscribeOn-0,5,main]
    After subscribe Item4 Thread[SubscribeOn-0,5,main]
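For anyone who wants to try the idea without RxJava on the classpath, here is a minimal sketch using only the JDK. It is not the code from the answer: the class name ItemQueueSketch, the events list (a stand-in for the log) and the shutdownAndGetEvents() helper are hypothetical, and the save call is only indicated by a comment. What a single-thread executor guarantees is that the notifications run one at a time, in submission order, off the caller's thread. Whether every "Save item" line appears before the first "After subscribe" line depends on thread timing, so the sketch does not promise that part.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ItemQueueSketch {

    // One worker thread backed by a FIFO queue:
    // submitted tasks run one at a time, in submission order.
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "SubscribeOn-0"));

    // Hypothetical stand-in for the log output, so the order can be inspected.
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());

    public void addItem(String item) {
        // Runs on the caller's thread (someService.save(item) would go here).
        events.add("Save item - " + item + " @" + Thread.currentThread().getName());
        // Hand the notification to the worker; the caller does not wait for it.
        executor.execute(() ->
                events.add("After subscribe " + item + " @" + Thread.currentThread().getName()));
    }

    // Stops accepting work, waits for queued notifications, returns a snapshot.
    public List<String> shutdownAndGetEvents() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (events) {
            return new ArrayList<>(events);
        }
    }

    public static void main(String[] args) {
        ItemQueueSketch sketch = new ItemQueueSketch();
        for (String item : new String[] {"Item1", "Item2", "Item3", "Item4"}) {
            sketch.addItem(item);
        }
        sketch.shutdownAndGetEvents().forEach(System.out::println);
    }
}
```

In a long-running service the executor would normally stay alive for the lifetime of the controller; the shutdown here exists only so the demo can wait for the queued notifications and exit.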