I study RxJava. There is a main rx construction:

 ProcessFactory.generateProcessing(5) .flatMap(processes -> Observable.from(processes)) .flatMap(process -> Observable.just(process)) .doOnNext(process -> { process.start(); }) .subscribe(process -> System.out.println(process.toString() + " subscribed")); 

The method generates a list of threads wrapped in the Observable:

 public static Observable<List<Process>> generateProcessing(int count) { Random rand = new Random(71); int bound = 5; for (int i = 0; i < count; i++) mProcessList.add(Process.newProcess("Process" + i, rand.nextInt(bound))); return Observable.just(mProcessList); } 

The Process class itself is simple and inherited from Thread , all it does is work for a while and generates some data:

 public class Process extends Thread { private String name; private int duration; private int[] myArray; private Process(String name, int duration) { this.name = name; this.duration = duration; Logger.log("Process %s create. d = " + duration, name); } @Override public void run() { Logger.log("Process %s start! d = " + duration + " sec", name); try { Thread.sleep(duration * 1000); } catch (InterruptedException e) { e.printStackTrace(); } myArray = downloadBytes(); Logger.log("Precess %s finish", name); } @Override public String toString() { return name; } private int[] downloadBytes() { int bound = 100; Random rand = new Random(31); return new int[] {rand.nextInt(bound), rand.nextInt(bound), rand.nextInt(bound), rand.nextInt(bound)}; } public int[] getData() { return myArray; } public static Process newProcess(String name, int duration) { return new Process(name, duration); } } 

How can I make it so that after completing my task, Subscriber can be immediately notified about it and in my case, for example, output data to the console?

  • you try to use rx incorrectly; below, in the answer, everything was correctly spelled out that in the background you can run the execution, specifying only the sheduler you need. - andreich

1 answer 1

Rx already has everything you need to work with multithreading. Here is a small example similar to yours. Class in which the work is performed:

 class Process { int id; public Process(int id) { this.id = id; } void work() { try { System.out.println("running process " + id + " in thread " + Thread.currentThread().getName()); Thread.sleep(1000); System.out.println("process finish " + id + " in thread " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } 

And methods for doing work in different streams.

 public static Observable<String> runProcessObs(int id) { return Observable.fromCallable(() -> { new Process(id).work(); return "Notify sucess " + id; //здесь возвращаем сообщение после того как работа выполнена. Можно возвращать напрямую из метода "work()" и можно возвращать объекты любого типа (предварительно указанного в дженерике) }) .subscribeOn(Schedulers.io()); //каждый Process подписываем на отдельный поток } public static void runProcesses() { Observable.range(0, 5) .flatMap(integer -> runProcessObs(integer)) .subscribe(System.out::println); } 

logs:

 09:00:32.479 I/System.out: running process 0 in thread RxIoScheduler-3 09:00:32.479 I/System.out: running process 1 in thread RxIoScheduler-2 09:00:32.480 I/System.out: running process 2 in thread RxIoScheduler-5 09:00:32.480 I/System.out: running process 3 in thread RxIoScheduler-6 09:00:32.481 I/System.out: running process 4 in thread RxIoScheduler-4 09:00:33.479 I/System.out: process finish 0 in thread RxIoScheduler-3 09:00:33.479 I/System.out: Notify sucess 0 09:00:33.480 I/System.out: process finish 1 in thread RxIoScheduler-2 09:00:33.480 I/System.out: Notify sucess 1 09:00:33.481 I/System.out: process finish 2 in thread RxIoScheduler-5 09:00:33.481 I/System.out: Notify sucess 2 09:00:33.481 I/System.out: process finish 3 in thread RxIoScheduler-6 09:00:33.481 I/System.out: Notify sucess 3 09:00:33.481 I/System.out: process finish 4 in thread RxIoScheduler-4 09:00:33.481 I/System.out: Notify sucess 4 
  • Thank you for the detailed answer! - abbath0767