This post shares Flink coding techniques worth paying attention to in day-to-day work.
The following fields are required on every table:
created_time datetime
modified_time datetime
is_deleted smallint(6) NULL COMMENT "whether deleted: 1 = yes, 0 = no",
doris_delete tinyint(4) NULL DEFAULT "0" COMMENT "delete flag"
doris_delete is the logical-delete field: when a row turns out to have been physically deleted upstream, set this field to 1.
alter table uc_student_bak enable feature "BATCH_DELETE";
This step is critical: it enables batch delete on the table, so that when the merge is flagged as delete, rows with doris_delete = 1 are removed.
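For illustration only, a sketch of how the flag could be populated before writing, assuming a canal-style CDC record whose "type" field carries the operation; the field names and helper class are assumptions rather than anything from this post:

import com.alibaba.fastjson.JSONObject;

public class DorisDeleteFlagSketch {
    // mark rows that were physically deleted upstream so the Doris MERGE load can drop them
    public static JSONObject markDeleteFlag(String binlogJson) {
        JSONObject row = JSONObject.parseObject(binlogJson);
        // assumed CDC operation field: INSERT / UPDATE / DELETE
        boolean physicallyDeleted = "DELETE".equalsIgnoreCase(row.getString("type"));
        row.put("doris_delete", physicallyDeleted ? 1 : 0);
        return row;
    }
}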
Extend RichSinkFunction: for each kind of sink storage it is best to create a dedicated class that extends RichSinkFunction.
The Doris sink code is attached below:
public class DorisSinkFunction extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSinkFunction.class);

    // accumulator object
    private final LongCounter counter = new LongCounter();

    private HttpClientBuilder builder;
    private String loadUrl;
    private String authorization;
    private String username;
    private String password;
    private String profile;
    private String mergeType;
    private String dbName;
    private String tableName;

    public static DorisSinkFunction of(String profile, String mergeType, String dbName, String tableName) {
        return new DorisSinkFunction(profile, mergeType, dbName, tableName);
    }

    private DorisSinkFunction(String profile, String mergeType, String dbName, String tableName) {
        this.profile = profile;
        this.mergeType = mergeType;
        this.dbName = dbName;
        this.tableName = tableName;
    }

    private String basicAuthHeader() {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    private HttpClientBuilder httpClientBuilder() {
        return HttpClients.custom()
                // retry strategy
                .setRetryHandler(new HttpRequestRetryHandler() {
                    @Override
                    public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
                        // stop retrying after 3 failed attempts
                        if (executionCount > 3) {
                            return false;
                        }
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            return false;
                        }
                        return true;
                    }
                })
                // follow redirects
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the configuration
        final Properties props = PropertyUtils.getDorisProps(profile);
        this.username = props.getProperty("username");
        this.password = props.getProperty("password");
        // build the authorization header
        this.authorization = basicAuthHeader();
        // build the http client builder
        this.builder = httpClientBuilder();
        // build the doris stream load url
        this.loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                props.getProperty("feHost"), props.getProperty("httpPort"), dbName, tableName);
        // register the accumulator
        getRuntimeContext().addAccumulator("counter", counter);
    }

    @Override
    public void close() throws Exception {
        log.warn(" write doris http client execute close, dt: {}", DateTimeUtils.getCurrentDt());
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        try (CloseableHttpClient client = this.builder.build()) {
            // build the put request
            final HttpPut put = new HttpPut(loadUrl);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, this.authorization);
            put.setHeader("strip_outer_array", "true"); // required when batch inserting a json array
            put.setHeader("format", "json");
            put.setHeader("label", UUID.randomUUID().toString().replace("-", ""));
            // add the doris batch delete headers
            if ("MERGE".equalsIgnoreCase(mergeType)) {
                put.setHeader("merge_type", "MERGE");
                put.setHeader("delete", "doris_delete=1");
            }
            put.setEntity(new StringEntity(value, "UTF-8"));
            // execute
            try (final CloseableHttpResponse response = client.execute(put)) {
                String loadResult = null;
                String loadStatus = null;
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                    loadStatus = JSONObject.parseObject(loadResult).getString("Status");
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200 || !"Success".equalsIgnoreCase(loadStatus)) {
                    String msg = String.format(" stream_load_failed, statusCode=%s loadResult=%s", statusCode, loadResult);
                    log.error(msg);
                    throw new RuntimeException(msg);
                }
            } catch (Exception e) {
                log.error(" stream_load_to_doris execute error.", e);
                throw new RuntimeException(" stream_load_to_doris execute error. ", e);
            }
        } catch (Exception e) {
            log.error(" stream_load_to_doris invoke error.", e);
            throw new RuntimeException(" stream_load_to_doris invoke error. ", e);
        }
        // bump the accumulator
        counter.add(1);
    }
}
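A minimal sketch of wiring the sink into a job; the environment setup and the literal JSON element are placeholders, only DorisSinkFunction.of(...) comes from the class above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DorisSinkUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // in a real job this stream comes from the Kafka source; a literal JSON array stands in here
        env.fromElements("[{\"id\":\"1\",\"doris_delete\":0}]")
           .addSink(DorisSinkFunction.of("prod", "MERGE", "dwd", "table1"))
           .name("doris-stream-load-sink");
        env.execute("dwd-rt-xx");
    }
}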
Focus on solving job-execution problems through configuration.
Each real-time job can have its own properties file recording its key metadata, and the job runs off that metadata.
job-config.properties usually lives in the same directory as the run script.
Note: when a job is started from a specified checkpoint, whatever StartFrom is configured on the Kafka source is ignored; consumption resumes from the offsets stored in the checkpoint state.
You must set env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION); otherwise the checkpoint is cleaned up once the job is cancelled. A configuration sketch follows.
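A minimal checkpoint configuration sketch; the interval, mode, and dummy pipeline are assumptions, the externalized-checkpoint line is the setting the note above calls for:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // assumed interval and mode; pick whatever the job actually needs
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // keep checkpoint files after cancel, so the "cp" branch of the run script can restart from them
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // trivial pipeline so the sketch actually runs
        env.fromElements("a", "b").print();
        env.execute("checkpoint-config-sketch");
    }
}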
A commonly used job-config.properties layout looks like this:

# runtime environment: prod = production, test = testing
profile=prod
# job name
jobName=dwd-rt-xx
# consumer group
groupId=flink_dwd_rt_xx
# source topic
sourceTopic=topic1
# sink topic
sinkTopic=topic2
# target table; the storage can be TIDB, CK, DORIS, HBASE, ORACLE, MYSQL, and so on
sinkTable=table1
# database types the sink writes to
sink=doris,tidb
# dimension-table join definition: look up the matching fields from dimension table a by key b1,
# then merge the resulting map into the record; in the merged map the key b1 is renamed to b2
xxx_mapping=a, b1->b2
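A sketch of loading such a file with plain java.util.Properties; the post's own PropertyUtils helper is not shown, so this is only one possible shape, and the xxx_mapping parsing is an assumption:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class JobConfigSketch {
    public static Properties load(String configPath) throws Exception {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(configPath))) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        Properties props = load("job-config.properties");
        String[] sinks = props.getProperty("sink", "").split(",");       // e.g. doris,tidb
        // xxx_mapping=a, b1->b2 : dimension table "a", rename key b1 to b2 after the lookup
        String[] mapping = props.getProperty("xxx_mapping", "").split(",");
        String dimTable = mapping[0].trim();
        String[] rename = mapping[1].trim().split("->");
        System.out.printf("job=%s sinks=%d dim=%s %s->%s%n",
                props.getProperty("jobName"), sinks.length, dimTable, rename[0], rename[1]);
    }
}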
Once the job configuration above is in place, the Flink job still has to be submitted. Without a real-time computing platform, the simplest approach is to set up a submission bastion host, on which the following is prepared:
Common variables used by the script below can be pulled out into a separate script such as common_env.sh,
which the run script then imports with source ../../common_env.sh.
#!/bin/bash
# path of the flink CLI client
flink_bin=/xxx/flink-1.2.1-yarn/flink-1.12.1/bin/flink
# checkpoint root path
cp_root_path=hdfs://xxxcluster/flink_realedw/cluster/checkpoints
# yarn queue for real-time jobs
queue=xxxqueue
# job name
app_name="ods-gk-xxx"
# main class of the real-time job
class_ref=cn.xxx.xxx.OdsGroupSummary
# path of the job jar
jar_dir=../xxx-1.0-SNAPSHOT-shaded.jar
# relative path of the job's config file, parsed by the job at runtime
config_path=job-config.properties
# default parallelism of the job
parallelism=1

# grep JobID log.log | cut -d" " -f7                     -> parse the JobID from the flink startup log
# grep 'yarn application -kill' log.log | cut -d" " -f5  -> parse the yarn application id from the startup log
# --config-path $config_path                             -> path of the business config file

case $1 in
"cancel"){
    echo "================ cancel flink job ================"
    yarn_app_id=`grep 'yarn application -kill' log.log | cut -d" " -f5`
    job_id=`grep JobID log.log | cut -d" " -f7`
    echo "LastAppId: $yarn_app_id  LastJobID: $job_id"
    $flink_bin cancel -m yarn-cluster -yid $yarn_app_id $job_id
};;
"cp"){
    echo "================ start from checkpoint ================"
    job_id=`grep JobID log.log | cut -d" " -f7`
    cp_path=`hadoop fs -ls ${cp_root_path}/${job_id}/ | grep chk- | awk -F" " '{print$8}' | sort -nr | head -1`
    echo "LastJobID: $job_id  CheckpointPath: $cp_path"
    nohup $flink_bin run -d -t yarn-per-job \
        -Dyarn.application.queue=$queue \
        -Dyarn.application.name=$app_name \
        -p $parallelism \
        -c $class_ref \
        -s $cp_path \
        $jar_dir \
        --config-path $config_path >./log.log 2>&1 &
};;
"sp"){
    echo "================ start from savepoint ================"
    echo "SavepointPath: $2"
    nohup $flink_bin run -d -t yarn-per-job \
        -Dyarn.application.queue=$queue \
        -Dyarn.application.name=$app_name \
        -p $parallelism \
        -c $class_ref \
        -s $2 \
        $jar_dir \
        --config-path $config_path >./log.log 2>&1 &
};;
*){
    echo "================ start ================"
    nohup $flink_bin run -d -t yarn-per-job \
        -Dyarn.application.queue=$queue \
        -Dyarn.application.name=$app_name \
        -p $parallelism \
        -c $class_ref \
        $jar_dir \
        --config-path $config_path >./log.log 2>&1 &
};;
esac

# write the PID of the backgrounded flink run client to pid.pid
echo $! > ./pid.pid
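Inside the job's main(), the --config-path argument passed by this script can be read with Flink's ParameterTool; everything apart from the argument name is an assumption:

import org.apache.flink.api.java.utils.ParameterTool;

public class ConfigPathArgSketch {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        // matches "--config-path $config_path" in the run script
        String configPath = params.getRequired("config-path");
        System.out.println("config path: " + configPath);
        // from here the job would load the properties file and build its pipeline
    }
}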
For HBase, you can create a Hive external table on top of the HBase table and operate on HBase by operating on that Hive external table.
## hbase DDL
create 'rtdw:dwd_rt_dim_ecproductdb_product_xxx', 'cf'

## hive on hbase DDL
CREATE EXTERNAL TABLE dwd.dwd_rt_dim_ecproductdb_product_xxx_hb (
    key        String,
    id         String,
    product_id String
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:id,cf:product_id")
TBLPROPERTIES ("hbase.table.name" = "rtdw:dwd_rt_dim_ecproductdb_product_xxx");

## hive sql
insert overwrite table dwd.dwd_rt_dim_ecproductdb_product_xxx_hb
values ('111', '222', 'xxx1'), ('222', '333', 'xxx2');

## After the insert through Hive above, the data shows up in HBase as well.
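If the Flink job later reads these dimension rows back for the xxx_mapping style lookup, the standard HBase client API can be used; the ZooKeeper quorum and row key below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseDimLookupSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");   // placeholder quorum
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("rtdw:dwd_rt_dim_ecproductdb_product_xxx"))) {
            // row key written by the Hive insert above
            Result result = table.get(new Get(Bytes.toBytes("111")));
            String productId = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("product_id")));
            System.out.println("product_id = " + productId);
        }
    }
}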
TypeUtils.castToString: the TypeUtils class contains many methods for force-casting an Object to other types.
When assembling SQL, the type information can be taken directly from java.sql.Types, which defines constants for every SQL type, as sketched below.
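A small sketch combining both tips, assuming the TypeUtils meant here is fastjson's com.alibaba.fastjson.util.TypeUtils (the sink above already uses fastjson); the JDBC URL, table, and column names are placeholders:

import com.alibaba.fastjson.util.TypeUtils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Types;

public class SqlTypeSketch {
    public static void write(Object idObj, Object deletedObj) throws Exception {
        String id = TypeUtils.castToString(idObj);          // Object -> String
        Integer deleted = TypeUtils.castToInt(deletedObj);   // Object -> Integer, null stays null
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pwd");
             PreparedStatement ps = conn.prepareStatement("insert into table1(id, is_deleted) values(?, ?)")) {
            ps.setString(1, id);
            if (deleted == null) {
                ps.setNull(2, Types.SMALLINT);   // java.sql.Types holds the SQL type constants
            } else {
                ps.setInt(2, deleted);
            }
            ps.executeUpdate();
        }
    }
}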