In functional programming, a monad is a structure that represents a calculation in a chain of successive steps. The type and structure of a monad determine the chain of operations; in our case, a sequence of methods with built-in functions of a given type.This tutorial will teach you how to work with streams and show you how to handle the various methods available in the Stream API. We will analyze the order of operations and trace how the sequence of methods in the chain affects performance. Let's
flatMap
acquainted with the powerful methods of the Stream API, such as reduce
, collect
and flatMap
. At the end of the manual we will pay attention to parallel work with threads. List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); // C1 // C2
filter
, map
and sorted
are intermediate, and forEach
is terminal. For a complete list of available stream methods, refer to the documentation . Such a chain of streaming operations is also known as an operation pipeline.stream()
and parllelStream()
methods for creating sequential and parallel streams. Parallel threads are able to work in multi-thread mode (on multiple threads) and will be discussed at the end of the tutorial. For now, consider sequential streams: Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println); // a1
stream()
method for the list returns a normal stream object. Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println); // a1
Stream.of()
to create a stream from multiple object references.IntStream
, LongStream
, DoubleStream
.IntStream.range()
using IntStream.range()
: IntStream.range(1, 4) .forEach(System.out::println); // 1 // 2 // 3
sum()
and average()
Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); // 5.0
mapToInt()
, mapToLong()
, mapToDouble()
: Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); // 3
mapToObj()
: IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3
Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; });
forEach
method: Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s));
filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c
filter
method and then through forEach
and only then, after the first element passes through the entire chain of methods, the next element begins to be processed. Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); }); // map: d2 // anyMatch: D2 // map: a2 // anyMatch: A2
anyMatch
method returns true as soon as the predicate is applied to the input element. In this case, this is the second element of the sequence - “A2”. Accordingly, thanks to the “vertical” execution of the thread chain, the map
will only be called twice. Thus, instead of displaying all the elements of the stream, the map
will be called the minimum possible number of times.map
and filter
and terminal method forEach
. Consider how these methods are performed: Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); // map: d2 // filter: D2 // map: a2 // filter: A2 // forEach: A2 // map: b1 // filter: B1 // map: b3 // filter: B3 // map: c // filter: C
map
and filter
methods are called 5 times during the execution time — once for each element of the original collection, while forEach
is called only once — for the element that passed the filter.filter
in the first place: Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // map: a2 // forEach: A2 // filter: b1 // filter: b3 // filter: c
Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));
sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2
sorted
method runs horizontally. In this case, sorted
is called 8 times for several combinations of the elements of the incoming collection. Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // filter: b1 // filter: b3 // filter: c // map: a2 // forEach: A2
sorted
is not called at all. filter
reduces the input collection to one element. In the case of large input data, performance will benefit significantly. Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception
noneMatch
after anyMatch
in one thread results in the following exception: java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok
get
method call creates a new thread in which you can safely call the desired terminal method.flatMap
deeper into more complex methods: collect
, flatMap
and reduce
. class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12));
Collect
very useful terminal method that serves to convert stream elements into a result of another type, for example, List, Set or Map.Collect
accepts Collector
, which contains four different methods: supplier (supplier). battery (accumulator), combiner, finisher (finisher). At first glance, this looks very difficult, but Java 8 supports various built-in collectors through the Collectors
class, where the most used methods are implemented. List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered); // [Peter, Pamela]
Collectors.toSet()
. Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p)); // age 18: [Max] // age 23: [Peter, Pamela] // age 12: [David]
Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge); // 19.0
IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary); // IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase); // In Germany Max and Peter and Pamela are of legal age.
IllegalStateException
. You can optionally add a merge function to bypass the exception: Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map); // {18=Max, 23=Peter;Pamela, 12=David}
Collector.of()
. We need four components of our collector: supplier, battery, connector, finisher. Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisher String names = persons .stream() .collect(personNameCollector); System.out.println(names); // MAX | PETER | PAMELA | DAVID
StringJoiner
that allows the collector to build a string for us. In the first stage, the provider constructs a StringJoiner
with an assigned delimiter. The battery is used to add each name to the StringJoiner
.StringJoiner
to one. And at the end of the finisher constructs the desired string from StringJoiner
s.map
method. Map
is a kind of limited method, since each object can be mapped to just one other object. But what if you want to map one object to many others, or not to display it at all? This is where the flatMap
method flatMap
. FlatMap
turns each stream object into a stream of other objects. The contents of these streams are then packed into the return stream of the flatMap
method.flatMap
in action, we build a suitable type hierarchy for an example: class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } }
List<Foo> foos = new ArrayList<>(); // create foos IntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i))); // create bars foos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
FlatMap
accepts a function that should return a stream of objects. Thus, to access the bar objects of each foo , we just need to select the appropriate function: foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); // Bar1 <- Foo1 // Bar2 <- Foo1 // Bar3 <- Foo1 // Bar1 <- Foo2 // Bar2 <- Foo2 // Bar3 <- Foo2 // Bar1 <- Foo3 // Bar2 <- Foo3 // Bar3 <- Foo3
IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name));
FlatMap
also available in the Optional
class, introduced in Java 8. The FlatMap
from the Optional
class returns an optional object of another class. This can be used to avoid cluttering up null
checks. class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; }
null
checks to avoid a NullPointException
: Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); }
Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println);
flatMap
call returns an Optional
wrapper for the desired object, if present, or for null
if there is no object. persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println); // Pamela
reduce
method takes an accumulating function with a binary operator (BinaryOperator). Here reduce
is a bi-function (BiFunction), where both arguments belong to the same type. In our case, to type Person . The bi-function is practically the same as the функция
(Function), however, it takes 2 arguments. In our example, the function compares the age of two people and returns the item with a greater age.reduce
method takes both the initial value and the accumulator with the binary operator. This method can be used to create a new item. We have a Person with a name and age, consisting of the addition of all the names and the sum of past years: Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age); // name=MaxPeterPamelaDavid; age=76
reduce
method takes three parameters: the initial value, the accumulator with the bi-function, and the unifying function of the type of the binary operator. Since the initial value of the type is not limited to the type of Person, a reduction can be used to determine the sum of the living years of each person: Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum); // 76
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Max // accumulator: sum=18; person=Peter // accumulator: sum=41; person=Pamela // accumulator: sum=64; person=David
Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Pamela // accumulator: sum=0; person=David // accumulator: sum=0; person=Max // accumulator: sum=0; person=Peter // combiner: sum1=18; sum2=23 // combiner: sum1=23; sum2=12 // combiner: sum1=41; sum2=35
ForkJoinPool
accessible via static method call ForkJoinPool.commonPool()
. The size of the main thread pool can reach 5 threads of execution - the exact number depends on the number of available physical processor cores. ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
parallelStream()
for creating parallel data streams. You can also call an intermediate method parallel()
to turn a serial stream into a parallel stream.System.out
: Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2]
ForkJoinPool
. The output sequence may differ, since the execution sequence of each specific thread is not defined (thread).sort
: Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1]
sort
executed sequentially and only in the main stream . In fact, when running a stream in parallel under the hood of a method sort
from the Stream API Arrays
, a class sorting method added in Java 8 is hidden Arrays.parallelSort()
. As stated in the documentation, this method, based on the length of the incoming collection, determines how exactly - the sorting will be performed in parallel or sequentially:If the length of a particular array is less than the minimum “grain”, sorting is performed by executing the Arrays.sort method.Let's return to the example with the method
reduce
from the previous chapter. We have already found out that the unifying function is called only when working in parallel with the thread. Consider which threads are involved: List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; });
accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
ForkJoinPool
, так широко используемый в JVM. Так что применение медленных блокирующих методов потока может негативно отразиться на производительности всей программы, за счет блокирования потоков (threads), используемых для обработки в других задачах.Source: https://habr.com/ru/post/437038/