
Submitting a SQL script file with flink-sql-client

This article introduces how to submit a SQL script file with flink-sql-client. Hopefully it is a useful reference for anyone facing the same problem.

Title: Submitting a SQL script file with flink-sql-client
Date: 2021-10-22 22:11:34
Tags: [flink, sql-client]
Category: flink

As we know, sql-client.sh gives us an interactive SQL shell: every SQL statement we run shows its result immediately, and we can query tables interactively.

It turns out we can also submit a SQL script through sql-client. Let's take a look.


The help output for ./bin/sql-client.sh -h lists the available options:

(base) [chenzuoli@chenzuolis-MacBook /Volumes/chenzuoli/Data/docker_img/flink-1.12.1]$./bin/sql-client.sh -h
./sql-client [MODE] [OPTIONS]

The following options are available:

Mode "embedded" submits Flink jobs from the local machine.

  Syntax: embedded [OPTIONS]
  "embedded" mode options:
     -d,--defaults <environment file>      The environment properties with which
                                           every new session is initialized.
                                           Properties might be overwritten by
                                           session properties.
     -e,--environment <environment file>   The environment properties to be
                                           imported into the session. It might
                                           overwrite default environment
                                           properties.
     -h,--help                             Show the help message with
                                           descriptions of all options.
     -hist,--history <History file path>   The file which you want to save the
                                           command history into. If not
                                           specified, we will auto-generate one
                                           under your user's home directory.
     -j,--jar <JAR file>                   A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
                                           execution of statements such as
                                           functions, table sources, or sinks.
                                           Can be used multiple times.
     -l,--library <JAR directory>          A JAR file directory with which every
                                           new session is initialized. The files
                                           might contain user-defined classes
                                           needed for the execution of
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
     -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                           archive files will be extracted to
                                           the working directory of python UDF
                                           worker. Currently only zip-format is
                                           supported. For each archive file, a
                                           target directory be specified. If the
                                           target directory name is specified,
                                           the archive file will be extracted to
                                           a name can directory with the
                                           specified name. Otherwise, the
                                           archive file will be extracted to a
                                           directory with the same name of the
                                           archive file. The files uploaded via
                                           this option are accessible via
                                           relative path. '#' could be used as
                                           the separator of the archive file
                                           path and the target directory name.
                                           Comma (',') could be used as the
                                           separator to specify multiple archive
                                           files. This option can be used to
                                           upload the virtual environment, the
                                           data files used in Python UDF (e.g.:
                                           --pyArchives
                                           file:///tmp/py37.zip,file:///tmp/data
                                           .zip#data --pyExecutable
                                           py37.zip/py37/bin/python). The data
                                           files could be accessed in Python
                                           UDF, e.g.: f = open('data/data.txt',
                                           'r').
     -pyexec,--pyExecutable <arg>          Specify the path of the python
                                           interpreter used to execute the
                                           python UDF worker (e.g.:
                                           --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on Python 3.5+,
                                           Apache Beam (version == 2.23.0), Pip
                                           (version >= 7.1.0) and SetupTools
                                           (version >= 37.0.0). Please ensure
                                           that the specified environment meets
                                           the above requirements.
     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.
                                           These files will be added to the
                                           PYTHONPATH of both the local client
                                           and the remote python UDF worker. The
                                           standard python resource file
                                           suffixes such as .py/.egg/.zip or
                                           directory are all supported. Comma
                                           (',') could be used as the separator
                                           to specify multiple files (e.g.:
                                           --pyFiles
                                           file:///tmp/myresource.zip,hdfs:///$n
                                           amenode_address/myresource2.zip).
     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                           defines the third-party dependencies.
                                           These dependencies will be installed
                                           and added to the PYTHONPATH of the
                                           python UDF worker. A directory which
                                           contains the installation packages of
                                           these dependencies could be specified
                                           optionally. Use '#' as the separator
                                           if the optional parameter exists
                                           (e.g.: --pyRequirements
                                           file:///tmp/requirements.txt#file:///
                                           tmp/cached_dir).
     -s,--session <session identifier>     The identifier for a session.
                                           'default' is the default identifier.
     -u,--update <SQL update statement>    Experimental (for testing only!):
                                           Instructs the SQL Client to
                                           immediately execute the given update
                                           statement after starting up. The
                                           process is shut down after the
                                           statement has been submitted to the
                                           cluster and returns an appropriate
                                           return code. Currently, this feature
                                           is only supported for INSERT INTO
                                           statements that declare the target
                                           sink table.

The first option, -d, points the client at an environment file that configures the session.
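For example, here is a minimal sketch of a typical invocation. The UDF jar path is a placeholder rather than something from this article, and conf/book-store.yaml is the environment file we build later:

./bin/sql-client.sh embedded -d conf/book-store.yaml -j lib/my-udfs.jar   # -j only if you need extra UDF jars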


Next, take a look at conf/sql-client-defaults.yaml; this is the default environment file that -d would otherwise load.
First, create a test data file:

mkdir sql_test
vim sql_test/book-store.csv

枪炮、病菌和钢铁,18,社会学
APP UI设计之道,20,设计
通证经济,22,经济学
区块链的真正商机,21,经济学

Now let's create our own environment file that reads the csv file so we can select from it. Create a new file conf/book-store.yaml:

vim conf/book-store.yaml

tables:
  - name: BookStore
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/Users/zhaoqin/temp/202004/26/book-store.csv"
    format:
      type: csv
      fields:
        - name: BookName
          type: VARCHAR
        - name: BookAmount
          type: INT
        - name: BookCatalog
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: ","
    schema:
      - name: BookName
        type: VARCHAR
      - name: BookAmount
        type: INT
      - name: BookCatalog
        type: VARCHAR
  - name: MyBookView
    type: view
    query: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"


execution:
  planner: blink                    # optional: either 'blink' (default) or 'old'
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, smaller 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time

  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   "fallback" to global restart strategy by default

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000

Starting a session with this environment file sets up the corresponding source tables and sink tables.
A few things worth noting about book-store.yaml:
a. tables.type is source-table, which marks this entry as a data source;
b. tables.connector describes the source in detail: path is the full path to book-store.csv, and the connector type is filesystem, the same connector we would name when writing the equivalent SQL (see the sketch after this list);
c. tables.format describes the file contents; its type is csv;
d. tables.schema describes the schema of the source table;
e. type: view means MyBookView is a view (in the same sense as a database view).
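For comparison, the BookStore entry corresponds roughly to a CREATE TABLE statement like the sketch below. It mirrors the yaml using the legacy descriptor-style properties; depending on your Flink version you may prefer the newer options ('connector' = 'filesystem', 'path' = ..., 'format' = 'csv'), so treat the exact keys as an approximation rather than the article's own DDL:

-- Hedged sketch: SQL equivalent of the BookStore entry in book-store.yaml.
CREATE TABLE BookStore (
  BookName VARCHAR,
  BookAmount INT,
  BookCatalog VARCHAR
) WITH (
  'connector.type' = 'filesystem',               -- same connector type as in the yaml
  'connector.path' = 'sql_test/book-store.csv',  -- same csv file as above
  'format.type' = 'csv'                          -- same format type as in the yaml
);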

Let's look at the test:

./bin/start-cluster.sh
./bin/sql-client.sh embedded -d conf/book-store.yaml

Once you are inside the sql-client interactive shell, you can see the environment has already been set up:

Flink SQL> show tables;
BookStore
MyBookView

Flink SQL> desc BookStore;
+-------------+--------+------+-----+--------+-----------+
|        name |   type | null | key | extras | watermark |
+-------------+--------+------+-----+--------+-----------+
|    BookName | STRING | true |     |        |           |
|  BookAmount |    INT | true |     |        |           |
| BookCatalog | STRING | true |     |        |           |
+-------------+--------+------+-----+--------+-----------+
3 rows in set

Flink SQL> desc MyBookView
> ;
+-------------+--------+------+-----+--------+-----------+
|        name |   type | null | key | extras | watermark |
+-------------+--------+------+-----+--------+-----------+
| BookCatalog | STRING | true |     |        |           |
|      Amount |    INT | true |     |        |           |
+-------------+--------+------+-----+--------+-----------+
2 rows in set

Both tables have been created, so let's look at the data:

select * from MyBookView;

BookCatalog                    Amount
        社会学                        18
        设计                        20
        经济学                        43

And that's it. If the yaml file also defines a sink-table, starting the client this way submits a Flink job straight to the cluster, which is effectively the same as submitting a Flink SQL script file.
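For reference, a sink-table entry in the same yaml file might look like the sketch below. The table name and output path are illustrative (not from the original article), and an append-mode csv sink only works for queries that produce append-only results:

tables:
  - name: BookStoreCopy                        # illustrative sink name
    type: sink-table
    update-mode: append
    connector:
      type: filesystem
      path: "sql_test/book-store-copy.csv"     # illustrative output path
    format:
      type: csv
      fields:
        - name: BookName
          type: VARCHAR
        - name: BookAmount
          type: INT
        - name: BookCatalog
          type: VARCHAR
    schema:
      - name: BookName
        type: VARCHAR
      - name: BookAmount
        type: INT
      - name: BookCatalog
        type: VARCHAR

With such a sink defined, an INSERT INTO BookStoreCopy SELECT ... FROM BookStore submitted in the session (or passed via the -u option) is what actually launches the job on the cluster.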

That's all for today. I've been digging into a streaming data platform built by our company's data platform team these past few days and found that they submit jobs to k8s through sql-client, which genuinely surprised me. I had previously translated an official article on Flink job submission that covered four deployment modes:

  1. local cluster
  2. application mode
  3. per job mode
  4. session mode
I assumed those four were all there was. Looking more closely, submitting SQL through sql-client works much like session mode: while the session is up you can keep executing SQL statements, and when the session closes, the jobs are shut down with it.

OK, see you next time.

flink, yyds.


Diligence is the path up the mountain of books; hard work is the boat across the boundless sea of learning.

Let's learn and grow together.
