配置1:vim flink-conf.yaml
流式写入hive需要配置检查点
# state.backend: filesystem
state.backend: filesystem
# 取消的时候保存检查点
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# 60s 一次检查点
execution.checkpointing.interval: 60s
# 检查点语义
execution.checkpointing.mode: EXACTLY_ONCE
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: file:///tmp/flink12-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.savepoints.dir: file:///tmp/flink12-savepoints

配置2,使用FlinkSQL-client需要配置
vim sql-client-defaults.yaml
catalogs: #[] # empty list
# A typical catalog definition looks like:
  - name: uathive
    type: hive
    hive-conf-dir: /etc/hive/conf
    default-database: temp
写sql作业
-- Run the SQL client job in streaming mode.
SET execution.type=streaming;

-- Use the Hive dialect so the Hive DDL below (PARTITIONED BY / STORED AS /
-- TBLPROPERTIES) is accepted.
SET table.sql-dialect=hive;

-- Hive sink table, partitioned by day (dt), hour (hr) and minute (mi).
--
-- Partition commit settings:
--   * trigger = process-time, delay = 1 min: a partition is committed based
--     on machine time, one minute after it was created.
--   * policy  = metastore,success-file: on commit the partition is added to
--     the Hive metastore AND a _SUCCESS file is written. With 'success-file'
--     alone the files land on HDFS but the partition is never registered, so
--     Hive queries return no rows until MSCK REPAIR TABLE is run manually.
DROP TABLE IF EXISTS ods_hive_t_area;
CREATE TABLE ods_hive_t_area (
    `id`      INT    COMMENT '代号',
    `name`    STRING COMMENT '姓名',
    `area_id` INT    COMMENT '区域',
    `money`   INT    COMMENT '销售额'
)
PARTITIONED BY (dt STRING, hr STRING, mi STRING)
STORED AS PARQUET
TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00',
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
-- The Kafka source is a Flink connector table declared with a WITH (...)
-- clause, which must be created with the default dialect rather than the
-- Hive dialect, so switch back before issuing the DDL.
SET table.sql-dialect=default;

-- Kafka source carrying change records: `before` / `after` hold the row
-- images and `op` the operation code.
--
-- Option notes:
--   * scan.startup.mode: e.g. latest-offset | earliest-offset.
--   * format: applies to the message value (same as 'value.format').
--   * json.ignore-parse-errors (JSON only): when 'true', fields/rows that
--     fail to parse are skipped and set to NULL; when 'false', a parse
--     error fails the read and stops the job.
DROP TABLE IF EXISTS ods_source_2hive_kafka_t_area;
CREATE TABLE ods_source_2hive_kafka_t_area (
    `before` ROW(id INT, name STRING, area_id INT, money INT),
    `after`  ROW(id INT, name STRING, area_id INT, money INT),
    op       STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_t_area1',
    'properties.bootstrap.servers' = '10.16.74.34:9092',
    'properties.group.id' = 'ods_t_area1_group2hive',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);
-- Continuously copy change records from the Kafka source into the Hive table.
--
-- For a delete record (op = 'd' with no `after` image) only the id is kept,
-- taken from the `before` image; the remaining data columns are NULL.
-- Otherwise all data columns come from the `after` image.
--
-- ods_hive_t_area has three partition columns (dt, hr, mi), so the SELECT
-- must supply all three after the four data columns. They are derived from
-- the same processing-time clock and zero-padded so that they line up with
-- the '$dt $hr:$mi:00' timestamp pattern declared on the table.
INSERT INTO ods_hive_t_area
SELECT
    CASE WHEN op = 'd' AND `after` IS NULL THEN `before`.id ELSE `after`.id      END AS id,
    CASE WHEN op = 'd' AND `after` IS NULL THEN NULL        ELSE `after`.name    END AS name,
    CASE WHEN op = 'd' AND `after` IS NULL THEN NULL        ELSE `after`.area_id END AS area_id,
    CASE WHEN op = 'd' AND `after` IS NULL THEN NULL        ELSE `after`.money   END AS money,
    DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS dt,
    DATE_FORMAT(LOCALTIMESTAMP, 'HH')         AS hr,
    DATE_FORMAT(LOCALTIMESTAMP, 'mm')         AS mi
FROM ods_source_2hive_kafka_t_area;
遇到的问题:
[hive@m764 lib]$ hadoop fs -ls -R /user/hive/warehouse/temp.db/ods_hive_t_area/ drwxrwxr-x - hive hive 0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26 -rw-r--r-- 1 hive hive 0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/_SUCCESS -rw-r--r-- 1 hive hive 1156 2021-05-06 17:26 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-0 drwxrwxr-x - hive hive 0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27 -rw-r--r-- 1 hive hive 0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/_SUCCESS -rw-r--r-- 1 hive hive 541 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-1
显示成功写入hive,有_SUCCESS文件,但是select不到数据
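解决:如果 sink.partition-commit.policy.kind 只配置了 success-file,新分区只会写出文件而不会注册到 Hive metastore,需要手动刷新一下元数据;配置为 metastore,success-file 时,分区提交时会自动注册到 metastore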
-- Scan the table location and register any partition directories that are
-- missing from the Hive metastore.
MSCK REPAIR TABLE ods_hive_t_area;
然后可以查到hive中的数据了