这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱,这个问题其实离我们很近,特别是喜欢使用JDK1.8+
的流式编程的伙伴,应该会深有感触。标题中所谓的"陷阱",其实并不是ParallelStream自身的陷阱,而一般是开发者错误使用ParallelStream给自己埋下的陷阱。
下面举一个故意而为的例子,实际上应该不会有类似的业务代码:
public class ParallelStreamMain { public static void main(String[] args) throws Exception { List<List<Integer>> array = new ArrayList<>(); List<Integer> item1 = new ArrayList<>(); List<Integer> item2 = new ArrayList<>(); List<Integer> target = new ArrayList<>(100); array.add(item1); array.add(item2); array.parallelStream().forEach(x -> { for (int i = 0; i < 100000; i++) { target.add(i); } }); System.out.println(target.size()); } } 复制代码
某一次执行结果为:163913
。如果不停地执行这个main
方法,最终都会得到一个非200000
的结果,这里的问题就在于使用了并行流parallelStream()
方法。ParallelStream
底层使用了Fork/Join
框架实现,也就是应用了线程池ForkJoinPool
把并行流中的节点抽象为ForkJoinTask
进行计算,背后用到的"任务窃取"等原理这里就不进行展开,只需要明确:
ForkJoinPool
一般使用Runtime.getRuntime().availableProcessors()
(此值一般认为是物理机器的逻辑核心数量)作为并行度(parallelism
),简单认为是可并发执行的任务数,并不是工作线程数。ParallelStream
在流的节点中的所有操作都相当于在「一个多线程环境中」进行操作,里面的所有操作都会产生不可预期的结果,例如可能会数组越界、添加元素丢失、部分下标index
的引用为NULL
等等。写这篇文章不是有意为之,其实很早之前笔者曾经遇到一个比较隐蔽的生产故障,其中有一段访问量比较低的代码大致如下:
@Data private static class OrderDTO { private String orderId; private OrderStatus orderStatus; private BigDecimal amount; private Long customerId; } @Data private static class Order { private Long id; private String orderId; private Integer orderStatus; private BigDecimal amount; private Long customerId; private OffsetDateTime createTime; private OffsetDateTime editTime; } public void groupByOrderStatus(Long customerId) { List<Order> orders = orderDao.selectByCustomerId(customerId); List<OrderDTO> orderDTOList = new ArrayList<>(); orders.parallelStream().forEach(order -> { OrderDTO dto = new OrderDTO(); ...... orderDTOList.add(dto); }); Map<String, List<OrderDTO>> collect = orderDTOList.stream().collect(Collectors.groupingBy(item -> item.getOrderStatus().getCode())); ...... } 复制代码
该方法的功能是通过客户ID
查询订单列表,然后把订单列表转化为OrderDTO
列表,然后再按照订单状态字段进行分组。通过生产日志和测试回归发现,上面的代码段中groupByOrderStatus()
方法会偶发空指针异常。
初次出现问题的时候,由于开发者通过Lambda
表达式把多处代码压缩为1行,所以从异常栈比较难排查具体发生问题的代码,后面把Lambda
表达式以句点起点拆分为多行上线后观察一段时间,最终定位到发生空指针异常的代码段为Collectors.groupingBy(item -> item.getOrderStatus().getCode())
,也就是OrderDTO
实例中的orderStatus
为空对象。这里显然,groupByOrderStatus()
方法其实是被封闭在线程栈中调用,本不应该有多个线程去并发修改其中的内容,这里只剩下一个疑点:使用了parallelStream()
。后来直接把parallelStream()
修改为stream()
重新上线,该空指针问题不再复现。
Lambda/Stream
其实并不是天然线程安全的,线程安全的前提是它们本身被线程封闭调用,并且不引入多线程环境,像使用了并行流,本质就是引入了多线程环境。所以,在开发功能的时候,需要仔细思考一下:
Lambda
和流式编程?❝笔者有代码洁癖,当时还发现了上面的代码存在映射操作,正确来说应该使用map()函数,而不是forEach()去遍历元素重新装进去另一个列表,方法中的逻辑体现了原开发者其实对Lambda一知半解。
❞
回到最初那个问题,其实使用并行流也可以保证执行结果和预期一致,不过一定需要引入额外的同步机制,例如这里使用「监视器」进行同步:
public class ParallelStreamMain { public static void main(String[] args) throws Exception { List<List<Integer>> array = new ArrayList<>(); List<Integer> item1 = new ArrayList<>(); List<Integer> item2 = new ArrayList<>(); List<Integer> target = new ArrayList<>(100); array.add(item1); array.add(item2); final Object monitor = new Object(); array.parallelStream().forEach(x -> { synchronized (monitor) { for (int i = 0; i < 100000; i++) { target.add(i); } } }); System.out.println(target.size()); } } 复制代码
上面的方法无论执行多少次,最终都只会输出:200000
。这里在并行流中添加同步代码块的逻辑看起来确实比较滑稽,仅仅是为了说明如果在多线程环境下,对一个容器进行元素增加或者修改,只有添加额外的同步机制,才能保证最终的结果是符合预期的。ParallelStream是一个十分优秀的设计,但是需要考量其适用的场景,避免踏进自己为自己埋下的并发陷阱。
(本文完 c-1-d e-a-20200710)
技术公众号《Throwable文摘》(id:throwable-doge),不定期推送笔者原创技术文章(绝不抄袭或者转载):
本文使用 mdnice 排版