了解使用 Java Streams 解决问题的直接途径,Java Streams 是一个允许我们快速有效地处理大量数据的框架。
当我们对列表中的元素进行分组时,我们可以随后聚合分组元素的字段以执行有意义的操作,帮助我们分析数据。一些示例是加法、平均值或最大值/最小值。这些单个字段的聚合可以使用 Java Streams 和 Collectors 轻松完成。该文档提供了如何进行这些类型计算的简单示例。
但是,还有更复杂的聚合,例如加权平均值、几何平均值。此外,可能需要同时聚合多个字段。在本文中,我们将展示使用 Java Streams 解决此类问题的直接途径。使用这个框架使我们能够快速有效地处理大量数据。
我们假设读者对Java Streams和实用程序Collectors类有基本的了解。
让我们考虑一个简单的例子来展示我们想要解决的问题类型。我们将使它非常通用,以便我们可以轻松地概括它。让我们考虑TaxEntry
由以下代码定义的实体列表:
public class TaxEntry { private String state; private String city; private int numEntries; private double price; //Constructors, getters, hashCode, equals etc }
计算给定城市的条目总数非常简单:
Map<String, Integer> totalNumEntriesByCity = taxes.stream().collect(Collectors.groupingBy(TaxEntry::getCity, Collectors.summingInt(TaxEntry::getNumEntries)));
Collectors.summingInt(TaxEntry::getNumEntries)));
Collectors.groupingBy
接受两个参数:一个分类器函数进行分组,一个收集器对属于给定组的所有元素进行下游聚合。我们TaxEntry::getCity
用作分类器函数。对于下游,我们使用Collectors::summingInt
which 返回一个Collector
总和我们为每个分组元素获得的税收条目的数量。
如果我们尝试找到复合分组,事情会稍微复杂一些。例如,对于前面的示例,给定州和 城市的条目总数。有几种方法可以做到这一点,但一个非常简单的方法是首先定义:
record StateCityGroup(String state, String city) {}
请注意,我们使用的是 Javarecord
,这是一种定义不可变类的简洁方法。此外,Java 编译器为我们生成字段访问器方法hashCode
、、等号和toString
实现。有了这个,现在的解决方案很简单:
Map<StateCityGroup, Integer> totalNumEntriesForStateCity = taxes.stream().collect(groupingBy(p -> new StateCityGroup(p.getState(), p.getCity()), Collectors.summingInt(TaxEntrySimple::getNumEntries)) );
因为Collectors::groupingBy
我们使用 lambda 表达式设置分类器函数,该表达式创建一个StateCityGroup
封装每个州-城市的新记录。下游 Collector 和之前一样。
注意:为了简洁起见,在代码示例中,我们将假设 Collectors 类的所有方法都是静态导入的,因此我们不必显示它们的类限定。
如果我们想同时进行多个聚合,事情开始变得更加复杂。例如,查找给定州和城市的条目数和平均价格之和。该库没有为这个问题提供简单的解决方案。
为了开始解决这个问题,我们从之前的聚合中获取线索,并定义一个记录来封装所有需要聚合的字段:
record TaxEntryAggregation (int totalNumEntries, double averagePrice ) {}
现在,我们如何同时对两个字段进行聚合?正如以下代码中所建议的那样,总是有可能进行两次流收集以分别查找每个聚合:
Map<StateCityGroup, TaxEntryAggregation> aggregationByStateCity = taxes.stream().collect( groupingBy(p -> new StateCityGroup(p.getState(), p.getCity()), collectingAndThen(Collectors.toList(), list -> {int entries = list.stream().collect( summingInt(TaxEntrySimple::getNumEntries)); double priceAverage = list.stream().collect( averagingDouble(TaxEntrySimple::getPrice)); return new TaxEntryAggregation(entries, priceAverage);})));
分组像以前一样完成,但对于下游,我们使用Collectors::collectingAndThen
(第 3 行)进行聚合。这个函数有两个参数:
Collectors::toList()
在第 3 行中使用)TaxEntryAggregation
记录 中返回想象一下,我们想同时进行更多的字段聚合。我们需要相应地增加下游列表中的流数量。代码变得效率低下、重复性非常高且不太理想。我们应该寻找更好的替代品。
此外,问题还不止于此,一般来说,我们受限于可以使用 Collectors 辅助类进行的聚合类型。他们的方法 summing*、averaging* 和 summarizing* 仅支持整数、长整数和双精度本机类型。如果我们有更复杂的类型,比如BigInteger
or ,我们该怎么办BigDecimal
?
雪上加霜的是,summarizing* 方法仅提供 min、max、count、sum 和 average 的汇总统计数据。如果我们想要执行更复杂的计算,例如加权平均值或几何平均值怎么办?
有些人会争辩说我们总是可以编写自定义收集器,但这需要了解收集器接口并很好地理解流收集器流程。使用 Collectors 类中的实用方法提供的内置收集器更直接。在下一节中,我们将展示一些关于如何实现此目的的策略。
让我们考虑一个简单的例子,它将突出我们在上一节中提到的挑战。假设我们有以下实体:
public class TaxEntry { private String state; private String city; private BigDecimal rate; private BigDecimal price; record StateCityGroup(String state, String city) { } //Constructors, getters, hashCode/equals etc }
我们首先询问每个不同的州-城市对如何找到条目的总数以及rate
与price
(∑(rate * price)) 的乘积的总和。请注意,我们正在使用BigDecimal
.
正如我们在上一节中所做的那样,我们定义了一个封装聚合的类:
record RatePriceAggregation(int count, BigDecimal ratePrice) {}
起初可能看起来令人惊讶,但是对于后面跟着简单聚合的分组的直接解决方案是使用Collectors::toMap
.让我们看看我们将如何做到这一点:
Map<StateCityGroup, RatePriceAggregation> mapAggregation = taxes.stream().collect( toMap(p -> new StateCityGroup(p.getState(), p.getCity()), p -> new RatePriceAggregation(1, p.getRate().multiply(p.getPrice())), (u1,u2) -> new RatePriceAggregation( u1.count() + u2.count(), u1.ratePrice().add(u2.ratePrice())) ));
(第Collectors::toMap
2 行)接受三个参数,我们执行以下实现:
StateCityGroup
为地图的键。这将按州和城市对元素进行分组(第 2 行)。RatePriceAggregation
一个计数为 1 以及 rate 和 price 的乘积的初始化(第 3 行)。BinaryOperator
用于合并多个元素映射到同一个州-城市键的情况。我们将计数和价格相加以进行汇总(第 4 行)。让我们演示如何设置一些示例数据:
List<TaxEntry> taxes = Arrays.asList( new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.2), BigDecimal.valueOf(20.0)), new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.4), BigDecimal.valueOf(10.0)), new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.6), BigDecimal.valueOf(10.0)), new TaxEntry("Florida", "Orlando", BigDecimal.valueOf(0.3), BigDe
从前面的代码示例中获取纽约的结果很简单:
System.out.println("New York: " + mapAggregation.get(new StateCityGroup("New York", "NYC")));
这打印:
New York: RatePriceAggregation[count=3, ratePrice=14.00]
这是一个直接的实现,它决定了多个字段和非原始数据类型(BigDecimal
在我们的例子中)的分组和聚合。但是,它的缺点是它没有任何终结器允许您执行额外的操作。例如,你不能做任何类型的平均值。
为了展示这个问题,让我们考虑一个更复杂的问题。假设我们想要找到费率-价格的加权平均值,以及每个州和城市对的所有价格的总和。特别是,要找到加权平均值,我们需要计算属于每个州-城市对的所有条目的费率和价格的乘积之和,然后除以每个案例的条目总数 n: 1/n ∑(费率 * 价格)。
为了解决这个问题,我们开始定义一个包含聚合的记录:
TaxEntryAggregation(int count, BigDecimal weightedAveragePrice, BigDecimal totalPrice) {}
有了这个,我们可以进行以下实现:
Map<StateCityGroup, TaxEntryAggregation> groupByAggregation = taxes.stream().collect( groupingBy(p -> new StateCityGroup(p.getState(), p.getCity()), mapping(p -> new TaxEntryAggregation(1, p.getRate().multiply(p.getPrice()), p.getPrice()), collectingAndThen(reducing(new TaxEntryAggregation(0, BigDecimal.ZERO, BigDecimal.ZERO), (u1,u2) -> new TaxEntryAggregation(u1.count() + u2.count(), u1.weightedAveragePrice().add(u2.weightedAveragePrice()), u1.totalPrice().add(u2.totalPrice())) ), u -> new TaxEntryAggregation(u.count(), u.weightedAveragePrice().divide(BigDecimal.valueOf(u.count()), 2, RoundingMode.HALF_DOWN), u.totalPrice()) ) ) ));
我们可以看到代码稍微复杂一些,但可以让我们得到我们正在寻找的解决方案。我们将更详细地关注它:
Collectors::groupingBy
(第 2 行):StateCityGroup
记录Collectors::mapping
(第 3 行):TaxEntryAggregation
将初始计数分配为 1 的新条目,将税率乘以价格,然后设置价格(第 3 行)。Collectors::collectingAndThen
(第 4 行),正如我们将看到的,这将允许我们对下游收集器应用一个完成转换。
Collectors::reducing
(第 4 行)TaxEntryAggregation
以涵盖没有下游元素的情况(第 4 行)。TaxEntryAggregation
包含字段聚合的新表达式(第 5、6 7 行)TaxEntryAggregation
(第 9、10、11 行)。我们看到这种实现不仅允许我们同时进行多个字段聚合,而且还可以在多个阶段执行复杂的计算。
这可以很容易地推广到解决更复杂的问题。路径很简单:定义一条记录,封装所有需要聚合的字段,Collectors::mapping
用来初始化记录,然后申请Collectors::collectingAndThen
做归约和最终聚合。
和以前一样,我们可以获得纽约的聚合:
System.out.println("Finished aggregation: " + groupByAggregation.get(new StateCityGroup("New York", "NYC")));
我们得到结果:
Finished aggregation: TaxEntryAggregation[count=3, weightedAveragePrice=4.67, totalPrice=40.0]
还值得指出的是,由于TaxEntryAggregation
是 Java record
,它是不可变的,因此可以使用流收集器库提供的支持来并行计算。
我们已经展示了几种策略来使用聚合进行复杂的多字段分组,这些聚合包括具有多字段和跨字段计算的非原始数据类型。这是一个使用 Java 流和 Collectors API 的记录列表,因此它为我们提供了快速有效地处理大量数据的能力。