There are several http-requests observers that start at the start of the application in parallel, generating some stream of entities for further processing (doSomething). Suppose request1, request2, request3

There is one observer that also starts when the application starts in parallel with these requests, generating only onComplete. Let's say init

We need to do this so that for these three threads, doSomething starts working as soon as init returns onComplete.

At the same time, any of the requests can reach the processing stage both earlier and later onComplete.

Now I’ve done this with a creepy build with a boolean readiness flag of this form in init:

Observer<Boolean> init = Observer.just(null) .repeatWhen(o->o.delay(1,TimeUnit.SECONDS)) .map(o->isready).filter(o->o).take(1); 

and for each request stuck it

 Observable.concat(init, requestN).skip(1).doSomething(...).subscribe(); 

But I feel that this is akin to the removal of the glands through the anus. How to do right?

    1 answer 1

    This is done like this ...

     //Класс, выполняющий некие асинхронные действия и сигнализирующий о ходе выполнения class Worker { public static PublishSubject<Boolean> someMethod(){ //На него будем подписываться. Он одновременно и Observer и Observable PublishSubject<Boolean> trigger = PublishSubject.create(); //Запускаем что-то асинхронное. Запрос через retrofit с rx например Request.api.call() .subscribe( r->somthing(), //onNext e->{trigger.onNext(false);trigger.onComplete()}, //onError. onNext(true) для trigger ()->{trigger.onNext(true);trigger.onComplete()} //onComplete. onNext(true) для trigger ); return trigger; } } //Класс, который будет дергать Worker.someMethod() и асинхронно же ждать от него ответа о выполнении class SomeClass { public void method() { Subscription someWork = Worker.someMethod().subscribe(result->doSomthing(result)); .......... } } 

    Actually such Workers can be as many as you want and you can stick together the answers from them as much as you wish and the rx methods will allow