In functional programming, a monad is a structure that represents a computation as a chain of successive steps. The type and structure of the monad determine how the operations are chained; in our case, the chain is a sequence of method calls, each taking a function of a given type. This tutorial shows how to work with streams and how to use the various methods of the Stream API. We will analyze the order in which operations are executed and trace how the order of methods in a chain affects performance. We will also get acquainted with the more powerful Stream API methods, such as reduce, collect and flatMap. At the end of the tutorial we will look at parallel streams.

A stream represents a sequence of elements and supports different kinds of operations for computations on those elements:

```java
import java.util.Arrays;
import java.util.List;

List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2
```

Stream operations are either intermediate or terminal. Intermediate operations return a stream, so several of them can be chained; terminal operations return either nothing (void) or a non-stream result. In the example above, filter, map and sorted are intermediate, and forEach is terminal. For the complete list of available stream operations, refer to the Javadoc of java.util.stream.Stream. Such a chain of stream operations is also known as an operation pipeline.

Collections such as List and Set provide the methods stream() and parallelStream() for creating sequential and parallel streams. Parallel streams can process elements on multiple threads; they are discussed at the end of the tutorial. For now, consider sequential streams:

```java
Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1
```

Calling stream() on a list returns a regular object stream. A collection is not required, though:

```java
Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1
```

Just use Stream.of() to create a stream from several object references.

Besides object streams, Java 8 provides special streams for the primitive types int, long and double: IntStream, LongStream and DoubleStream. For example, IntStream.range() can replace a classic for loop:

```java
IntStream.range(1, 4)
    .forEach(System.out::println);

// 1
// 2
// 3
```

Primitive streams support additional terminal aggregate operations such as sum() and average():

```java
Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0
```

To turn an object stream into a primitive stream, use mapToInt(), mapToLong() or mapToDouble():

```java
Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3
```

Conversely, a primitive stream is turned into an object stream with mapToObj():

```java
IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3
```

The two conversions can be combined. Here a stream of doubles is first mapped to an IntStream and then to a stream of strings:

```java
Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3
```

Now let's look at the order in which stream operations are executed. Intermediate operations are lazy: they run only when a terminal operation is present. The following snippet therefore prints nothing:

```java
Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });
```

Let's add the terminal method forEach:

```java
Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));
```

```
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
```

The order of the output may be surprising. Instead of running each operation on all elements one after another ("horizontally"), each element moves along the chain "vertically": the first element "d2" passes through filter and then through forEach, and only after it has passed through the entire chain does the next element start being processed.

This behavior can reduce the number of operations actually performed on each element:

```java
Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
```

The anyMatch method returns true as soon as the predicate holds for an input element, here the second element "A2". Thanks to the vertical execution of the stream chain, map is called only twice: instead of mapping every element of the stream, map is called the minimum possible number of times.
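The laziness and short-circuiting described above can also be checked without reading console output. This minimal sketch counts how often map runs, using an AtomicInteger as the counter, on the same five input strings as the examples above; the class and method names are illustrative, not part of any API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Illustrative class (hypothetical name): counts map() calls to expose laziness and short-circuiting.
class LazinessDemo {

    // Builds a pipeline but never attaches a terminal operation.
    static int mapCallsWithoutTerminal() {
        AtomicInteger calls = new AtomicInteger();
        Stream.of("d2", "a2", "b1", "b3", "c")
              .map(s -> { calls.incrementAndGet(); return s.toUpperCase(); });
        // The pipeline above is only described, never executed.
        return calls.get();
    }

    // Same pipeline, terminated by the short-circuiting anyMatch().
    static int mapCallsWithAnyMatch() {
        AtomicInteger calls = new AtomicInteger();
        Stream.of("d2", "a2", "b1", "b3", "c")
              .map(s -> { calls.incrementAndGet(); return s.toUpperCase(); })
              .anyMatch(s -> s.startsWith("A")); // stops as soon as "A2" matches
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("no terminal operation: " + mapCallsWithoutTerminal()); // 0
        System.out.println("with anyMatch: " + mapCallsWithAnyMatch());           // 2
    }
}
```

The counter is an AtomicInteger rather than a plain int because a lambda can only capture effectively final local variables.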
The next example combines the intermediate methods map and filter with the terminal method forEach. Consider how these methods are executed:

```java
Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
```

Both map and filter are called five times, once for each element of the source collection, while forEach is called only once, for the single element that passes the filter.

We can greatly reduce the number of calls by changing the order of the operations and moving filter to the beginning of the chain:

```java
Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
```

Now map is called only once, so the pipeline runs faster on large inputs. Keep this in mind when composing complex chains of methods.

Sorting is a special kind of intermediate operation. It is stateful: to sort the elements, the stream has to keep state while ordering them.

```java
Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
```

```
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
```

First, sorted is executed on the entire input, that is, horizontally: in this run the comparator is called 8 times for various pairs of elements of the input collection. Only then do the elements move vertically through filter, map and forEach.
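Both effects, the cost of running map before filter and the horizontal execution of sorted, can be measured by recording calls instead of printing them. In this sketch (hypothetical class and method names) the exact number of comparator calls is deliberately not checked, since it depends on the JDK's sorting implementation; only the order of events is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Illustrative class (hypothetical name) that measures the ordering effects described above.
class OrderingDemo {

    // Counts map() calls when map runs before filter (filterFirst == false) or after it (true).
    static int mapCalls(boolean filterFirst) {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> source = Stream.of("d2", "a2", "b1", "b3", "c");
        if (filterFirst) {
            source.filter(s -> s.startsWith("a"))
                  .map(s -> { calls.incrementAndGet(); return s.toUpperCase(); })
                  .forEach(s -> { });
        } else {
            source.map(s -> { calls.incrementAndGet(); return s.toUpperCase(); })
                  .filter(s -> s.startsWith("A"))
                  .forEach(s -> { });
        }
        return calls.get();
    }

    // Records every comparator call and every downstream call, in the order they happen.
    static List<String> sortedTrace() {
        List<String> events = new ArrayList<>();
        Stream.of("d2", "a2", "b1", "b3", "c")
              .sorted((s1, s2) -> { events.add("sort"); return s1.compareTo(s2); })
              .filter(s -> { events.add("filter: " + s); return s.startsWith("a"); })
              .forEach(s -> events.add("forEach: " + s));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("map calls, map first:    " + mapCalls(false)); // 5
        System.out.println("map calls, filter first: " + mapCalls(true));  // 1
        List<String> events = sortedTrace();
        // Everything after the last "sort" entry happens downstream of sorted().
        System.out.println(events.subList(events.lastIndexOf("sort") + 1, events.size()));
        // [filter: a2, forEach: a2, filter: b1, filter: b3, filter: c, filter: d2]
    }
}
```

The trace confirms that no element reaches filter before sorted has seen the whole input, and that the elements then arrive in sorted order.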
Once again, performance can be improved by reordering the chain:

```java
Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
```

In this case the comparator passed to sorted is never called, because filter reduces the input to a single element before it reaches sorted. With large inputs, this can improve performance significantly.

Java 8 streams cannot be reused. As soon as a terminal method is called, the stream is closed:

```java
Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception
```

Calling noneMatch after anyMatch on the same stream results in the following exception:

```
java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
```

To work around this limitation, create a new stream for every terminal operation, for example with a Supplier:

```java
Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
```

Each call to get() creates a new stream on which the desired terminal method can safely be called.

Now let's dive deeper into the more complex methods: collect, flatMap and reduce.
Most of the examples in this section work with the following list of persons:

```java
class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));
```

Collect is a very useful terminal method that converts the elements of a stream into a result of another type, for example a List, Set or Map. Collect accepts a Collector, which consists of four different operations: a supplier, an accumulator, a combiner and a finisher. At first glance this looks complicated, but Java 8 provides various built-in collectors through the Collectors class, which implements the most commonly used operations.

```java
List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]
```

If you need a Set instead of a List, use Collectors.toSet().

The next example groups all persons by age:

```java
Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
```

Collectors can also aggregate the elements of a stream, for example to compute the average age of all persons:

```java
Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0
```

For more comprehensive statistics, the summarizing collectors return a built-in summary statistics object:

```java
IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
```

The next example joins the names of all adults into a single string:

```java
String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
```

To collect stream elements into a map, you have to specify how both the keys and the values are derived. The keys must be unique; otherwise toMap throws an IllegalStateException.
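Here is a minimal sketch of that failure with the persons from above: Peter and Pamela are both 23, so the key 23 occurs twice. It uses a self-contained copy of Person and a hypothetical helper method that returns the exception message; the exact wording of the message differs between JDK versions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; Person is a minimal copy of the class used in this section.
class ToMapDuplicateKeyDemo {

    static class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    static final List<Person> PERSONS = Arrays.asList(
            new Person("Max", 18),
            new Person("Peter", 23),
            new Person("Pamela", 23),
            new Person("David", 12));

    // Returns the exception message, or null if toMap() unexpectedly succeeds.
    static String collectWithoutMergeFunction() {
        try {
            Map<Integer, String> byAge = PERSONS.stream()
                    .collect(Collectors.toMap(p -> p.age, p -> p.name));
            return null;
        } catch (IllegalStateException e) {
            // Peter and Pamela are both 23, so the key 23 occurs twice.
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectWithoutMergeFunction());
    }
}
```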
You can optionally pass a merge function as an additional parameter to bypass the exception:

```java
Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
```

You can also build your own collector with Collector.of(). It needs four components: a supplier, an accumulator, a combiner and a finisher. The following collector joins the upper-cased names of all persons, separated by " | ":

```java
Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID
```

Since strings in Java are immutable, we need a helper class such as StringJoiner that lets the collector build the string for us. First, the supplier constructs a StringJoiner with the given delimiter. The accumulator adds each upper-cased name to the StringJoiner. The combiner merges two StringJoiners into one. Finally, the finisher builds the desired string from the StringJoiner.

We already know how to transform the objects of a stream into objects of another type with the map method. Map is somewhat limited, since each object can be mapped to exactly one other object. But what if you want to map one object to several others, or to none at all? This is where flatMap helps. FlatMap turns each element of the stream into a stream of other objects; the contents of these streams are then placed into the stream returned by flatMap.

To see flatMap in action, we build a suitable type hierarchy:

```java
class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}
```

Next, we use streams to create a few objects:

```java
List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
```

Now we have a list of three foos, each containing three bars. FlatMap accepts a function that must return a stream of objects.
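Before applying this to Foo and Bar, a small sketch on plain strings shows the one-to-many and one-to-none cases described above: each line is split into words, and an empty line maps to an empty stream. The input lines and the words helper are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: flatMap turning each element into zero or more elements.
class FlatMapWordsDemo {

    // Splits each line into words; an empty line contributes no elements at all.
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> line.isEmpty()
                        ? Stream.<String>empty()            // one element -> zero elements
                        : Arrays.stream(line.split(" ")))   // one element -> several elements
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b", "", "c d e");
        System.out.println(words(lines)); // [a, b, c, d, e]
    }
}
```

The explicit isEmpty() check matters: "".split(" ") returns an array containing one empty string, so without it the empty line would contribute an element.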
So, to resolve the bar objects of each foo, we just pass the appropriate function:

```java
foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
```

The code above can also be collapsed into a single stream pipeline:

```java
IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    // peek fills each foo's bars as a side effect
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
```

FlatMap is also available in the Optional class introduced in Java 8. Optional's flatMap returns an Optional of another type, which can be used to avoid cluttered null checks.

Consider a deeply nested structure like this:

```java
class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}
```

To resolve the inner string foo of an Outer instance, you would need several null checks to avoid a NullPointerException:

```java
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}
```

The same result can be achieved with Optional's flatMap:

```java
Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);
```

Each call to flatMap returns an Optional wrapping the desired object if it is present, or an empty Optional if it is absent.

The reduce operation combines all elements of a stream into a single result. Java 8 provides three variants of reduce. The first one reduces a stream of elements to exactly one element of the stream. For example, to find the oldest person:

```java
persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela
```

This reduce method takes an accumulator function of type BinaryOperator. A BinaryOperator is a BiFunction whose operands and result all have the same type, in our case Person. A BiFunction is like a Function, except that it takes two arguments. In our example, the function compares the ages of two persons and returns the older one; because the comparison is strict (>), Pamela wins her tie with Peter as the later of the two.

The second reduce method takes both an identity value and a BinaryOperator accumulator. It can be used to create a new object from all elements of the stream.
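As a simpler illustration of the identity-based form, consider plain integers. The identity must be neutral for the accumulator (0 for addition, 1 for multiplication), and it is also the result for an empty stream, whereas the first variant returns an empty Optional there. The class and method names in this sketch are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical demo class: reduce with and without an identity value.
class ReduceIdentityDemo {

    // 0 is the identity for addition: 0 + x == x.
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // 1 is the identity for multiplication: 1 * x == x.
    static int product(List<Integer> numbers) {
        return numbers.stream().reduce(1, (a, b) -> a * b);
    }

    // Without an identity there is nothing to return for an empty stream,
    // which is why this variant returns an Optional.
    static Optional<Integer> sumWithoutIdentity(List<Integer> numbers) {
        return numbers.stream().reduce(Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        List<Integer> empty = Collections.emptyList();
        System.out.println(sum(numbers));              // 10
        System.out.println(product(numbers));          // 24
        System.out.println(sum(empty));                // 0 (the identity)
        System.out.println(sumWithoutIdentity(empty)); // Optional.empty
    }
}
```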
Here we build a new Person whose name is the concatenation of all names and whose age is the sum of all ages:

```java
Person result =
    persons
        .stream()
        // create a new Person in each step instead of mutating the identity value,
        // which must stay unchanged for reduce to be correct (especially in parallel)
        .reduce(new Person("", 0), (p1, p2) ->
            new Person(p1.name + p2.name, p1.age + p2.age));

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
```

The third reduce method takes three parameters: an identity value, an accumulator of type BiFunction, and a combiner of type BinaryOperator. Since the type of the identity value is not restricted to Person, this variant can compute the total age of all persons:

```java
Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum + p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76
```

The result is 76, but what happens under the hood? Let's add some debug output:

```java
Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum + p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
```

The accumulator does all the work: it is first called with the identity value 0 and the first person, Max, and the sum then grows by each person's age up to 76. In this sequential run the combiner is never called. Executing the same stream in parallel reveals its role:

```java
Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum + p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
```

Now the combiner is called: since the accumulator runs in parallel on separate parts of the input, the combiner is needed to sum up the partial results.

Streams can be executed in parallel to improve performance on large inputs. Parallel streams use the common ForkJoinPool, accessible via the static method ForkJoinPool.commonPool(). The size of this pool depends on the number of available processors; in OpenJDK its default parallelism is one less than Runtime.getRuntime().availableProcessors().
On the machine used for this tutorial, the common pool has a parallelism of 3:

```java
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
```

This value can be changed with the following JVM parameter:

```
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
```

Collections provide the method parallelStream() for creating parallel streams. You can also call the intermediate method parallel() to turn a sequential stream into a parallel one.

To understand how a parallel stream executes, the next example prints the name of the current thread to System.out:

```java
Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
```

```
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
```

As you can see, the parallel stream uses the worker threads of the common ForkJoinPool as well as the calling main thread.
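When the order of results matters, for example when writing them to a file, a parallel pipeline can end with forEachOrdered instead of forEach. The following sketch (hypothetical class and method names) contrasts the two on the input list from the example above: forEachOrdered preserves the encounter order of the source list, while forEach only guarantees that every element is processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class contrasting forEach and forEachOrdered on a parallel stream.
class ForEachOrderedDemo {

    // forEach on a parallel stream: actions may run concurrently and in any order,
    // so the target list must be thread-safe.
    static List<String> unorderedResult(List<String> input) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream()
             .map(String::toUpperCase)
             .forEach(out::add);
        return out;
    }

    // forEachOrdered: elements are processed one at a time in encounter order, and each
    // action happens-before the next one, so a plain ArrayList is sufficient here.
    static List<String> orderedResult(List<String> input) {
        List<String> out = new ArrayList<>();
        input.parallelStream()
             .map(String::toUpperCase)
             .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a1", "a2", "b1", "c2", "c1");
        System.out.println("forEach:        " + unorderedResult(input)); // order may vary
        System.out.println("forEachOrdered: " + orderedResult(input));   // [A1, A2, B1, C2, C1]
    }
}
```

The ordering has a price: as the forEach Javadoc notes, respecting encounter order would sacrifice some of the benefit of parallelism, so forEachOrdered is worth using only when the order actually matters.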
The order of the thread log lines may differ from run to run, since which thread processes which element is not deterministic.

Let's extend the example with an additional operation, sorted:

```java
Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
```

```
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
```

It looks as if sorting is executed sequentially, on the main thread only. In fact, for a parallel stream, sorted uses Arrays.parallelSort(), a method added in Java 8, under the hood. As its documentation states, this method decides, based on the length of the array, whether sorting is performed in parallel or sequentially: if the array is shorter than the minimum granularity, it is sorted with the corresponding Arrays.sort method.

Let's return to the reduce example from the previous section. We have already seen that the combiner is called only when the stream runs in parallel. Let's see which threads are involved:

```java
List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum + p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
```

```
accumulator: sum=0; person=Pamela [main]
accumulator: sum=0; person=Max [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23 [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12 [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35 [ForkJoinPool.commonPool-worker-2]
```

Both the accumulator and the combiner are executed in parallel on the available threads.

Keep in mind that all parallel streams share the common ForkJoinPool, which is widely used throughout the JVM. Slow, blocking operations in a parallel stream can therefore hurt the performance of the whole program, because they block threads that are also used to process other tasks.

Source: https://habr.com/ru/post/437038/