I just started learning RxJava and need help with a few questions to complete the task.

1.

I have a service that paints figures and should return each figure after a delay: one second (for circles) or two seconds (for squares) after it was requested. Blocking the thread is not allowed; timers have to be used.

So far I have managed to get a delay this way, but it takes much longer than a second. I added a Scheduler because without it no result is returned at all:

    public Observable<PaintedCircle> paint(Shape shape) {
        return Observable
                .timer(1, TimeUnit.SECONDS, Schedulers.immediate())
                .flatMap(x -> Observable.just(new PaintedCircle(shape.getSize())));
    }

I call this painting service from the code below (the code is still a draft). The same code packs the figures into boxes of five and prints them to the console:

    public class Consumer {
        private static final int CIRCLE_MIN_SIZE = 30;

        public static void main(String[] args) {
            Producer producer = new Producer();
            while (!producer.doesShouldStop()) {
                Observable.from(producer.produceShapes(40))
                        .filter(shape -> shape instanceof Square
                                || shape instanceof Circle && shape.getSize() > CIRCLE_MIN_SIZE)
                        .flatMap(shape -> shape.getPaintingService().paint(shape))
                        .buffer(5)
                        .map(shapes -> new Box(shapes))
                        .forEach(System.out::println);
            }
        }
    }

How can I improve the implementation of the delay?

2.

The second question concerns the creation of the figures themselves. Squares and circles should be created at random and with random sizes. So far I have been using plain Java for this, but it can probably also be done reactively. My code currently looks like this:

    public class Producer {
        private int MAX_SHAPE_SIZE = 100;
        private int counter;
        private boolean shouldStop;

        List<Shape> produceShapes(int amount) {
            List<Shape> shapes = new ArrayList<>();
            Random random = new Random();
            System.out.println("Produced following shapes:");
            for (int i = 0; i < amount; i++) {
                int coin = random.nextInt(2);
                int size = Math.abs(random.nextInt(MAX_SHAPE_SIZE));
                Shape shape = coin == 0 ? new Circle(size) : new Square(size);
                shapes.add(shape);
                System.out.print(shape);
            }
            System.out.println();
            if (++counter == 3) {
                shouldStop = true;
            }
            return shapes;
        }

        boolean doesShouldStop() {
            return shouldStop;
        }
    }

Can I rewrite this with RxJava?

In addition, the task requires that after three cycles Producer must inform Consumer that there are no more figures. As you can see in the Consumer code, for now this is done with a while loop, but that also needs to change, probably to some kind of explicit notification.

Please help.

    1 answer

    "Blocking the thread is not allowed; timers have to be used. So far I have managed to get a delay this way, but it takes much longer than a second. I added a Scheduler because without it no result is returned at all."

    First, your Schedulers.immediate() effectively blocks the thread, because it forces the timer to run on the current thread instead of a separate one.

    Why the delay comes out significantly longer than a second is not clear from the code shown. Perhaps generating or printing the figures eats up a lot of time?

    Next, it is not clear why you need flatMap in the paint method. It should really look like this:

        public Observable<PaintedCircle> paint(Shape shape) {
            return Observable
                    .timer(1, TimeUnit.SECONDS)
                    .map(x -> new PaintedCircle(shape.getSize()));
        }

    But there are 2 problems here:

    1. The timer fires one second after a shape is received, the result is mapped to your painted shape and passed on. But the next shape is taken not after that, but immediately after the timer for the previous one has been started. As a result, the shapes will not come out one per second; they will all come out in a bunch one second after being generated, just as they were generated in a bunch. It more or less worked for you only because Schedulers.immediate() blocked the thread on the timer and did not let the next shape be emitted until the current timer had fired.
    2. Without Schedulers.immediate(), the main thread only starts the timers and registers what to do when they fire, but nobody waits for the timers to finish: the main method returns right after all the timers have been started, and the program exits without producing any results.

    As for the first problem, I don't know, maybe that is what you want; but if not, you most likely need concatMap instead of flatMap when passing the shape to the painting service. If I understood the docs correctly, concatMap, unlike flatMap, waits for the previous inner Observable to complete before moving on to the next one.
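The timing difference between the two operators can be approximated without RxJava. The sketch below uses only the JDK (a `ScheduledExecutorService` plus `CompletableFuture`) as a stand-in for the RxJava timer; the class and method names (`DelayOrderingSketch`, `paintLater`, `paintAllAtOnce`, `paintOneByOne`) and the 200 ms delay are illustrative assumptions, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayOrderingSketch {

    // Completes with "painted <shape>" after delayMillis; the caller is never blocked.
    static CompletableFuture<String> paintLater(ScheduledExecutorService timer,
                                                String shape, long delayMillis) {
        CompletableFuture<String> painted = new CompletableFuture<>();
        timer.schedule(() -> { painted.complete("painted " + shape); },
                       delayMillis, TimeUnit.MILLISECONDS);
        return painted;
    }

    // flatMap-like: every timer is started immediately, so all results arrive together.
    // Returns the elapsed time in milliseconds.
    static long paintAllAtOnce(List<String> shapes, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String shape : shapes) {
                pending.add(paintLater(timer, shape, delayMillis));
            }
            // Block only here, at the very end, so the method does not return early.
            for (CompletableFuture<String> p : pending) {
                p.join();
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            timer.shutdown();
        }
    }

    // concatMap-like: the next timer is started only after the previous one has fired.
    // Returns the elapsed time in milliseconds.
    static long paintOneByOne(List<String> shapes, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        try {
            CompletableFuture<String> chain = CompletableFuture.completedFuture("");
            for (String shape : shapes) {
                chain = chain.thenCompose(previous -> paintLater(timer, shape, delayMillis));
            }
            chain.join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> shapes = Arrays.asList("circle", "square", "circle");
        System.out.println("all at once: " + paintAllAtOnce(shapes, 200) + " ms");
        System.out.println("one by one:  " + paintOneByOne(shapes, 200) + " ms");
    }
}
```

With three shapes, the first variant should take roughly one delay in total and the second roughly three, which mirrors the "all in a bunch" versus "one after another" behaviour described above.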

    The second problem is easily solved by adding a call to toBlocking() before .forEach(System.out::println).

    As a result, your second fragment will be like this:

        public class Consumer {
            private static final int CIRCLE_MIN_SIZE = 30;

            public static void main(String[] args) {
                Producer producer = new Producer();
                while (!producer.doesShouldStop()) {
                    Observable.from(producer.produceShapes(40))
                            .filter(shape -> shape instanceof Square
                                    || shape instanceof Circle && shape.getSize() > CIRCLE_MIN_SIZE)
                            .concatMap(shape -> shape.getPaintingService().paint(shape))
                            .buffer(5)
                            .map(shapes -> new Box(shapes))
                            .toBlocking()
                            .forEach(System.out::println);
                }
            }
        }

    By the way, why do you need buffer(5)? Because of it, output to the console is held back until every fifth shape has been received.

    As for rewriting Producer with Rx, I do not see the point...
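To make the grouping concrete, here is a plain-Java sketch of what buffer(5) does to a finished sequence: consecutive groups of five in arrival order, with a smaller last group if the count is not a multiple of five. `BoxingSketch` and `boxesOf` are hypothetical names used only for illustration, and the sketch works on a ready list, so it does not model the delayed output.

```java
import java.util.ArrayList;
import java.util.List;

final class BoxingSketch {

    // Splits items into consecutive groups of at most `size`, preserving order.
    static <T> List<List<T>> boxesOf(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> boxes = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            boxes.add(new ArrayList<>(items.subList(from, to)));
        }
        return boxes;
    }

    public static void main(String[] args) {
        List<Integer> painted = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            painted.add(i);
        }
        System.out.println(boxesOf(painted, 5));
        // prints [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12]]
    }
}
```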

    • Thank you very much for the answer! Regarding the last point: I use buffer(5) because I need to pack all the figures into boxes of five. Yes, they really are packed all at once, after the fifth one is received. Is there any other way to split the stream into groups of five? - Oleg Shankovskyi
    • What does packing into boxes mean, and how does buffer help with it? - xkor
    • I think I understand why the buffer is there, but then I do not understand how the figures should be packed given the delays - xkor
    • According to the task, all figures should, after painting, be put into "boxes" (lists or any other grouping) in the order in which we got them from painting, and the contents of the boxes should be printed to the screen. buffer seems to work, but I'm not sure it is correct. - Oleg Shankovskyi
    • I also left out an important point: there are two types of figures, circles and squares. The circle painting service should work at a rate of one circle per second, and the square painting service at one square every two seconds. As I understand it, this should influence the order in which the figures come out of painting - Oleg Shankovskyi
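One way to see how the two rates could affect the order is a small simulation in plain Java. It is only a model under loud assumptions: all shapes are handed over at second 0, each shape type has its own painter that handles one shape at a time, and ties are broken by arrival order. `PaintOrderSketch` and `finishOrder` are hypothetical names, and this is not what flatMap or concatMap does on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PaintOrderSketch {

    // Assumed model (not RxJava): every shape is handed over at second 0; circles and
    // squares each have their own painter, and a painter handles one shape at a time.
    static List<String> finishOrder(List<String> arrivals) {
        int circlePainterFreeAt = 0; // second at which the circle painter is idle again
        int squarePainterFreeAt = 0; // same for the square painter
        List<int[]> finished = new ArrayList<>(); // pairs {finishSecond, arrivalIndex}
        for (int i = 0; i < arrivals.size(); i++) {
            if ("circle".equals(arrivals.get(i))) {
                circlePainterFreeAt += 1; // one circle per second
                finished.add(new int[] {circlePainterFreeAt, i});
            } else {
                squarePainterFreeAt += 2; // one square every two seconds
                finished.add(new int[] {squarePainterFreeAt, i});
            }
        }
        // Earlier finish first; on a tie, the shape that arrived first (an arbitrary choice).
        finished.sort((a, b) -> a[0] != b[0] ? Integer.compare(a[0], b[0])
                                             : Integer.compare(a[1], b[1]));
        List<String> order = new ArrayList<>();
        for (int[] f : finished) {
            order.add(arrivals.get(f[1]) + "#" + f[1]); // label = type + arrival index
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> arrivals = Arrays.asList("square", "circle", "circle", "square", "circle");
        System.out.println(finishOrder(arrivals));
        // prints [circle#1, square#0, circle#2, circle#4, square#3]
    }
}
```

Under this model the output order differs from the arrival order, so boxes of five would be filled in finish order rather than in the order the shapes were produced.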