Maven 依赖(pom.xml)
<!-- Flink HBase 2.2 connector artifacts (Scala 2.11 builds, version 1.12.1). -->
<!-- NOTE(review): flink-sql-connector-hbase-2.2 is presumably the bundled variant of
     flink-connector-hbase-2.2; declaring both may be redundant. Confirm before trimming. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hbase-2.2_2.11</artifactId>
    <version>1.12.1</version>
</dependency>

<!-- Build section: produces a shaded (fat) JAR during the package phase. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <!-- Artifacts left out of the shaded JAR. -->
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>module-info.class</exclude>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <!-- Important: the transformers below are appended to any inherited ones
                             (combine.children="append") instead of replacing them. -->
                        <transformers combine.children="append">
                            <!-- Writes the Main-Class entry into the JAR manifest. -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.juneyaoair.dataplatform.service.RealTimeLableApplication</mainClass>
                            </transformer>
                            <!-- The service transformer is needed to merge META-INF/services files -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Compile for Java 8 source and bytecode level. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
Flink SQL Source 连接Kafka
-- Kafka source table for change records of TBL_MEMBER_INFO (JSON format).
-- Each record carries an operation type (op_type), an operation timestamp
-- (op_ts) and two row images: `before` and `after`.
-- NOTE(review): the envelope looks like a GoldenGate-style change record;
-- confirm against the producer.
CREATE TABLE KAFKA_SOURCE_TBL_MEMBER_INFO_HBASE (
    table_name STRING NOT NULL,
    op_type STRING,
    op_ts STRING,
    -- Event-time column derived from op_ts. The precision is spelled out because
    -- Flink requires the rowtime column of a WATERMARK to be TIMESTAMP(3); a bare
    -- TIMESTAMP does not guarantee that precision.
    op_ts_time AS CAST(op_ts AS TIMESTAMP(3)),
    current_ts STRING,
    pos STRING,
    -- BEFORE / AFTER are on Flink's reserved-keyword list, so both column names
    -- are backtick-quoted. The quoted names are unchanged.
    `before` ROW <
        `ADDRESS_STATUS` STRING,
        `AUDITOR_ID` STRING,
        `BENEFIC_TYPE` STRING,
        `CARD_STATUS_CODE` STRING,
        `CHILD_SIGN` STRING,
        `CLASS_PREFER` STRING,
        `COMMENTS` STRING,
        `DEGRADE_SIGN` STRING,
        `EFFECTIVE_DATE` STRING,
        `EXPIRE_DATE` STRING,
        `ID` STRING,
        `INVITER_CARD_NO` STRING,
        `IS_ACCOUNT_CLOSED` STRING,
        `IS_LEVEL_EXPIRE` STRING,
        `IS_MILEAGE_EXPIRE` STRING,
        `IS_SMALL_EXEMPT_PWD` STRING,
        `IS_SMOKING` STRING,
        `IS_TEST_MEMBER` STRING,
        `MAIL_ADDRESS_TYPE` STRING,
        `MAIL_LANGUAGE_CODE` STRING,
        `MEAL_PREFER` STRING,
        `MEMBER_LEVEL_CODE` STRING,
        `MEMBER_STATUS_CODE` STRING,
        `MEMBER_STATUS_DATE` STRING,
        `MULTIPLIER_MILES_SIGN` STRING,
        `OPERATE_DATE` STRING,
        `OPERATE_USER_ID` STRING,
        `QUALIFICATION_REASON_CODE` STRING,
        `REDEEM_SIGN` STRING,
        `REGIST_DATE` STRING,
        `REGIST_SOURCE` STRING,
        `SEAT_PREFER` STRING,
        `SMS_STATUS` STRING,
        `SPECIAL_ASSISTANCE` STRING,
        `STATEMENT_SEND_SIGN` STRING,
        `STATEMENT_SEND_TYPE` STRING,
        `SUBMIT_DATE` STRING,
        `UNITED_CARD_SIGN` STRING,
        `UPDATE_DATE` STRING,
        `UPDATE_STATUS_SIGN` STRING,
        `UPDATE_USER_ID` STRING,
        `UPGRADE_SIGN` STRING
    >,
    `after` ROW <
        `ADDRESS_STATUS` STRING,
        `AUDITOR_ID` STRING,
        `BENEFIC_TYPE` STRING,
        `CARD_STATUS_CODE` STRING,
        `CHILD_SIGN` STRING,
        `CLASS_PREFER` STRING,
        `COMMENTS` STRING,
        `DEGRADE_SIGN` STRING,
        `EFFECTIVE_DATE` STRING,
        `EXPIRE_DATE` STRING,
        `ID` STRING,
        `INVITER_CARD_NO` STRING,
        `IS_ACCOUNT_CLOSED` STRING,
        `IS_LEVEL_EXPIRE` STRING,
        `IS_MILEAGE_EXPIRE` STRING,
        `IS_SMALL_EXEMPT_PWD` STRING,
        `IS_SMOKING` STRING,
        `IS_TEST_MEMBER` STRING,
        `MAIL_ADDRESS_TYPE` STRING,
        `MAIL_LANGUAGE_CODE` STRING,
        `MEAL_PREFER` STRING,
        `MEMBER_LEVEL_CODE` STRING,
        `MEMBER_STATUS_CODE` STRING,
        `MEMBER_STATUS_DATE` STRING,
        `MULTIPLIER_MILES_SIGN` STRING,
        `OPERATE_DATE` STRING,
        `OPERATE_USER_ID` STRING,
        `QUALIFICATION_REASON_CODE` STRING,
        `REDEEM_SIGN` STRING,
        `REGIST_DATE` STRING,
        `REGIST_SOURCE` STRING,
        `SEAT_PREFER` STRING,
        `SMS_STATUS` STRING,
        `SPECIAL_ASSISTANCE` STRING,
        `STATEMENT_SEND_SIGN` STRING,
        `STATEMENT_SEND_TYPE` STRING,
        `SUBMIT_DATE` STRING,
        `UNITED_CARD_SIGN` STRING,
        `UPDATE_DATE` STRING,
        `UPDATE_STATUS_SIGN` STRING,
        `UPDATE_USER_ID` STRING,
        `UPGRADE_SIGN` STRING
    >,
    -- Effective row image: the `before` image when op_type is 'D',
    -- the `after` image for every other op_type.
    data_row AS CASE WHEN op_type = 'D' THEN `before` ELSE `after` END,
    -- Watermark trails the event time by 5 seconds.
    WATERMARK FOR op_ts_time AS op_ts_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'SEM.FFPTEST.HOFFPDEV.TBL_MEMBER_INFO',
    'properties.bootstrap.servers' = '172.22.17.26:9092,172.22.17.27:9092,172.22.17.28:9092',
    'scan.startup.mode' = 'earliest-offset',
    'properties.group.id' = 'kafka-flink-sync-hbase-local',
    'properties.fetch.max.bytes' = '5242880',
    'format' = 'json'
)
Flink SQL Sink HBase(需要提前在 HBase 中建表)
-- HBase sink table backed by the HBase table 'ods:tbl_member_info'.
-- rowkey is declared as the (non-enforced) primary key; all member attributes
-- are grouped in the ROW column `field`.
-- NOTE(review): per the Flink HBase connector, a ROW column presumably maps to
-- an HBase column family of the same name; verify the family exists in HBase.
CREATE TABLE SYNC_HBASE_SINK_TBL_MEMBER_INFO (
    rowkey STRING,
    field ROW <
        `ADDRESS_STATUS` STRING,
        `AUDITOR_ID` STRING,
        `BENEFIC_TYPE` STRING,
        `CARD_STATUS_CODE` STRING,
        `CHILD_SIGN` STRING,
        `CLASS_PREFER` STRING,
        `COMMENTS` STRING,
        `DEGRADE_SIGN` STRING,
        `EFFECTIVE_DATE` STRING,
        `EXPIRE_DATE` STRING,
        `ID` STRING,
        `INVITER_CARD_NO` STRING,
        `IS_ACCOUNT_CLOSED` STRING,
        `IS_LEVEL_EXPIRE` STRING,
        `IS_MILEAGE_EXPIRE` STRING,
        `IS_SMALL_EXEMPT_PWD` STRING,
        `IS_SMOKING` STRING,
        `IS_TEST_MEMBER` STRING,
        `MAIL_ADDRESS_TYPE` STRING,
        `MAIL_LANGUAGE_CODE` STRING,
        `MEAL_PREFER` STRING,
        `MEMBER_LEVEL_CODE` STRING,
        `MEMBER_STATUS_CODE` STRING,
        `MEMBER_STATUS_DATE` STRING,
        `MULTIPLIER_MILES_SIGN` STRING,
        `OPERATE_DATE` STRING,
        `OPERATE_USER_ID` STRING,
        `QUALIFICATION_REASON_CODE` STRING,
        `REDEEM_SIGN` STRING,
        `REGIST_DATE` STRING,
        `REGIST_SOURCE` STRING,
        `SEAT_PREFER` STRING,
        `SMS_STATUS` STRING,
        `SPECIAL_ASSISTANCE` STRING,
        `STATEMENT_SEND_SIGN` STRING,
        `STATEMENT_SEND_TYPE` STRING,
        `SUBMIT_DATE` STRING,
        `UNITED_CARD_SIGN` STRING,
        `UPDATE_DATE` STRING,
        `UPDATE_STATUS_SIGN` STRING,
        `UPDATE_USER_ID` STRING,
        `UPGRADE_SIGN` STRING
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'ods:tbl_member_info',
    'zookeeper.quorum' = '172.22.31.53:2181'
)
Flink SQL DML
-- Copies member records from the Kafka source table into the HBase sink.
-- The inner query flattens the nested data_row image into top-level columns;
-- the outer query uses ID as the rowkey and repacks the columns into one ROW.
INSERT INTO SYNC_HBASE_SINK_TBL_MEMBER_INFO
SELECT
    ID AS rowkey,
    ROW(
        `ADDRESS_STATUS`,
        `AUDITOR_ID`,
        `BENEFIC_TYPE`,
        `CARD_STATUS_CODE`,
        `CHILD_SIGN`,
        `CLASS_PREFER`,
        `COMMENTS`,
        `DEGRADE_SIGN`,
        `EFFECTIVE_DATE`,
        `EXPIRE_DATE`,
        `ID`,
        `INVITER_CARD_NO`,
        `IS_ACCOUNT_CLOSED`,
        `IS_LEVEL_EXPIRE`,
        `IS_MILEAGE_EXPIRE`,
        `IS_SMALL_EXEMPT_PWD`,
        `IS_SMOKING`,
        `IS_TEST_MEMBER`,
        `MAIL_ADDRESS_TYPE`,
        `MAIL_LANGUAGE_CODE`,
        `MEAL_PREFER`,
        `MEMBER_LEVEL_CODE`,
        `MEMBER_STATUS_CODE`,
        `MEMBER_STATUS_DATE`,
        `MULTIPLIER_MILES_SIGN`,
        `OPERATE_DATE`,
        `OPERATE_USER_ID`,
        `QUALIFICATION_REASON_CODE`,
        `REDEEM_SIGN`,
        `REGIST_DATE`,
        `REGIST_SOURCE`,
        `SEAT_PREFER`,
        `SMS_STATUS`,
        `SPECIAL_ASSISTANCE`,
        `STATEMENT_SEND_SIGN`,
        `STATEMENT_SEND_TYPE`,
        `SUBMIT_DATE`,
        `UNITED_CARD_SIGN`,
        `UPDATE_DATE`,
        `UPDATE_STATUS_SIGN`,
        `UPDATE_USER_ID`,
        `UPGRADE_SIGN`
    )
FROM (
    -- One output column per field of data_row, keeping the field's own name.
    SELECT
        data_row.`ADDRESS_STATUS` AS ADDRESS_STATUS,
        data_row.`AUDITOR_ID` AS AUDITOR_ID,
        data_row.`BENEFIC_TYPE` AS BENEFIC_TYPE,
        data_row.`CARD_STATUS_CODE` AS CARD_STATUS_CODE,
        data_row.`CHILD_SIGN` AS CHILD_SIGN,
        data_row.`CLASS_PREFER` AS CLASS_PREFER,
        data_row.`COMMENTS` AS COMMENTS,
        data_row.`DEGRADE_SIGN` AS DEGRADE_SIGN,
        data_row.`EFFECTIVE_DATE` AS EFFECTIVE_DATE,
        data_row.`EXPIRE_DATE` AS EXPIRE_DATE,
        data_row.`ID` AS ID,
        data_row.`INVITER_CARD_NO` AS INVITER_CARD_NO,
        data_row.`IS_ACCOUNT_CLOSED` AS IS_ACCOUNT_CLOSED,
        data_row.`IS_LEVEL_EXPIRE` AS IS_LEVEL_EXPIRE,
        data_row.`IS_MILEAGE_EXPIRE` AS IS_MILEAGE_EXPIRE,
        data_row.`IS_SMALL_EXEMPT_PWD` AS IS_SMALL_EXEMPT_PWD,
        data_row.`IS_SMOKING` AS IS_SMOKING,
        data_row.`IS_TEST_MEMBER` AS IS_TEST_MEMBER,
        data_row.`MAIL_ADDRESS_TYPE` AS MAIL_ADDRESS_TYPE,
        data_row.`MAIL_LANGUAGE_CODE` AS MAIL_LANGUAGE_CODE,
        data_row.`MEAL_PREFER` AS MEAL_PREFER,
        data_row.`MEMBER_LEVEL_CODE` AS MEMBER_LEVEL_CODE,
        data_row.`MEMBER_STATUS_CODE` AS MEMBER_STATUS_CODE,
        data_row.`MEMBER_STATUS_DATE` AS MEMBER_STATUS_DATE,
        data_row.`MULTIPLIER_MILES_SIGN` AS MULTIPLIER_MILES_SIGN,
        data_row.`OPERATE_DATE` AS OPERATE_DATE,
        data_row.`OPERATE_USER_ID` AS OPERATE_USER_ID,
        data_row.`QUALIFICATION_REASON_CODE` AS QUALIFICATION_REASON_CODE,
        data_row.`REDEEM_SIGN` AS REDEEM_SIGN,
        data_row.`REGIST_DATE` AS REGIST_DATE,
        data_row.`REGIST_SOURCE` AS REGIST_SOURCE,
        data_row.`SEAT_PREFER` AS SEAT_PREFER,
        data_row.`SMS_STATUS` AS SMS_STATUS,
        data_row.`SPECIAL_ASSISTANCE` AS SPECIAL_ASSISTANCE,
        data_row.`STATEMENT_SEND_SIGN` AS STATEMENT_SEND_SIGN,
        data_row.`STATEMENT_SEND_TYPE` AS STATEMENT_SEND_TYPE,
        data_row.`SUBMIT_DATE` AS SUBMIT_DATE,
        data_row.`UNITED_CARD_SIGN` AS UNITED_CARD_SIGN,
        data_row.`UPDATE_DATE` AS UPDATE_DATE,
        data_row.`UPDATE_STATUS_SIGN` AS UPDATE_STATUS_SIGN,
        data_row.`UPDATE_USER_ID` AS UPDATE_USER_ID,
        data_row.`UPGRADE_SIGN` AS UPGRADE_SIGN
    FROM default_catalog.default_database.KAFKA_SOURCE_TBL_MEMBER_INFO_HBASE
) flattened