Java 8th 函数式编程:流式数据处理

第一次接触到流式数据处理的时候,第一感觉是流式数据处理让集合操作变得简洁了许多,通常我们需要多行代码才能完成的操作,借助于流式数据处理可以在一行中实现。比如我们希望对一个包含整数的集合筛选出所有的偶数,并将其封装成为一个新的集合返回,那么在 8th 之前,我们需要通过如下代码实现:

1
2
3
4
5
6
List<Integer> evens = new ArrayList<>();
for (final Integer num : nums) {
if (num % 2 == 0) {
evens.add(num);
}
}

借助 java 8th 的流式数据处理,我们可以将代码简化为:

1
List<Integer> evens = nums.stream().filter(num -> num % 2 == 0).collect(Collectors.toList());

先简单解释一下上面这行代码的语义,stream() 操作将集合转换成一个流,filter() 执行我们自定义的筛选处理,这里是通过 lambda 表达式筛选出所有偶数,最后通过 collect() 对结果进行封装处理,并通过 Collectors.toList() 指定将结果封装成为一个 List 集合返回。

由上面的例子可以看到流式数据处理能够极大简化对于集合的操作,实际上不光是集合,包括数组、文件等,只要是可以转换成流,我们都可以借助流式数据处理简化代码实现。8th 通过内部迭代来实现对流的处理,一个流式数据处理可以分为三个部分:转换成流、中间操作、终端操作。如下图:

image

以集合为例,一个流式数据处理操作我们首先需要调用 stream() 函数将其转换成流,再调用相应的 “中间操作” 达到我们需要对集合进行的处理,比如筛选、转换等,最后通过 “终端操作” 对前面的结果进行封装,返回我们需要的结果。

一. 中间操作

这里先定义一个简单的学生实体类,用于后面的例子演示:

1
2
3
4
5
6
7
8
9
10
public class Student {
private long id;
private String name;
private int age;
private int grade;
private String major;
private String school;

// 省略getter和setter
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 初始化
List<Student> students = new ArrayList<Student>() {
{
add(new Student(20160001, "孔明", 20, 1, "土木工程", "武汉大学"));
add(new Student(20160002, "伯约", 21, 2, "信息安全", "武汉大学"));
add(new Student(20160003, "玄德", 22, 3, "经济管理", "武汉大学"));
add(new Student(20160004, "云长", 21, 2, "信息安全", "武汉大学"));
add(new Student(20161001, "翼德", 21, 2, "机械与自动化", "华中科技大学"));
add(new Student(20161002, "元直", 23, 4, "土木工程", "华中科技大学"));
add(new Student(20161003, "奉孝", 23, 4, "计算机科学", "华中科技大学"));
add(new Student(20162001, "仲谋", 22, 3, "土木工程", "浙江大学"));
add(new Student(20162002, "鲁肃", 23, 4, "计算机科学", "浙江大学"));
add(new Student(20163001, "丁奉", 24, 5, "土木工程", "南京大学"));
}
};

1.1 过滤

过滤,顾名思义就是按照给定的要求从集合中筛选出满足条件的元素,8th 提供的筛选操作包括:filter、distinct、limit、skip。

  • filter

在前面的例子中已经演示了如何使用 filter,其定义为: Stream<T> filter(Predicate<? super T> predicate),filter 接受一个谓词 Predicate,我们可以通过这个谓词定义筛选条件,在介绍 lambda 表达式时我们介绍过 Predicate 是一个函数式接口,它包含一个 test(T t) 方法,用于测试条件是否满足。现在我们希望从集合 students 中筛选出所有武汉大学的学生,那么我们可以通过 filter 实现,并将筛选操作以参数形式传递给 filter:

1
2
3
List<Student> whuStudents = students.stream()
.filter(student -> "武汉大学".equals(student.getSchool()))
.collect(Collectors.toList());
  • distinct

distinct 操作类似于我们在写 SQL 语句时添加的 DISTINCT 关键字,用于去重处理,distinct 基于 Object.equals(Object) 实现,以最开始的例子为例,假设我们希望筛选出所有不重复的偶数,那么可以添加 distinct 操作对筛选结果进行去重:

1
2
3
List<Integer> evens = nums.stream()
.filter(num -> num % 2 == 0).distinct()
.collect(Collectors.toList());
  • limit

limit 操作也类似于 SQL 语句中的 LIMIT 关键字,不过相对功能较弱,limit 返回包含前 n 个元素的流,当集合大小小于n时则返回实际长度,比如下面的例子返回前两个专业为土木工程的学生:

1
2
3
List<Student> civilStudents = students.stream()
.filter(student -> "土木工程".equals(student.getMajor())).limit(2)
.collect(Collectors.toList());

说到 limit,不得不提及一下另外一个流操作: sorted。该操作用于对流中元素进行排序,sorted 要求待比较的元素必须实现 Comparable 接口,如果没有实现也不要紧,我们可以将比较器作为参数传递给 sorted(Comparator<? super T> comparator),比如我们希望筛选出专业为土木工程的学生,并按年龄从小到大排序,筛选出年龄最小的两个学生,那么可以实现为:

1
2
3
4
5
List<Student> sortedCivilStudents = students.stream()
.filter(student -> "土木工程".equals(student.getMajor()))
.sorted(Comparator.comparingInt(Student::getAge))
.limit(2)
.collect(Collectors.toList());
  • skip

skip 操作与 limit 操作相反,如同其字面意思一样,skip 用于跳过前 n 个元素,比如我们希望找出排序在第二名之后的土木工程专业的学生,那么可以实现为:

1
2
3
4
List<Student> civilStudents = students.stream()
.filter(student -> "土木工程".equals(student.getMajor()))
.skip(2)
.collect(Collectors.toList());

通过 skip 方法就会跳过前面两个元素,返回由后面所有元素构造的流,如果 n 大于满足条件的集合的长度,则会返回一个空的集合。

1.2 映射

在 SQL 中,借助 SELECT 关键字后面添加需要的字段名称,可以仅输出我们需要的结果字段,而流式数据处理的映射操作也同样用于实现这一操作,在流式数据处理中主要包含两类映射操作:map 和 flatMap。

  • map

举例说明,假设我们希望筛选出所有专业为计算机科学的学生姓名,那么我们可以在 filter 筛选的基础之上,通过 map 将学生实体映射成为学生姓名字符串,具体实现如下:

1
2
3
List<String> names = students.stream()
.filter(student -> "计算机科学".equals(student.getMajor()))
.map(Student::getName).collect(Collectors.toList());

除了上面这类基础的 map,java 8th 还提供了 mapToDouble(ToDoubleFunction<? super T> mapper)mapToInt(ToIntFunction<? super T> mapper)mapToLong(ToLongFunction<? super T> mapper) 映射分别返回对应类型的流,8th 为这些流设定了一些特殊的操作,比如我们希望计算所有专业为计算机科学学生的年龄之和,那么可以实现如下:

1
2
3
int totalAge = students.stream()
.filter(student -> "计算机科学".equals(student.getMajor()))
.mapToInt(Student::getAge).sum();

通过将 Student 按照年龄直接映射为 IntStream,我们可以直接调用类型提供的 sum() 方法进行求和,此外使用这些数值流的好处还在于可以避免 jvm 装箱操作所带来的性能消耗。

  • flatMap

flatMap 与 map 的主要区别在于 flatMap 是将一个流中的每个值都转成一个个流,然后再将这些流扁平化成为一个流 。举例说明,假设我们有一个字符串数组 String[] strs = {"java8", "is", "easy", "to", "use"};,我们希望输出构成这一数组的所有非重复字符,那么可能会首先想到如下实现:

1
2
3
4
List<String[]> distinctStrs = Arrays.stream(strs)
.map(str -> str.split("")) // 映射成为Stream<String[]>
.distinct()
.collect(Collectors.toList());

在执行 map 操作以后,我们得到是一个包含多个字符串(构成一个字符串的字符数组)的流,此时执行 distinct 操作是基于在这些字符串数组之间的对比,所以达不到我们希望的目的,此时的输出为:

1
2
3
4
5
[j, a, v, a, 8]
[i, s]
[e, a, s, y]
[t, o]
[u, s, e]

distinct 只有对于一个包含多个字符的流进行操作才能达到我们的目的,即对 Stream<String> 进行操作。此时 flatMap 就可以派上用场:

1
2
3
4
5
List<String> distinctStrs = Arrays.stream(strs)
.map(str -> str.split("")) // 映射成为Stream<String[]>
.flatMap(Arrays::stream) // 扁平化为Stream<String>
.distinct()
.collect(Collectors.toList());

flatMap 将由 map 映射得到的 Stream<String[]> 转换成由各个字符串数组映射成的流 Stream<String>,再将这些小的流扁平化成为一个由所有字符串构成的大扁平流 Steam<String>,从而达到我们的目的。

与 map 类似,flatMap 也提供了针对特定类型的映射操作:flatMapToDouble(Function<? super T,? extends DoubleStream> mapper)flatMapToInt(Function<? super T,? extends IntStream> mapper)flatMapToLong(Function<? super T,? extends LongStream> mapper)

二. 终端操作

终端操作是流式数据处理的最后一步,我们可以在终端操作中实现对流查找、归约等操作。

2.1 查找

  • allMatch

allMatch 用于检测元素是否全部都满足指定的参数行为,如果全部满足则返回 true,例如我们希望检测是否所有的学生都已满 18 周岁,那么可以实现为:

1
boolean isAdult = students.stream().allMatch(student -> student.getAge() >= 18);
  • anyMatch

anyMatch 则是检测是否存在一个或多个元素满足指定的参数行为,如果满足则返回 true,例如我们希望检测是否有来自武汉大学的学生,那么可以实现为:

1
boolean hasWhu = students.stream().anyMatch(student -> "武汉大学".equals(student.getSchool()));
  • noneMathch

noneMatch 用于检测是否不存在满足指定行为的元素,如果不存在则返回 true,例如我们希望检测是否不存在专业为计算机科学的学生,可以实现如下:

1
boolean noneCs = students.stream().noneMatch(student -> "计算机科学".equals(student.getMajor()));
  • findFirst

findFirst 用于返回满足条件的第一个元素,比如我们希望选出专业为土木工程的排在第一位的学生,那么可以实现如下:

1
Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findFirst();

findFirst 不携带参数,具体的查找条件可以通过 filter 设置,我们可以发现 findFirst 返回的是一个 Optional 类型,关于 Optional 的具体讲解可以参考本系列对应的文章。

  • findAny

findAny 相对于 findFirst 的区别在于 findAny 不一定返回第一个,而是返回任意一个,比如我们希望返回任意一个专业为土木工程的学生,可以实现如下:

1
Optional<Student> optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findAny();

实际上对于顺序流式数据处理而言,findFirst 和 findAny 返回的结果是一样的,至于为什么这样设计是因为在下一篇我们介绍的 并行流式数据处理 ,当我们启用并行流式数据处理的时候,查找第一个元素往往会有很多限制,如果不是特别需求,在并行流式数据处理中使用 findAny 的性能要比 findFirst 要高。

2.2 归约

前面的例子中我们大部分都是通过 collect(Collectors.toList()) 对数据进行封装返回,如果我们的目标不是返回一个新的集合,而是希望对经过参数化操作后的集合进行进一步的运算,那么我们可用对集合实施归约操作。8th 的流式数据处理提供了 reduce 方法来达到这一目的。

前面我们通过 mapToInt 将 Stream<Student> 映射成为 IntStream,并通过该类型的 sum 方法求得所有学生的年龄之和,实际上我们通过归约操作也可以达到这一目的,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 前面例子中的方法
int totalAge = students.stream()
.filter(student -> "计算机科学".equals(student.getMajor()))
.mapToInt(Student::getAge).sum();

// 归约操作
int totalAge = students.stream()
.filter(student -> "计算机科学".equals(student.getMajor()))
.map(Student::getAge)
.reduce(0, (a, b) -> a + b);

// 进一步简化
int totalAge2 = students.stream()
.filter(student -> "计算机科学".equals(student.getMajor()))
.map(Student::getAge)
.reduce(0, Integer::sum);

// 采用无初始值的重载版本,需要注意返回Optional
Optional<Integer> totalAge = students.stream()
.filter(student -> "计算机科学".equals(student.getMajor()))
.map(Student::getAge)
.reduce(Integer::sum); // 去掉初始值

2.3 收集

前面采用的 collect(Collectors.toList()) 是一个简单的收集操作,是对处理结果的封装,对应的还有 toSettoMap 可以满足我们对于结果组织的需求。这些方法均来自于 java.util.stream.Collectors,我们可以称之为收集器。

2.3.1 归约

收集器也提供了相应的归约操作,但是与 reduce 在内部实现上是有区别的,收集器更加适用于可变容器上的归约操作,这些收集器广义上均基于 Collectors.reducing() 实现。

  • 例1:求学生的总人数
1
2
3
4
long count = students.stream().collect(Collectors.counting());

// 进一步简化
long count = students.stream().count();
  • 例2:求年龄的最大值和最小值
1
2
3
4
5
6
7
8
// 求最大年龄
Optional<Student> olderStudent = students.stream().collect(Collectors.maxBy((s1, s2) -> s1.getAge() - s2.getAge()));

// 进一步简化
Optional<Student> olderStudent2 = students.stream().collect(Collectors.maxBy(Comparator.comparing(Student::getAge)));

// 求最小年龄
Optional<Student> olderStudent3 = students.stream().collect(Collectors.minBy(Comparator.comparing(Student::getAge)));
  • 例3:求年龄总和
1
int totalAge4 = students.stream().collect(Collectors.summingInt(Student::getAge));

对应的还有 summingLongsummingDouble

  • 例4:求年龄的平均值
1
double avgAge = students.stream().collect(Collectors.averagingInt(Student::getAge));

对应的还有 averagingLongaveragingDouble

  • 例5:一次性得到元素个数、总和、均值、最大值、最小值
1
IntSummaryStatistics statistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));

输出:

1
IntSummaryStatistics{count=10, sum=220, min=20, average=22.000000, max=24}

对应的还有 summarizingLongsummarizingDouble

  • 例6:字符串拼接
1
2
3
4
String names = students.stream().map(Student::getName).collect(Collectors.joining());
// 输出:孔明伯约玄德云长翼德元直奉孝仲谋鲁肃丁奉
String names = students.stream().map(Student::getName).collect(Collectors.joining(", "));
// 输出:孔明, 伯约, 玄德, 云长, 翼德, 元直, 奉孝, 仲谋, 鲁肃, 丁奉
2.3.2 分组

在数据库操作中,我们可以通过 GROUP BY 关键字对查询到的数据进行分组,流式数据处理也为我们提供了这样的功能 Collectors.groupingBy 来操作集合。比如我们可以按学校对上面的学生进行分组:

1
Map<String, List<Student>> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool));

groupingBy 接收一个分类器 Function<? super T, ? extends K> classifier,我们可以自定义分类器来实现需要的分类效果。

上面演示的是一级分组,我们还可以定义多个分类器实现 多级分组 ,比如我们希望在按学校分组的基础之上再按照专业进行分组,实现如下:

1
2
3
Map<String, Map<String, List<Student>>> groups2 = students.stream().collect(
Collectors.groupingBy(Student::getSchool, // 一级分组,按学校
Collectors.groupingBy(Student::getMajor))); // 二级分组,按专业

实际上在 groupingBy 的第二个参数不是只能传递 groupingBy,还可以传递任意 Collector 类型,比如我们可以传递一个 Collector.counting,用以统计每个组的个数:

1
Map<String, Long> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool, Collectors.counting()));

如果我们不添加第二个参数,则编译器会默认帮我们添加一个 Collectors.toList()

2.3.3 分区

分区可以看做是分组的一种特殊情况,在分区中 key 只有两种情况:true 或 false,目的是将待分区集合按照条件一分为二,流式数据处理利用 ollectors.partitioningBy() 方法实现分区,该方法接收一个谓词,例如我们希望将学生分为武大学生和非武大学生,那么可以实现如下:

1
Map<Boolean, List<Student>> partition = students.stream().collect(Collectors.partitioningBy(student -> "武汉大学".equals(student.getSchool())));

分区相对分组的优势在于我们可以同时得到两类结果,在一些应用场景下可以一步得到我们需要的所有结果,比如将数组分为奇数和偶数。

以上介绍的所有收集器均实现自接口 java.util.stream.Collector,该接口的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public interface Collector<T, A, R> {
/**
* A function that creates and returns a new mutable result container.
*
* @return a function which returns a new, mutable result container
*/
Supplier<A> supplier();

/**
* A function that folds a value into a mutable result container.
*
* @return a function which folds a value into a mutable result container
*/
BiConsumer<A, T> accumulator();

/**
* A function that accepts two partial results and merges them. The
* combiner function may fold state from one argument into the other and
* return that, or may return a new result container.
*
* @return a function which combines two partial results into a combined
* result
*/
BinaryOperator<A> combiner();

/**
* Perform the final transformation from the intermediate accumulation type
* {@code A} to the final result type {@code R}.
*
* <p>If the characteristic {@code IDENTITY_TRANSFORM} is
* set, this function may be presumed to be an identity transform with an
* unchecked cast from {@code A} to {@code R}.
*
* @return a function which transforms the intermediate result to the final
* result
*/
Function<A, R> finisher();

/**
* Returns a {@code Set} of {@code Collector.Characteristics} indicating
* the characteristics of this Collector. This set should be immutable.
*
* @return an immutable set of collector characteristics
*/
Set<Characteristics> characteristics();

}

我们也可以实现该接口来定义自己的收集器,此处不再展开。

三. 并行流式数据处理

流式数据处理中的很多都适合采用 分而治之 的思想,从而在处理较大集合时极大的提升代码的性能,8th 的设计者也看到了这一点,所以提供了 并行流式数据处理 。上面的例子中我们都是调用 stream() 方法启动流式数据处理,8th 还提供了 parallelStream() 方法以启动并行流式数据处理,parallelStream() 本质上基于 jdk 1.7 的 Fork-Join 框架实现,其默认的线程数为宿主机的内核数。

启动并行流式数据处理虽然简单,只需要将 stream() 替换成 parallelStream() 即可,但既然是并行,就会涉及到多线程安全问题,所以在启用之前要先确认并行是否值得(并行的效率不一定高于顺序执行),另外就是要保证线程安全。此两项无法保证,那么并行毫无意义,毕竟结果的正确性要比速度更加重要,以后有时间再来详细分析一下并行流式数据处理的具体实现和最佳实践。