"""Batch ETL job: copy rows of the MySQL table `parent_info` (via the JDBC
connector) into the Hive table `sink_parent_info` using the PyFlink Table API
on the blink planner."""

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
from pyflink.table import SqlDialect
from pyflink.table.catalog import HiveCatalog

# Batch-mode table environment backed by the blink planner.
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

# Hive catalog named "myhive", default database "ods", configured from the
# local Hive conf directory.
catalog = HiveCatalog("myhive", "ods", "/home/hadoop/hive-3.1.2/conf")

# Register the catalog and make it the current catalog of the session.
t_env.register_catalog("myhive", catalog)
t_env.use_catalog("myhive")

# Hive SQL dialect is needed for the Hive-style DDL below.
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)

# Create the Hive sink table (no-op if it already exists).
t_env.execute_sql("""CREATE TABLE IF NOT EXISTS sink_parent_info(
    etl_date STRING
    ,id BIGINT
    ,user_id BIGINT
    ,height DECIMAL(5,2)
    ,weight DECIMAL(5,2)
    )
""")

# Switch back to the default (Flink) dialect for the connector DDL and the
# INSERT statement below.
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)

# Temporary JDBC source table over MySQL `parent_info`.
# NOTE(review): host, database and password are 'xxxx' placeholders and the
# user is 'root' — replace with real, non-root credentials before running.
t_env.execute_sql("""
CREATE TEMPORARY TABLE source_parent_info(
    id bigint
    ,user_id bigint
    ,height decimal(5,2)
    ,weight decimal(5,2)
) with (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://xxxx:3306/xxxx',
    'connector.driver'= 'com.mysql.cj.jdbc.Driver',
    'connector.table' = 'parent_info',
    'connector.username' = 'root',
    'connector.password' = 'xxxx',
    'connector.write.flush.interval' = '1s')
""")

# Copy every row from the JDBC source into the Hive sink; .wait() blocks
# until the batch job finishes so the script does not exit early.
t_env.execute_sql("""
INSERT INTO sink_parent_info
SELECT
    id
    ,user_id
    ,height
    ,weight
FROM source_parent_info
""").wait()
Reference documents (参考文档):
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830