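Deduplicating streaming data is a common operation in Flink, for example when counting unique visitors (UV) in the traffic domain. There are generally three approaches to deduplication:
Here, a custom Bloom filter is used to count unique visitors within Flink window computations. A sample of the dataset is shown below: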
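Put simply, a Bloom filter is a set of hash functions plus a bitmap. As the figure above shows, a string is run through several hash functions and the results are recorded in a bitmap. Because each position stores only 0 or 1, this saves a great deal of space, which makes it especially well suited to deduplication over more than ten billion records. To look up a string later, compute the same hash functions on it and check the corresponding bitmap positions: the string is either definitely absent or very probably present (hash collisions cause a small probability of false positives).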
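The idea described above can be sketched in a few lines of plain Java. This is a minimal illustration, not the implementation used later in this article: the class name `SimpleBloomFilter`, the list of multipliers, and the sample IDs are all assumptions made for the example, and `java.util.BitSet` stands in for the bitmap.

```java
import java.util.BitSet;

/** Minimal Bloom filter sketch: several polynomial hashes over one bitmap. */
final class SimpleBloomFilter {
    // Illustrative odd multipliers, one per hash function (an assumption for this sketch).
    private static final int[] MULTIPLIERS = {31, 37, 41, 43, 47, 53};

    private final BitSet bits;
    private final int size;       // number of bits in the bitmap
    private final int hashCount;  // how many hash functions to apply

    SimpleBloomFilter(int size, int hashCount) {
        if (size <= 0 || hashCount < 1 || hashCount > MULTIPLIERS.length) {
            throw new IllegalArgumentException("unsupported size or hash count");
        }
        this.size = size;
        this.hashCount = hashCount;
        this.bits = new BitSet(size);
    }

    // Polynomial hash of the string, reduced to a bitmap position in [0, size).
    private int position(String value, int multiplier) {
        int h = 0;
        for (int i = 0; i < value.length(); i++) {
            h = h * multiplier + value.charAt(i);
        }
        return Math.floorMod(h, size);
    }

    // Set one bit per hash function.
    void add(String value) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(position(value, MULTIPLIERS[i]));
        }
    }

    // False means "definitely absent"; true means "very probably present".
    boolean mightContain(String value) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(position(value, MULTIPLIERS[i]))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 20, 3);
        filter.add("user_1001");
        System.out.println(filter.mightContain("user_1001")); // an added value is never reported absent
        System.out.println(filter.mightContain("user_2002")); // absent unless all three positions collide
    }
}
```

A lookup can only go wrong in one direction: a value that was added always finds its bits set, while a value that was never added is misreported only if every one of its positions happens to be set already.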
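By extension, handling hash collisions better reduces false positives. That said, the number of hash functions cannot be increased without limit, because each additional one lowers computational efficiency.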
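The trade-off can be made concrete with the standard approximation for a Bloom filter's false-positive rate, p ≈ (1 - e^(-kn/m))^k, where m is the number of bits, n the number of inserted items, and k the number of hash functions. The sketch below evaluates it for a few values of k; the item count of 100 million is a hypothetical figure chosen for illustration, and the class and method names are assumptions.

```java
/** Evaluates the standard false-positive approximation for a Bloom filter. */
final class BloomFilterFalsePositive {

    // p ≈ (1 - e^(-k*n/m))^k, with m = bits, n = insertions, k = hashCount.
    static double falsePositiveRate(long bits, long insertions, int hashCount) {
        double fillRatio = 1.0 - Math.exp(-(double) hashCount * insertions / bits);
        return Math.pow(fillRatio, hashCount);
    }

    public static void main(String[] args) {
        long bits = 1L << 30;             // same bitmap size as used in this article
        long insertions = 100_000_000L;   // hypothetical number of distinct users
        for (int k : new int[] {1, 3, 7, 12, 20}) {
            System.out.printf("k=%d -> p=%.5f%n", k, falsePositiveRate(bits, insertions, k));
        }
    }
}
```

Under this approximation the rate falls as k grows only up to roughly k = (m/n) ln 2; beyond that the bitmap fills up faster than the extra checks help, so more hash functions cost both CPU time and accuracy.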
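This implementation uses a custom Bloom filter, with a few optimizations aimed at reducing hash collisions: a bitmap capacity of 3 to 10 times the data volume (ideally a power of two), and a hash that multiplies the running value by the prime 31 before adding each character: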
myBloomFilter = new MyBloomFilter(1 << 30);
for (char c : value.toCharArray()) { result = result * 31 + c; }
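The two pieces combine into a single offset computation: hash the string, then reduce the hash to a position inside the bitmap. The sketch below isolates that step; the helper `offset(value, cap, multiplier)` and the sample IDs are hypothetical. It illustrates two details worth keeping in mind. First, `hash & (cap - 1)` equals the modulo only when `cap` is a power of two. Second, the multiplier needs to be odd: with a power-of-two capacity, an even multiplier such as 32 shifts the leading characters out of the masked bits, so IDs that share their last few characters collide.

```java
/** Isolates the "hash, then mask" step used to pick a bitmap position. */
final class OffsetHashSketch {

    // Polynomial hash with the given multiplier, masked down to [0, cap).
    // cap is assumed to be a power of two so that the mask equals the modulo.
    static long offset(String value, long cap, long multiplier) {
        long result = 0L;
        for (char c : value.toCharArray()) {
            result = result * multiplier + c;
        }
        return result & (cap - 1);
    }

    public static void main(String[] args) {
        long cap = 1L << 30;
        // Two hypothetical IDs that differ only in the first character.
        String a = "1000123456";
        String b = "2000123456";
        // Odd multiplier: the leading character still influences the offset.
        System.out.println(offset(a, cap, 31) == offset(b, cap, 31)); // false
        // Even multiplier 32 = 2^5: 32^9 is a multiple of 2^30, so the first character is masked away.
        System.out.println(offset(a, cap, 32) == offset(b, cap, 32)); // true
    }
}
```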
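In addition, Google's Guava library also includes a Bloom filter, which can be used once the dependency is added. Its main parameters appear in the source below: supply the expected capacity of the filter (`expectedInsertions`) and the false-positive probability (`fpp`).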
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions, double fpp) {
    return create(funnel, (long) expectedInsertions, fpp);
}
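To get a feel for what those two parameters imply, the textbook sizing formulas can be evaluated directly: m = -n ln p / (ln 2)^2 bits and k = (m/n) ln 2 hash functions, for n expected insertions and false-positive probability p. The sketch below is plain Java and does not call Guava; the helper names `bitsFor` and `hashCountFor` are hypothetical, and the exact rounding a given library applies may differ.

```java
/** Textbook Bloom filter sizing from expected insertions and target false-positive rate. */
final class BloomFilterSizing {

    // m = -n * ln(p) / (ln 2)^2, rounded up to a whole number of bits.
    static long bitsFor(long expectedInsertions, double fpp) {
        double ln2 = Math.log(2);
        return (long) Math.ceil(-expectedInsertions * Math.log(fpp) / (ln2 * ln2));
    }

    // k = (m / n) * ln 2, rounded to the nearest integer and at least 1.
    static int hashCountFor(long expectedInsertions, long bits) {
        return Math.max(1, (int) Math.round((double) bits / expectedInsertions * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 1_000_000L;   // hypothetical expected number of distinct users
        double p = 0.01;       // hypothetical target false-positive probability
        long m = bitsFor(n, p);
        System.out.println("bits: " + m + " (about " + m / 8 / 1024 + " KiB)");
        System.out.println("hash functions: " + hashCountFor(n, m));
    }
}
```

For one million items at a 1% false-positive rate this works out to a little under 10 bits per item and 7 hash functions.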
package com.test.UVbloomfilter;

import bean.UserBehavior;
import bean.UserVisitorCount;
import java.sql.Timestamp;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

public class UserVisitorTest {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Specify the time semantics
        WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
                .<UserBehavior>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                    @Override
                    public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Read the data, map it to UserBehavior, and keep only "pv" records
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = env
                .readTextFile("input/UserBehavior.csv")
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new UserBehavior(Long.parseLong(split[0]),
                                Long.parseLong(split[1]),
                                Integer.parseInt(split[2]),
                                split[3],
                                Long.parseLong(split[4]));
                    }
                })
                //.filter(data -> "pv".equals(data.getBehavior())) // lambda form
                .filter(new FilterFunction<UserBehavior>() {
                    @Override
                    public boolean filter(UserBehavior value) throws Exception {
                        return "pv".equals(value.getBehavior());
                    }
                })
                .assignTimestampsAndWatermarks(wms);

        // Deduplication is global, so key by behavior purely so that a window can be opened
        WindowedStream<UserBehavior, String, TimeWindow> windowDS = userBehaviorDS
                .keyBy(UserBehavior::getBehavior)
                .window(TumblingEventTimeWindows.of(Time.hours(1)));

        SingleOutputStreamOperator<UserVisitorCount> processDS = windowDS
                .trigger(new MyTrigger())
                .process(new UserVisitorWindowFunc());

        processDS.print();
        env.execute();
    }

    // Custom trigger: compute once per incoming record (one round of Redis access each)
    private static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
        @Override
        public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            // Fire the computation and purge the window's elements
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        }
    }

    private static class UserVisitorWindowFunc extends ProcessWindowFunction<UserBehavior, UserVisitorCount, String, TimeWindow> {
        // Redis connection
        private Jedis jedis;
        // Bloom filter
        private MyBloomFilter myBloomFilter;
        // Redis hash key holding the visitor total of each window
        private String hourUVCountKey;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("hadoop102", 6379);
            hourUVCountKey = "HourUV";
            myBloomFilter = new MyBloomFilter(1 << 30); // 2^30
        }

        @Override
        public void close() throws Exception {
            // Release the Redis connection when the operator shuts down
            if (jedis != null) {
                jedis.close();
            }
        }

        @Override
        public void process(String s, Context context, java.lang.Iterable<UserBehavior> elements, Collector<UserVisitorCount> out) throws Exception {
            // 1. Take the record (the trigger fires per element, so there is exactly one)
            UserBehavior userBehavior = elements.iterator().next();
            // 2. Extract the window information
            String windowEnd = new Timestamp(context.window().getEnd()).toString();
            // 3. Define the bitmap key for the current window
            String bitMapKey = "BitMap_" + windowEnd;
            // 4. Check whether the current UID is already present in this window's bitmap
            long offset = myBloomFilter.getOffset(userBehavior.getUserId().toString());
            Boolean exists = jedis.getbit(bitMapKey, offset);
            // 5. Act on the result
            if (!exists) {
                // Set the bit at this offset to 1
                jedis.setbit(bitMapKey, offset, true);
                // Increment the total for the current window
                jedis.hincrBy(hourUVCountKey, windowEnd, 1);
            }
            // Emit the current total
            String hget = jedis.hget(hourUVCountKey, windowEnd);
            out.collect(new UserVisitorCount("UV", windowEnd, Integer.parseInt(hget)));
        }
    }

    private static class MyBloomFilter {
        // Collision-reduction optimization 1: make the capacity 3 to 10 times the data volume
        // Bloom filter capacity; best passed in as a power of two
        private long cap;

        public MyBloomFilter(long cap) {
            this.cap = cap;
        }

        // Given a string, return its position in the bitmap
        public long getOffset(String value) {
            long result = 0L;
            // Collision-reduction optimization 2: improve the hash
            // Multiply the running value by the prime 31, then add each character's Unicode code
            // (plain assignment: "+=" here would make the effective multiplier 32)
            for (char c : value.toCharArray()) {
                result = result * 31 + c;
            }
            // Modulo via bitwise AND, which is faster; valid because cap is a power of two
            return result & (cap - 1);
        }
    }
}
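The per-record logic in `process` can be tried out without Flink or Redis. In the sketch below, an in-memory `BitSet` per window stands in for the Redis bitmap (`getbit`/`setbit`) and a `HashMap` stands in for the `HourUV` hash (`hincrBy`/`hget`). The class `InMemoryUvCounter`, its `visit` method, and the sample window labels and user IDs are hypothetical; the hash uses the plain `result * 31 + c` form.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the Redis bitmap + counter logic, for experimentation only. */
final class InMemoryUvCounter {
    private final long cap;
    private final Map<String, BitSet> bitmaps = new HashMap<>(); // stands in for the Redis bitmaps
    private final Map<String, Long> counts = new HashMap<>();    // stands in for the "HourUV" hash

    InMemoryUvCounter(long cap) {
        // BitSet is indexed by int, so this sketch limits cap to a power of two up to 2^30.
        if (cap <= 0 || Long.bitCount(cap) != 1 || cap > (1L << 30)) {
            throw new IllegalArgumentException("cap must be a power of two no larger than 2^30");
        }
        this.cap = cap;
    }

    private int offset(String value) {
        long result = 0L;
        for (char c : value.toCharArray()) {
            result = result * 31 + c;
        }
        return (int) (result & (cap - 1));
    }

    /** Records one visit and returns the current unique-visitor count of that window. */
    long visit(String windowEnd, String userId) {
        BitSet bitmap = bitmaps.computeIfAbsent("BitMap_" + windowEnd, key -> new BitSet());
        int position = offset(userId);
        if (!bitmap.get(position)) {
            bitmap.set(position);                    // like setbit(bitMapKey, offset, true)
            counts.merge(windowEnd, 1L, Long::sum);  // like hincrBy(hourUVCountKey, windowEnd, 1)
        }
        return counts.getOrDefault(windowEnd, 0L);   // like hget(hourUVCountKey, windowEnd)
    }

    public static void main(String[] args) {
        InMemoryUvCounter counter = new InMemoryUvCounter(1L << 20);
        System.out.println(counter.visit("10:00", "543462")); // 1: first visitor in this window
        System.out.println(counter.visit("10:00", "543462")); // 1: repeat visit is not counted again
        System.out.println(counter.visit("10:00", "543463")); // 2: a different visitor
        System.out.println(counter.visit("11:00", "543462")); // 1: each window has its own bitmap
    }
}
```

Because only one hash function is used, two different IDs that map to the same offset are counted once, so the reported UV can slightly undercount but never overcounts.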
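The output, as viewed in Redis, is as follows:
This is shared for learning and discussion; if you spot any problems, please feel free to point them out in the comments.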