本文为您介绍如何为实时计算Flink版自定义聚合函数(UDAF)搭建开发环境、编写业务代码及上线。
注意 阿里云实时计算Flink版共享模式暂不支持自定义函数,仅独享模式支持自定义函数。自定义聚合函数(UDAF)可以将多条记录聚合成1条记录。
/* * @param <T> UDAF的输出结果的类型。 * @param <ACC> UDAF的accumulator的类型。accumulator是UDAF计算中用来存放计算中间结果的数据类型。您可以需要根据需要自行设计每个UDAF的accumulator。 */ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction { /* * 初始化AggregateFunction的accumulator。 * 系统在进行第一个aggregate计算之前,调用一次此方法。 */ public ACC createAccumulator(); /* * 系统在每次aggregate计算完成后,调用此方法。 */ public T getValue(ACC accumulator); }
public void accumulate(ACC accumulator, ...[用户指定的输入参数]...);说明
createAccumulator、getValue和accumulate 3个方法一起使用,可以设计出一个最基本的UDAF。但是实时计算Flink版一些特殊的场景需要您提供retract和merge两个方法才能完成。
通常,计算都是对无限流的一个提前的观测值(early firing)。既然有early firing,就会有对发出的结果的修改,这个操作叫作撤回(retract)。SQL翻译优化器会帮助您自动判断哪些情况下会产生撤回的数据,哪些操作需要处理带有撤回标记的数据。但是您需要实现一个retract方法来处理撤回的数据。public void retract(ACC accumulator, ...[您指定的输入参数]...);说明
public void merge(ACC accumulator, Iterable<ACC> its);说明
搭建开发环境请参见环境搭建。
import org.apache.flink.table.functions.AggregateFunction; public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> { //定义存放count UDAF状态的accumulator的数据的结构。 public static class CountAccum { public long total; } //初始化count UDAF的accumulator。 public CountAccum createAccumulator() { CountAccum acc = new CountAccum(); acc.total = 0; return acc; } //getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。 public Long getValue(CountAccum accumulator) { return accumulator.total; } //accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。 public void accumulate(CountAccum accumulator, Object iValue) { accumulator.total++; } public void merge(CountAccum accumulator, Iterable<CountAccum> its) { for (CountAccum other : its) { accumulator.total += other.total; } } }
说明 AggregateFunction的子类支持open和close方法作为可选方法,请参见 自定义标量函数(UDF)或 自定义表值函数(UDTF)的写法。
参数名称 | 说明 |
---|---|
上传方式 | 实时计算控制台上仅支持本地上传。 说明 本地上传JAR包的大小上限为300 MB。如果JAR包大小超过300 MB,请在集群绑定的OSS控制台上,或通过OpenAPI的方式上传JAR包。 |
资源选择 | 单击选择资源,选择需要引用的资源。 |
资源名称 | 输入资源名称。 |
资源备注 | 输入资源备注信息。 |
资源类型 | 选择引用资源类型,JAR、DICTIONARY或PYTHON。 |
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
自定义聚合函数(UDAF)的上线和启动步骤, 请参见上线和启动。
-- UDAF计算count CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf'; create table sls_stream( a int, b bigint, c varchar ) with ( type='sls', endPoint='yourEndpoint', accessKeyId='yourAccessId', accessKeySecret='yourAccessSecret', startTime='2017-07-04 00:00:00', project='<yourPorjectName>', logStore='stream-test2', consumerGroup='consumerGroupTest3' ); create table rds_output( len1 bigint, len2 bigint ) with ( type='rds', url='yourDatabaseURL', tableName='<yourDatabaseTableName>', userName='<yourDatabaseUserName>', password='<yourDatabasePassword>' ); insert into rds_output select count(a), countUdaf(a) from sls_stream;