摘要:
Stream是Java8的一大亮点,是对容器对象功能的增强,它专注于对容器对象进行各种非常便利、高效的 聚合操作(aggregate operation)或者大批量数据操作。Stream API借助于同样新出现的Lambda表达式,极大的提高编程效率和程序可读性。同时,它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用fork/join并行方式来拆分任务和加速处理过程。所以说,Java8中首次出现的 java.util.stream是一个函数式语言+多核时代综合影响的产物。
版权声明:
本文转载于陈争云,占宇剑和司磊在developerWorks上发表的《Java8中的Streams API详解》一文。
1、为什么需要Stream ?
Stream作为Java8的一大亮点,它与java.io包里的InputStream和OutputStream是完全不同的概念。它也不同于StAX对XML解析的Stream,也不是Amazon Kinesis对大数据实时处理的Stream。Java8中的Stream是对容器对象功能的增强,它专注于对容器对象进行各种非常便利、高效的 聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。Stream API借助于同样新出现的Lambda表达式,极大的提高编程效率和程序可读性。同时,它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用fork/join并行方式来拆分任务和加速处理过程。通常,编写并行代码很难而且容易出错, 但使用Stream API无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。所以说,Java8中首次出现的 java.util.stream是一个函数式语言+多核时代综合影响的产物。
2、什么是聚合操作
在传统的J2EE应用中,Java代码经常不得不依赖于关系型数据库的聚合操作来完成诸如:
这类的操作。但在当今这个数据大爆炸的时代,在数据来源多样化、数据海量化的今天,很多时候不得不脱离 RDBMS,或者以底层返回的数据为基础进行更上层的数据统计。而Java的集合API中,仅仅有极少量的辅助型方法,更多的时候是程序员需要用Iterator来遍历集合,完成相关的聚合应用逻辑,这是一种远不够高效、笨拙的方法。在Java7中,如果要发现type为grocery的所有交易,然后返回以交易值降序排序好的交易ID集合,我们需要这样写:
List<Transaction> groceryTransactions = new Arraylist<>(); for(Transaction t: transactions){ if(t.getType() == Transaction.GROCERY){ groceryTransactions.add(t); } }
Collections.sort(groceryTransactions, new Comparator(){
public int compare(Transaction t1, Transaction t2){
return t2.getValue().compareTo(t1.getValue());
}
});
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
transactionsIds.add(t.getId());
}
而在 Java 8 使用 Stream,代码更加简洁易读;而且使用并发模式,程序执行速度更快。
List<Integer> transactionsIds = transactions.parallelStream() .filter(t -> t.getType() == Transaction.GROCERY) .sorted(comparing(Transaction::getValue).reversed()) .map(Transaction::getId).collect(toList());
1、什么是流?
Stream不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的Iterator。原始版本的Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的Stream,用户只要给出需要对其包含的元素执行什么操作,比如,“过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream会隐式地在内部进行遍历,做出相应的数据转换。Stream就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。
而和迭代器又不同的是,Stream可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个item读完后再读下一个item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream的并行操作依赖于Java7中引入的Fork/Join框架(JSR166y)来拆分任务和加速处理过程。
Stream 的另外一大特点是,数据源本身可以是无限的。
2、流的构成
当我们使用一个流的时候,通常包括三个基本步骤:获取一个数据源(source)→ 数据转换 → 执行操作获取想要的结果。每次转换原有Stream对象不改变,返回一个新的Stream对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道,如下图所示:
3、Stream的生成方式
(1)从Collection和数组获得
(2)从BufferedReader获得
(3)静态工厂
(4)自己构建
(5)其他
4、流的操作类型
流的操作类型分为两种:
Intermediate:一个流可以后面跟随零个或多个intermediate操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
Terminal:一个流只能有一个terminal操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以,这必定是流的最后一个操作。Terminal操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个side effect。
在对一个Stream进行多次转换操作(Intermediate 操作),每次都对Stream的每个元素进行转换,而且是执行多次,这样时间复杂度就是N(转换次数)个for循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是lazy的,多个转换操作只会在Terminal操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在Terminal 操作的时候循环Stream对应的集合,然后对每个元素执行所有的函数。
还有一种操作被称为short-circuiting。用以指:对于一个intermediate操作,如果它接受的是一个无限大(infinite/unbounded)的Stream,但返回一个有限的新Stream;对于一个terminal操作,如果它接受的是一个无限大的Stream,但能在有限的时间计算出结果。
当操作一个无限大的 Stream,而又希望在有限时间内完成操作,则在管道内拥有一个short-circuiting操作是必要非充分条件。
简单说,对Stream的使用就是实现一个filter-map-reduce过程,产生一个最终结果,或者导致一个副作用(side effect)。
1).流的构造与转换
下面提供最常见的几种构造Stream的例子:
// 1. Individual values Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String [] strArray = new String[] {
“a”, “b”, “c”};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();
需要注意的是,对于基本数值型,目前有三种对应的包装类型Stream:IntStream、LongStream、DoubleStream。当然我们也可以用 Stream<Integer>、Stream<Long>和Stream<Double>,但是boxing/unboxing会很耗时,所以特别为这三种基本数值型提供了对应的Stream。
Java8中还没有提供其它数值型Stream,因为这将导致扩增的内容较多。而常规的数值型聚合运算可以通过上面三种Stream进行。
IntStream.of(new int[]{ 1, 2, 3}).forEach(System.out::println); IntStream.range(1, 3).forEach(System.out::println); IntStream.rangeClosed(1, 3).forEach(System.out::println);
流也可以转换为其它数据结构,例如:
// 1. Array String[] strArray1 = stream.toArray(String[]::new); // 2. Collection List<String> list1 = stream.collect(Collectors.toList()); List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new)); Set set1 = stream.collect(Collectors.toSet()); Stack stack1 = stream.collect(Collectors.toCollection(Stack::new)); // 3. String String str = stream.collect(Collectors.joining()).toString();
2).流的操作
接下来,当把一个数据结构包装成Stream后,就要开始对里面的元素进行各类操作了。常见的操作可以归类如下:
Intermediate 操作
map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
Terminal 操作
forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
Short-circuiting 操作
anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit
我们下面看一下Stream的比较典型用法。
(1).Intermediate 操作
map/flatMap
我们先来看map,它的作用就是把inputStream的每个元素映射成outputStream的另外一个元素,例如:
List<Integer> nums = Arrays.asList(1, 2, 3, 4); List<Integer> squareNums = nums.stream().map(n -> n * n) .collect(Collectors.toList());
从上面例子可以看出,map生成的是个1:1映射,每个输入元素都按照规则转换成为另外一个元素。还有一些场景,是一对多映射关系的,这时需要flatMap,例如:
Stream<List<Integer>> inputStream = Stream.of( Arrays.asList(1), Arrays.asList(2, 3), Arrays.asList(4, 5, 6) ); Stream<Integer> outputStream = inputStream. flatMap((childList) -> childList.stream());
flatMap把inputStream中的层级结构 扁平化,就是将最底层元素抽出来放到一起,最终output的新Stream里面已经没有List了,都是直接的数字。
filter
filter对原始Stream进行某项测试,通过测试的元素被留下来生成一个新Stream。
// 留下偶数 Integer[] sixNums = { 1, 2, 3, 4, 5, 6}; Integer[] evens = Stream.of(sixNums).filter(n -> n%2 == 0).toArray(Integer[]::new);
forEach
forEach方法接收一个Lambda表达式,然后在Stream的每一个元素上执行该表达式。
// 对一个人员集合遍历,找出男性并打印姓名。 roster.stream().filter(p -> p.getGender() == Person.Sex.MALE) .forEach(p -> System.out.println(p.getName()));
可以看出来,forEach是为Lambda而设计的,保持了最紧凑的风格。当需要为多核系统优化时,可以parallelStream().forEach(),只是此时原有元素的次序没法保证,并行的情况下将改变串行时操作的行为,此时forEach本身的实现不需要调整,而Java8以前的for循环代码可能需要加入额外的多线程逻辑。但一般认为,forEach和常规for循环的差异不涉及到性能,它们仅仅是函数式风格与传统 Java 风格的差别。
另外一点需要注意,forEach是terminal操作。因此,它执行后,Stream 的元素就被“消费”掉了,你无法对一个Stream进行两次terminal运算。下面的代码是错误的:
stream.forEach(element -> doOneThing(element)); stream.forEach(element -> doAnotherThing(element));
相反,具有相似功能的intermediate操作peek可以达到上述目的。如下是出现在Stream api javadoc上的一个示例:
// peek 对每个元素执行操作并返回一个新的 Stream Stream.of("one", "two", "three", "four").filter(e -> e.length() > 3) .peek(e -> System.out.println("Filtered value: " + e)).map(String::toUpperCase) .peek(e -> System.out.println("Mapped value: " + e)).collect(Collectors.toList());
forEach 不能修改自己包含的本地变量值,也不能用break/return之类的关键字提前结束循环。
findFirst
这是一个termimal兼short-circuiting操作,它总是返回Stream的第一个元素或者空。这里比较重点的是它的返回值类型Optional:这也是一个模仿 Scala 语言中的概念,作为一个容器,它可能含有某值,或者不包含,使用它的目的是尽可能避免NullPointerException。
// Optional 的两个用例:以下两组示例是等价的
// Java 8
Optional.ofNullable(text).ifPresent(System.out::println);
// Pre-Java 8
if (text != null) {
System.out.println(text);
}
//----------
// Java 8
return Optional.ofNullable(text).map(String::length).orElse(-1);
// Pre-Java 8
return if (text != null) ? text.length() : -1;
};
在更复杂的if (xx != null)的情况中,使用Optional代码的可读性更好,而且它提供的是编译时检查,能极大的降低NPE这种Runtime Exception 对程序的影响,或者迫使程序员更早的在编码阶段处理空值问题,而不是留到运行时再发现和调试。
Stream中的findAny、max/min、reduce等方法等返回Optional值。还有例如IntStream.average()返回OptionalDouble等等。
reduce
这个方法的主要作用是把Stream元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面Stream的第一个、第二个、第n个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average都是特殊的reduce。例如Stream的sum就相当于:
Integer sum = integers.reduce(0, (a, b) -> a+b);
或
Integer sum = integers.reduce(0, Integer::sum);
也有没有起始值的情况,这时会把 Stream 的前面两个元素组合起来,返回的是 Optional。
// reduce 的用例
// 字符串连接,concat = “ABCD”
String concat = Stream.of(“A”, “B”, “C”, “D”).reduce("", String::concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
// 过滤,字符串连接,concat = “ace”
concat = Stream.of(“a”, “B”, “c”, “D”, “e”, “F”).
filter(x -> x.compareTo(“Z”) > 0).
reduce("", String::concat);
上面代码例如第一个示例的reduce(),第一个参数(空白字符)即为起始值,第二个参数(String::concat)为 BinaryOperator。这类有起始值的 reduce() 都返回具体的对象。而对于第四个示例没有起始值的 reduce(),由于可能没有足够的元素,返回的是 Optional,请留意这个区别。
limit/skip
limit返回Stream的前面n个元素;skip则是扔掉前n个元素(它是由一个叫 subStream的方法改名而来)。
//limit 和 skip 对运行次数的影响 public void testLimitAndSkip() { List<Person> persons = new ArrayList(); for (int i = 1; i <= 10000; i++) { Person person = new Person(i, "name" + i); persons.add(person); } List<String> personList2 = persons.stream(). map(Person::getName).limit(10).skip(3).collect(Collectors.toList()); System.out.println(personList2); } private class Person { public int no; private String name; public Person (int no, String name) { this.no = no; this.name = name; } public String getName() { System.out.println(name); return name; } }
输出结果为:
name1
name2
name3
name4
name5
name6
name7
name8
name9
name10
[name4, name5, name6, name7, name8, name9, name10]
这是一个有10,000个元素的Stream,但在short-circuiting操作limit和skip的作用下,管道中map操作指定的getName()方法的执行次数为 limit 所限定的10次,而最终返回结果在跳过前3个元素后只有后面7个返回。
有一种情况是limit/skip无法达到short-circuiting目的的,就是把它们放在Stream的排序操作后,原因跟sorted这个intermediate操作有关:此时系统并不知道Stream排序后的次序如何,所以sorted中的操作看上去就像完全没有被limit或者skip一样。
// limit 和 skip 对 sorted 后的运行次数无影响 List<Person> persons = new ArrayList(); for (int i = 1; i <= 5; i++) { Person person = new Person(i, "name" + i); persons.add(person); } List<Person> personList2 = persons.stream().sorted((p1, p2) -> p1.getName().compareTo(p2.getName())).limit(2).collect(Collectors.toList()); System.out.println(personList2);
输出结果为:
name2
name1
name3
name2
name4
name3
name5
name4
[stream.StreamDW
P
e
r
s
o
n
@
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
n
u
m
b
e
r
"
>
816
<
/
s
p
a
n
>
f
27
d
,
s
t
r
e
a
m
.
S
t
r
e
a
m
D
W
Person@<span class="hljs-number">816</span>f27d,stream.StreamDW
Person@<spanclass="hljs−number">816</span>f27d,stream.StreamDWPerson@87aac27]
即虽然最后的返回元素数量是 2,但整个管道中的 sorted 表达式执行次数没有像前面例子相应减少。最后有一点需要注意的是,对一个parallel的Stream 管道来说,如果其元素是有序的,那么limit操作的成本会比较大,因为它的返回对象必须是前n个也有一样次序的元素。取而代之的策略是取消元素间的次序,或者不要用parallel Stream。
sorted
对Stream的排序通过sorted进行,它比数组的排序更强之处在于你可以首先对Stream进行各类map、filter、limit、skip甚至distinct来减少元素数量后再排序,这能帮助程序明显缩短执行时间。例如:
// 优化:排序前进行 limit 和 skip List<Person> persons = new ArrayList(); for (int i = 1; i <= 5; i++) { Person person = new Person(i, "name" + i); persons.add(person); }
List<Person> personList2 = persons.stream().limit(2).sorted((p1, p2) -> p1.getName().compareTo(p2.getName())).collect(Collectors.toList());
System.out.println(personList2);
结果会简单很多:
name2 name1 [stream.StreamDW$Person@6ce253f1,stream.StreamDW$Person@53d8d10a]
当然,这种优化是有business logic上的局限性的:即不要求排序后再取值。
min/max/distinct
min和max的功能也可以通过对Stream元素先排序,再findFirst来实现,但前者的性能会更好为O(n),而sorted的成本是O(nlogn)。同时它们作为特殊的reduce方法被独立出来也是因为求最大最小值是很常见的操作。
// 找出最长一行的长度 BufferedReader br = new BufferedReader(new FileReader("c:\\SUService.log")); int longest = br.lines().mapToInt(String::length).max().getAsInt(); br.close(); System.out.println(longest);
distinct
下面的例子则使用distinct来找出不重复的单词。
// 找出全文的单词,转小写,并排序 List<String> words = br.lines().flatMap(line -> Stream.of(line.split(" "))). filter(word -> word.length() > 0).map(String::toLowerCase).distinct().sorted() .collect(Collectors.toList()); br.close(); System.out.println(words);
Match
Stream有三个match方法,从语义上说:
(1).allMatch:Stream 中全部元素符合传入的 predicate,返回 true;
(2).anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true;
(3).noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true.
它们都不是要遍历全部元素才能返回结果。例如allMatch只要一个元素不满足条件,就skip剩下的所有元素,返回false。对清单13中的Person类稍做修改,加入一个age属性和getAge方法。
// 使用 Match List<Person> persons = new ArrayList(); persons.add(new Person(1, "name" + 1, 10)); persons.add(new Person(2, "name" + 2, 21)); persons.add(new Person(3, "name" + 3, 34)); persons.add(new Person(4, "name" + 4, 6)); persons.add(new Person(5, "name" + 5, 55));
boolean isAllAdult = persons.stream().allMatch(p -> p.getAge() > 18);
System.out.println("All are adult? " + isAllAdult);
boolean isThereAnyChild = persons.stream().anyMatch(p -> p.getAge() < 12);
System.out.println("Any child? " + isThereAnyChild);
输出结果:
All are adult? false
Any child? true
总之,Stream 的特性可以归纳为:
不是数据结构;
它没有内部存储,它只是用操作管道从source(数据结构、数组、generator function、IO channel)抓取数据;
它也绝不修改自己所封装的底层数据结构的数据。例如Stream的filter操作会产生一个不包含被过滤元素的新Stream,而不是从source删除那些元素;
所有Stream的操作必须以lambda表达式为参数;
不支持索引访问;
你可以请求第一个元素,但无法请求第二个,第三个,或最后一个;
很容易生成数组或者List;
惰性化;
很多Stream操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始;
Intermediate操作永远是惰性化的;
并行能力;
当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的;
可以是无限的。集合有固定大小,Stream 则不必。limit(n)和findFirst()这类的short-circuiting操作可以对无限的Stream进行运算并很快完成。
Java8中的Streams API详解