高级操作

数据流执行大量的不同操作。我们已经了解了一些最重要的操作,例如filtermap。我将它们留给你来探索所有其他的可用操作(请见数据流的Javadoc)。下面让我们深入了解一些更复杂的操作:collectflatMapreduce

这一节的大部分代码示例使用下面的Person列表来演示:

class Person {
    String name;
    int age;
    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
    @Override
    public String toString() {
        return name;
    }
}
List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

collect

collect是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如ListSet或者Mapcollect接受收集器(Collector),它由四个不同的操作组成:供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)。这在开始听起来十分复杂,但是Java8通过内置的Collectors类支持多种内置的收集器。所以对于大部分常见操作,你并不需要自己实现收集器。

让我们以一个非常常见的用例来开始:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());
System.out.println(filtered);    // [Peter, Pamela]

就像你看到的那样,它非常简单,只是从流的元素中构造了一个列表。如果需要以Set来替代List,只需要使用Collectors.toSet()就好了。

下面的例子按照年龄对所有人进行分组:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));
personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

收集器十分灵活。你也可以在流的元素上执行聚合,例如,计算所有人的平均年龄:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge);     // 19.0

如果你对更多统计学方法感兴趣,概要收集器返回一个特殊的内置概要统计对象,所以我们可以简单计算最小年龄、最大年龄、算术平均年龄、总和和数量。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子将所有人连接为一个字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

连接收集器接受分隔符,以及可选的前缀和后缀。

为了将数据流中的元素转换为映射,我们需要指定键和值如何被映射。要记住键必须是唯一的,否则会抛出IllegalStateException异常。你可以选择传递一个合并函数作为额外的参数来避免这个异常。

既然我们知道了一些最强大的内置收集器,让我们来尝试构建自己的特殊收集器吧。我们希望将流中的所有人转换为一个字符串,包含所有大写的名称,并以|分割。为了完成它,我们通过Collector.of()创建了一个新的收集器。我们需要传递一个收集器的四个组成部分:供应器、累加器、组合器和终止器。

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher
String names = persons
    .stream()
    .collect(personNameCollector);
System.out.println(names);  // MAX | PETER | PAMELA | DAVID

由于Java中的字符串是不可变的,我们需要一个助手类StringJointer。让收集器构造我们的字符串。供应器最开始使用相应的分隔符构造了这样一个StringJointer。累加器用于将每个人的大写名称加到StringJointer中。组合器知道如何把两个StringJointer合并为一个。最后一步,终结器从StringJointer构造出预期的字符串。

flatMap

我们已经了解了如何通过使用map操作,将流中的对象转换为另一种类型。map有时十分受限,因为每个对象只能映射为一个其它对象。但如何我希望将一个对象转换为多个或零个其他对象呢?flatMap这时就会派上用场。

flatMap将流中的每个元素,转换为其它对象的流。所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。这些流的内容之后会放进flatMap所返回的流中。

在我们了解flatMap如何使用之前,我们需要相应的类型体系:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();
    Foo(String name) {
        this.name = name;
    }
}
class Bar {
    String name;
    Bar(String name) {
        this.name = name;
    }
}

下面,我们使用我们自己的关于流的知识来实例化一些对象:

List<Foo> foos = new ArrayList<>();
// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

现在我们拥有了含有三个foo的列表,每个都含有三个bar

flatMap接受返回对象流的函数。所以为了处理每个foo上的bar对象,我们需要传递相应的函数:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

像你看到的那样,我们成功地将含有三个foo对象中的流转换为含有九个bar对象的流。

最后,上面的代码示例可以简化为流式操作的单一流水线:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

flatMap也可用于Java8引入的Optional类。OptionalflatMap操作返回一个Optional或其他类型的对象。所以它可以用于避免烦人的null检查。

考虑像这样更复杂的层次结构:

class Outer {
    Nested nested;
}
class Nested {
    Inner inner;
}
class Inner {
    String foo;
}

为了处理外层示例上的内层字符串foo,你需要添加多个null检查来避免潜在的NullPointerException

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

可以使用OptionalflatMap操作来完成相同的行为:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

如果存在的话,每个flatMap的调用都会返回预期对象的Optional包装,否则为nullOptional包装。

reduce

归约操作将所有流中的元素组合为单一结果。Java8支持三种不同类型的reduce方法。第一种将流中的元素归约为流中的一个元素。让我们看看我们如何使用这个方法来计算出最老的人:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

reduce方法接受BinaryOperator积累函数。它实际上是两个操作数类型相同的BiFunctionBiFunction就像是Function,但是接受两个参数。示例中的函数比较两个人的年龄,来返回年龄较大的人。

第二个reduce方法接受一个初始值,和一个BinaryOperator累加器。这个方法可以用于从流中的其它Person对象中构造带有聚合后名称和年龄的新Person对象。

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三个reduce对象接受三个参数:初始值,BiFunction累加器和BinaryOperator类型的组合器函数。由于初始值的类型不一定为Person,我们可以使用这个归约函数来计算所有人的年龄总和。:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum);  // 76

你可以看到结果是76。但是背后发生了什么?让我们通过添加一些调试输出来扩展上面的代码:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

你可以看到,累加器函数做了所有工作。它首先使用初始值0和第一个人Max来调用累加器。接下来的三步中sum会持续增加,直到76。

等一下。好像组合器从来没有调用过?以并行方式执行相同的流会揭开这个秘密:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

这个流的并行执行行为会完全不同。现在实际上调用了组合器。由于累加器被并行调用,组合器需要用于计算部分累加值的总和。