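How Table and SQL relate: in Flink, SQL can be viewed as a layer wrapped around the Table API, and the class hierarchy reflects this: StreamTableEnvironment extends TableEnvironment. So whatever the official docs demonstrate with Table can also be done in SQL. User-defined functions are one example: the "User-defined Functions" page under Table API & SQL only shows how to create a function through a TableEnvironment, but the same call works on the ste object (a StreamTableEnvironment), which makes the function available in SQL.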
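This post uses FlinkSQL to translate codes using dimension-table data from Redis. The dimension data is fetched from Redis and then held inside the function.
Producer code reference: "Creating Kafka Producers and Consumers with DDL in FlinkSQL", with the produced data format changed from json to csv (the modified DDL is included in this post).
Alternatively, use the Table approach: "A Simple Flink SQL & Table Example".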
Producer DDL:
String ddl = "CREATE TABLE CbryProduce(\n"
        + "phoneNum STRING,\n"
        + "rechargeNum STRING,\n"
        + "provinceCode STRING,\n"
        + "cityCode STRING,\n"
        + "rechargeChannelCode STRING\n"
        + ") WITH(\n"
        + "'connector.type'='kafka',\n"
        + "'connector.version'='universal',\n"
        + "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n"
        // + "'connector.properties.bootstrap.servers'='localhost:9092',\n"
        + "'connector.topic'='event_topic_1',\n"
        + "'format.type'='csv',\n"
        + "'format.field-delimiter'='|'\n"
        + ")\n";
// Note the space before "values" so the two string pieces do not run together.
String insert2 = "insert into CbryProduce(phoneNum,rechargeNum,provinceCode,cityCode,rechargeChannelCode) "
        + "values('1024','100','051','0750','2')";
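With 'format.type'='csv' and 'format.field-delimiter'='|', the insert above should produce a Kafka record whose value is the five columns joined by "|". The following is a minimal plain-Java sketch of that encoding and its inverse, with no Flink dependency. The class name PipeDelimitedRecord is hypothetical, and the sketch ignores the quoting and escaping that Flink's real csv format handles.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: shows the simple, unquoted case only.
final class PipeDelimitedRecord {
    private static final String DELIMITER = "|";

    /** Joins the column values, in DDL order, with the '|' delimiter. */
    static String encode(List<String> fields) {
        return String.join(DELIMITER, fields);
    }

    /** Splits a record back into columns; the -1 limit keeps trailing empty columns. */
    static List<String> decode(String line) {
        // '|' is a regex metacharacter, so it has to be quoted before splitting.
        return Arrays.asList(line.split(Pattern.quote(DELIMITER), -1));
    }

    public static void main(String[] args) {
        String line = encode(Arrays.asList("1024", "100", "051", "0750", "2"));
        System.out.println(line);         // 1024|100|051|0750|2
        System.out.println(decode(line)); // [1024, 100, 051, 0750, 2]
    }
}
```

Quoting the delimiter matters: splitting on a bare "|" would be interpreted as a regex alternation and break the record into single characters.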
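How to create the JedisCluster object and insert the data, see:
"Redis (1): Jedis Standalone and Cluster Connections"
"Redis (3): redisTemplate in Practice and the Five Basic Data Types"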
// Create the mock data (run once, then leave commented out)
// Map<String, String> cityDimensionMap = new HashedMap();
// cityDimensionMap.put("0020", "广州");
// cityDimensionMap.put("0750", "深圳");
//
// Map<String, String> rechargeChannelsMap = new HashedMap();
// rechargeChannelsMap.put("1", "手机app充值");
// rechargeChannelsMap.put("2", "营业厅充值");
//
// jedisCluster.hmset("CityCode", cityDimensionMap);
// jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
// System.out.println(jedisCluster.hgetAll("CityCode"));
// System.out.println(jedisCluster.hgetAll("RechargeChannels"));
// System.out.println(jedisCluster.get("testttt")); // a missing key returns null
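To try the lookup logic without a Redis cluster, the two hash commands used above can be imitated in memory. This is only a sketch under that assumption: InMemoryHashStore is a hypothetical stand-in, not a Jedis class, and it mimics just the behaviour relied on here (HMSET merges fields into a hash, HGETALL on a missing key yields an empty map).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the two Redis hash commands used in this post.
final class InMemoryHashStore {
    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    /** Mirrors HMSET: merges the given fields into the hash stored at key. */
    void hmset(String key, Map<String, String> fields) {
        hashes.computeIfAbsent(key, k -> new HashMap<>()).putAll(fields);
    }

    /** Mirrors HGETALL: a copy of the hash, or an empty map for a missing key. */
    Map<String, String> hgetAll(String key) {
        Map<String, String> hash = hashes.get(key);
        return hash == null ? Collections.emptyMap() : new HashMap<>(hash);
    }

    public static void main(String[] args) {
        InMemoryHashStore store = new InMemoryHashStore();
        Map<String, String> cities = new HashMap<>();
        cities.put("0020", "广州");
        cities.put("0750", "深圳");
        store.hmset("CityCode", cities);
        System.out.println(store.hgetAll("CityCode").get("0750")); // 深圳
        System.out.println(store.hgetAll("NoSuchKey").isEmpty());  // true
    }
}
```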
How to use FlinkSQL: see "Creating Kafka Producers and Consumers with DDL in FlinkSQL", or the Table approach in "A Simple Flink SQL & Table Example".
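In a ScalarFunction, all we need to do is define (or overload) an eval method.
In the example below, the custom function takes a cityNum and returns the corresponding value from cityCodeMap.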
// define function logic
// custom SQL function
public static class cityCodeTranslateFunction extends ScalarFunction {
    // Loaded once from Redis when the function instance is created.
    Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode");

    public String eval(String cityNum) {
        String res = cityCodeMap.get(cityNum);
        return res == null ? "Error" : res;
    }
}
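The "define or overload eval" idea can be shown in plain Java, without Flink on the classpath. The sketch below is an assumption-laden illustration: CityCodeTranslator is a hypothetical class that does not extend ScalarFunction, and the Integer overload with four-digit zero padding is invented for the example. In a real ScalarFunction, each public eval overload would become a separate accepted signature.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for the lookup function above.
final class CityCodeTranslator {
    private final Map<String, String> cityCodeMap;

    CityCodeTranslator(Map<String, String> cityCodeMap) {
        this.cityCodeMap = new HashMap<>(cityCodeMap);
    }

    /** Main overload: looks up the code and falls back to "Error". */
    public String eval(String cityNum) {
        return cityCodeMap.getOrDefault(cityNum, "Error");
    }

    /** Overload for numeric input: left-pads to four digits (an assumption), then delegates. */
    public String eval(Integer cityNum) {
        return cityNum == null ? "Error" : eval(String.format("%04d", cityNum));
    }

    public static void main(String[] args) {
        Map<String, String> cities = new HashMap<>();
        cities.put("0020", "广州");
        CityCodeTranslator translator = new CityCodeTranslator(cities);
        System.out.println(translator.eval("0020")); // 广州
        System.out.println(translator.eval(20));     // 广州
        System.out.println(translator.eval("9999")); // Error
    }
}
```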
Register the custom functions in the StreamTableEnvironment that runs the SQL, then call them in a query:
// StreamTableEnvironment extends TableEnvironment
ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);

Table queryTable = ste.sqlQuery(
        "select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
        + " from CbryConsumer");
DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
result.printToErr();
Output:
1> 1024,100,0750,深圳,051,2,营业厅充值
1> 1024,100,0020,广州,051,1,手机app充值
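To make the mapping from input record to printed row explicit, here is a plain-Java sketch that applies the same projection as the SELECT above to one "|"-delimited record. It is not how Flink executes the query; EnrichRecordSketch and its hard-coded maps are hypothetical and simply mirror the mock data stored in Redis earlier.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for the SQL projection; the maps mirror the mock Redis hashes.
final class EnrichRecordSketch {
    static final Map<String, String> CITY = new HashMap<>();
    static final Map<String, String> CHANNEL = new HashMap<>();

    static {
        CITY.put("0020", "广州");
        CITY.put("0750", "深圳");
        CHANNEL.put("1", "手机app充值");
        CHANNEL.put("2", "营业厅充值");
    }

    /** Input columns: phoneNum|rechargeNum|provinceCode|cityCode|rechargeChannelCode. */
    static String enrich(String line) {
        String[] f = line.split(Pattern.quote("|"), -1);
        String phoneNum = f[0];
        String rechargeNum = f[1];
        String provinceCode = f[2];
        String cityCode = f[3];
        String channelCode = f[4];
        // Same column order as the SELECT list in the query.
        return String.join(",",
                phoneNum, rechargeNum,
                cityCode, CITY.getOrDefault(cityCode, "Error"),
                provinceCode,
                channelCode, CHANNEL.getOrDefault(channelCode, "Error"));
    }

    public static void main(String[] args) {
        System.out.println(enrich("1024|100|051|0750|2")); // 1024,100,0750,深圳,051,2,营业厅充值
        System.out.println(enrich("1024|100|051|0020|1")); // 1024,100,0020,广州,051,1,手机app充值
    }
}
```

The "1>" prefix in the real output is the index of the parallel subtask that printed the row, so it does not appear in this sketch.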
Complete code:
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class UserDefinedFuctions {

    static JedisCluster jedisCluster;

    // Placeholder added so the listing compiles; replace with the real cluster password.
    private static final String password = "password";

    private static GenericObjectPoolConfig getGenericObjectPoolConfig() {
        GenericObjectPoolConfig genericObjectPool = new GenericObjectPoolConfig();
        genericObjectPool.setMaxIdle(10);
        genericObjectPool.setMaxTotal(100);
        genericObjectPool.setMinEvictableIdleTimeMillis(30000); // minimum idle time before eviction: 30 s
        genericObjectPool.setSoftMinEvictableIdleTimeMillis(60000); // soft idle eviction time: 1 minute
        return genericObjectPool;
    }

    static {
        // "ip" and the ports are placeholders for the six cluster nodes.
        HostAndPort hostAndPort = new HostAndPort("ip", 7000);
        HostAndPort hostAndPort2 = new HostAndPort("ip", 7001);
        HostAndPort hostAndPort3 = new HostAndPort("ip", 7000);
        HostAndPort hostAndPort4 = new HostAndPort("ip", 7001);
        HostAndPort hostAndPort5 = new HostAndPort("ip", 7000);
        HostAndPort hostAndPort6 = new HostAndPort("ip", 7001);
        Set<HostAndPort> hostAndPortSet = new HashSet<>();
        hostAndPortSet.add(hostAndPort);
        hostAndPortSet.add(hostAndPort2);
        hostAndPortSet.add(hostAndPort3);
        hostAndPortSet.add(hostAndPort4);
        hostAndPortSet.add(hostAndPort5);
        hostAndPortSet.add(hostAndPort6);
        jedisCluster = new JedisCluster(hostAndPortSet, 6000, 6000, 10, password,
                UserDefinedFuctions.getGenericObjectPoolConfig());

        // Create the mock data (run once, then leave commented out)
        // Map<String, String> cityDimensionMap = new HashedMap();
        // cityDimensionMap.put("0020", "广州");
        // cityDimensionMap.put("0750", "深圳");
        //
        // Map<String, String> rechargeChannelsMap = new HashedMap();
        // rechargeChannelsMap.put("1", "手机app充值");
        // rechargeChannelsMap.put("2", "营业厅充值");
        //
        // jedisCluster.hmset("CityCode", cityDimensionMap);
        // jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
        // System.out.println(jedisCluster.hgetAll("CityCode"));
        // System.out.println(jedisCluster.hgetAll("RechargeChannels"));
        // System.out.println(jedisCluster.get("testttt")); // a missing key returns null
    }

    // define function logic
    // custom SQL functions
    public static class cityCodeTranslateFunction extends ScalarFunction {
        Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode");

        public String eval(String cityNum) {
            String res = cityCodeMap.get(cityNum);
            return res == null ? "Error" : res;
        }
    }

    public static class rechargeChannelTranslateFunction extends ScalarFunction {
        Map<String, String> rechargeChannelsMap = jedisCluster.hgetAll("RechargeChannels");

        public String eval(String rechargeChannel) {
            String res = rechargeChannelsMap.get(rechargeChannel);
            return res == null ? "Error" : res;
        }
    }

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                // .useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);

        String ddl = "CREATE TABLE CbryConsumer(\n"
                + "phoneNum String,\n"
                + "rechargeNum String,\n"
                + "provinceCode String,\n"
                + "cityCode String,\n"
                + "rechargeChannelCode String\n"
                + ") WITH(\n"
                + "'connector.type'='kafka',\n"
                + "'connector.version'='universal',\n"
                + "'connector.properties.group.id'='g2_group',\n"
                + "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n"
                + "'connector.topic'='event_topic_1',\n"
                + "'connector.startup-mode' = 'latest-offset',\n"
                + "'format.type'='csv',\n"
                + "'format.field-delimiter'='|'\n"
                + ")\n";
        ste.executeSql(ddl);

        // StreamTableEnvironment extends TableEnvironment
        ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
        ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);

        Table queryTable = ste.sqlQuery(
                "select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
                + " from CbryConsumer");
        DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
        result.printToErr();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
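In practice we cannot write a new class for every dimension we want to translate. So how do we get a function that can be configured "interactively"? The answer: give the function a constructor that takes the dimension name, and register an instance of it rather than the class: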
// define function logic
// custom SQL function
public static class AutoAdaptaMapDefineFunction extends ScalarFunction {
    Map<String, String> redisMap;

    public AutoAdaptaMapDefineFunction(String dimensionName) {
        redisMap = jedisCluster.hgetAll(dimensionName);
    }

    public String eval(String dimensionKey) {
        String res = redisMap.get(dimensionKey);
        return res == null ? "Error" : res;
    }
}
// Usage:
// ste.createTemporarySystemFunction("cityTranslate", new AutoAdaptaMapDefineFunction("CityCodeDimensionMapKey"));
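The official docs put it this way: for interactive sessions, it is also possible to parameterize functions before using or registering them. In this case, function instances instead of function classes can be used as temporary functions.
This requires the parameters to be serializable, so that the function instance can be shipped to the cluster.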
void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);
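The serializability requirement can be checked locally before submitting a job. The sketch below assumes plain Java serialization as the shipping mechanism and uses a hypothetical SerializableLookup class in place of a real ScalarFunction: it round-trips a parameterized instance through a byte array and confirms the copy still answers lookups.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a parameterized function instance.
final class SerializableLookup implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String dimensionName;
    // HashMap is itself Serializable, so the whole instance can be shipped.
    private final HashMap<String, String> values;

    SerializableLookup(String dimensionName, Map<String, String> values) {
        this.dimensionName = dimensionName;
        this.values = new HashMap<>(values);
    }

    String dimensionName() {
        return dimensionName;
    }

    String eval(String key) {
        return values.getOrDefault(key, "Error");
    }

    /** Serializes and deserializes the object; fails if any field is not serializable. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Instance is not serializable", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> cities = new HashMap<>();
        cities.put("0750", "深圳");
        SerializableLookup copy = roundTrip(new SerializableLookup("CityCode", cities));
        System.out.println(copy.dimensionName() + " -> " + copy.eval("0750")); // CityCode -> 深圳
    }
}
```

A field holding a live connection object would typically fail this round trip, which is one reason to keep only the loaded map, not the client, inside the function instance.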
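Because createTemporarySystemFunction accepts either a class or an instance, it also becomes possible to use a custom class loader to pick a specific class object and inject that function into a Flink job.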
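As a rough sketch of that idea, the code below resolves a function class by name through a given ClassLoader and instantiates it with a dimension name via reflection. Everything here is hypothetical and Flink-free: LookupFunction, PrefixLookup and FunctionLoader are illustration-only names, and in a real job the loader would more likely be a URLClassLoader pointing at a jar, with the resulting object passed to createTemporarySystemFunction.

```java
import java.lang.reflect.Constructor;

// Hypothetical function contract, standing in for a user-defined function type.
interface LookupFunction {
    String eval(String key);
}

// Hypothetical implementation that would normally live in a separately loaded jar.
final class PrefixLookup implements LookupFunction {
    private final String dimensionName;

    public PrefixLookup(String dimensionName) {
        this.dimensionName = dimensionName;
    }

    @Override
    public String eval(String key) {
        return dimensionName + ":" + key;
    }
}

final class FunctionLoader {
    /** Loads className through the given loader and calls its (String) constructor. */
    static LookupFunction load(ClassLoader loader, String className, String dimensionName) {
        try {
            Class<? extends LookupFunction> clazz =
                    Class.forName(className, true, loader).asSubclass(LookupFunction.class);
            Constructor<? extends LookupFunction> ctor = clazz.getDeclaredConstructor(String.class);
            ctor.setAccessible(true);
            return ctor.newInstance(dimensionName);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load function " + className, e);
        }
    }

    public static void main(String[] args) {
        LookupFunction f = load(FunctionLoader.class.getClassLoader(),
                PrefixLookup.class.getName(), "CityCode");
        System.out.println(f.eval("0750")); // CityCode:0750
    }
}
```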