用户可以通过 MySQL 协议,使用 INSERT 语句进行数据导入
INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。 INSERT 语句支持以下两种语法:
INSERT INTO table SELECT ... INSERT INTO table VALUES(...)
对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。
因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频次的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。
该方式仅用于线下简单测试或低频少量的操作。
或者可以使用以下方式进行批量的插入操作:
INSERT INTO example_tbl VALUES (1000, "baidu1", 3.25) (2000, "baidu2", 4.25) (3000, "baidu3", 5.25);
用于将本地文件导入到doris中。Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。
该方式中涉及 HOST:PORT 都是对应的HTTP 协议端口。
• BE 的 HTTP 协议端口,默认为 8040。
• FE 的 HTTP 协议端口,默认为 8030。
但须保证客户端所在机器网络能够联通FE, BE 所在机器。
-- 创建表 drop table if exists load_local_file_test; CREATE TABLE IF NOT EXISTS load_local_file_test ( id INT, name VARCHAR(50), age TINYINT ) unique key(id) DISTRIBUTED BY HASH(id) BUCKETS 3;
# 创建文件 1,zss,28 2,lss,28 3,ww,88 # 导入数据 ## 语法示例 curl \ -u user:passwd \ # 账号密码 -H "label:load_local_file_test" \ # 本次任务的唯一标识 -T 文件地址 \ http://主机名:端口号/api/库名/表名/_stream_load # user:passwd 为在 Doris 中创建的用户。初始用户为 admin / root,密码初始状态下为空。 # host:port 为 BE 的 HTTP 协议端口,默认是 8040,可以在 Doris 集群 WEB UI页面查看。 # label: 可以在 Header 中指定 Label 唯一标识这个导入任务。 curl \ -u root:123 \ -H "label:load_local_file" \ -H "column_separator:," \ -T /root/data/loadfile.txt \ http://doitedu01:8040/api/test/load_local_file_test/_stream_load
建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。
# 准备数据 {"id":1,"name":"liuyan","age":18} {"id":2,"name":"tangyan","age":18} {"id":3,"name":"jinlian","age":18} {"id":4,"name":"dalang","age":18} {"id":5,"name":"qingqing","age":18} curl \ -u root: \ -H "label:load_local_file_json_20221126" \ -H "columns:id,name,age" \ -H "max_filter_ratio:0.1" \ -H "timeout:1000" \ -H "exec_mem_limit:1G" \ -H "where:id>1" \ -H "format:json" \ -H "read_json_by_line:true" \ -H "merge_type:delete" \ -T /root/data/json.txt \ http://doitedu01:8040/api/test/load_local_file_test/_stream_load -H "merge_type:append" \ # 会把id = 3 的这条数据删除 -H "merge_type:MERGE" \ -H "delete:id=3"
• 源数据在 Broker 可以访问的存储系统中,如 HDFS。
• 数据量在几十到百 GB 级别。
-- 新建一张表 drop table if exists load_hdfs_file_test1; CREATE TABLE IF NOT EXISTS load_hdfs_file_test1 ( id INT, name VARCHAR(50), age TINYINT ) unique key(id) DISTRIBUTED BY HASH(id) BUCKETS 3;
将本地的数据导入到hdfs上面 hdfs dfs -put ./loadfile.txt hdfs://linux01:8020/ hdfs dfs -ls hdfs://linux01:8020/
-- 导入语法 LOAD LABEL test.label_202204( [MERGE|APPEND|DELETE] -- 不写就是append DATA INFILE ( "file_path1"[, file_path2, ...] -- 描述数据的路径 这边可以写多个 ,以逗号分割 ) [NEGATIVE] -- 负增长 INTO TABLE `table_name` -- 导入的表名字 [PARTITION (p1, p2, ...)] -- 导入到哪些分区,不符合这些分区的就会被过滤掉 [COLUMNS TERMINATED BY "column_separator"] -- 指定分隔符 [FORMAT AS "file_type"] -- 指定存储的文件类型 [(column_list)] -- 指定导入哪些列 [COLUMNS FROM PATH AS (c1, c2, ...)] -- 从路劲中抽取的部分列 [SET (column_mapping)] -- 对于列可以做一些映射,写一些函数 -- 这个参数要写在要写在set的后面 [PRECEDING FILTER predicate] -- 在mapping前做过滤做一些过滤 [WHERE predicate] -- 在mapping后做一些过滤 比如id>10 [DELETE ON expr] --根据字段去做一些抵消消除的策略 需要配合MERGE [ORDER BY source_sequence] -- 导入数据的时候保证数据顺序 [PROPERTIES ("key1"="value1", ...)] -- 一些配置参数
-- 将hdfs上的数据load到表中 LOAD LABEL test.label_20221125 ( DATA INFILE("hdfs://linux01:8020/test.txt") INTO TABLE `load_hdfs_file_test` COLUMNS TERMINATED BY "," (id,name,age) ) with HDFS ( "fs.defaultFS"="hdfs://linux01:8020", "hadoop.username"="root" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" ); -- 这是一个异步的操作,所以需要去查看下执行的状态 show load order by createtime desc limit 1\G;
从 HDFS 导入数据,使用通配符匹配两批两批文件。分别导入到两个表中
LOAD LABEL example_db.label2 ( DATA INFILE("hdfs://linux01:8020/input/file-10*") INTO TABLE `my_table1` PARTITION (p1) COLUMNS TERMINATED BY "," FORMAT AS "parquet" (id, tmp_salary, tmp_score) SET ( salary= tmp_salary + 1000, score = tmp_score + 10 ), DATA INFILE("hdfs://linux01:8020/input/file-20*") INTO TABLE `my_table2` COLUMNS TERMINATED BY "," (k1, k2, k3) ) with HDFS ( "fs.defaultFS"="hdfs://linux01:8020", "hadoop.username"="root" ) -- 导入数据,并提取文件路径中的分区字段 LOAD LABEL example_db.label10 ( DATA INFILE("hdfs://linux01:8020/user/hive/warehouse/table_name/dt=20221125/*") INTO TABLE `my_table` FORMAT AS "csv" (k1, k2, k3) COLUMNS FROM PATH AS (dt) ) WITH BROKER hdfs ( "username"="root", "password"="123" ); -- 对待导入数据进行过滤。 LOAD LABEL example_db.label6 ( DATA INFILE("hdfs://linux01:8020/input/file") INTO TABLE `my_table` (k1, k2, k3) SET ( k2 = k2 + 1 ) PRECEDING FILTER k1 = 1 ==》前置过滤 WHERE k1 > k2 ==》 后置过滤 ) WITH BROKER hdfs ( "username"="root", "password"="123" ); -- 只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。
当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。
取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 HELP CANCEL LOAD 查看。
CANCEL LOAD [FROM db_name] WHERE LABEL="load_label";
Doris 可以创建外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT 的方式导入外部表的数据。
Doris 外部表目前支持的数据源包括:MySQL,Oracle,Hive,PostgreSQL,SQLServer,Iceberg,ElasticSearch
-- 整体语法 CREATE [EXTERNAL] TABLE table_name ( col_name col_type [NULL | NOT NULL] [COMMENT "comment"] ) ENGINE=HIVE [COMMENT "comment"] PROPERTIES ( -- 我要映射的hive表在哪个库里面 -- 映射的表名是哪一张 -- hive的元数据服务地址 'property_name'='property_value', ... ); -- 参数说明: -- 1.外表列 -- 列名要与 Hive 表一一对应 -- 列的顺序需要与 Hive 表一致 -- 必须包含 Hive 表中的全部列 -- Hive 表分区列无需指定,与普通列一样定义即可。 -- 2.ENGINE 需要指定为 HIVE -- 3.PROPERTIES 属性: -- hive.metastore.uris:Hive Metastore 服务地址 -- database:挂载 Hive 对应的数据库名 -- table:挂载 Hive 对应的表名
完成在 Doris 中建立 Hive 外表后,除了无法使用 Doris 中的数据模型(rollup、预聚合、物化视图等)外,与普通的 Doris OLAP 表并无区别
-- 在Hive 中创建一个测试用表: CREATE TABLE `user_info` ( `id` int, `name` string, `age` int ) stored as orc; insert into user_info values (1,'zss',18); insert into user_info values (2,'lss',20); insert into user_info values (3,'ww',25); -- Doris 中创建外部表 CREATE EXTERNAL TABLE `hive_user_info` ( `id` int, `name` varchar(10), `age` int ) ENGINE=HIVE PROPERTIES ( 'hive.metastore.uris' = 'thrift://linux01:9083', 'database' = 'db1', 'table' = 'user_info' );
外部表创建好后,就可以直接在doris中对这个外部表进行查询了
直接查询外部表,无法利用到doris自身的各种查询优化机制!
select * from hive_user_info; -- 将数据从外部表导入内部表 -- 数据从外部表导入内部表后,就可以利用doris自身的查询优势了! -- 假设要导入的目标内部表为: doris_user_info (需要提前创建) CREATE TABLE IF NOT EXISTS doris_user_info ( id INT, name VARCHAR(50), age TINYINT ) unique key(id) DISTRIBUTED BY HASH(id) BUCKETS 3; -- 就是用sql查询,从外部表中select出数据后,insert到内部表即可 insert into doris_user_info select * from hive_user_info;
注意:
Hive 表 Schema 变更不会自动同步,需要在 Doris 中重建 Hive 外表。
当前 Hive 的存储格式仅支持 Text,Parquet 和 ORC 类型
Binlog Load提供了一种使Doris增量同步用户在Mysql数据库中对数据更新操作的CDC(Change Data Capture)功能。
基本原理
当前版本设计中,Binlog Load需要依赖canal作为中间媒介,让canal伪造成一个从节点去获取Mysql主节点上的Binlog并解析,再由Doris去获取Canal上解析好的数据,主要涉及Mysql端、Canal端以及Doris端
在Mysql Cluster模式的主从同步中,二进制日志文件(Binlog)记录了主节点上的所有数据变化,数据在Cluster的多个节点间同步、备份都要通过Binlog日志进行,从而提高集群的可用性。架构通常由一个主节点(负责写)和一个或多个从节点(负责读)构成,所有在主节点上发生的数据变更将会复制给从节点。
注意:目前必须要使用Mysql 5.7及以上的版本才能支持Binlog Load功能。
# 打开mysql的二进制binlog日志功能,则需要编辑my.cnf配置文件设置一下。 find / -name my.cnf /etc/my.cnf
# 修改mysqld中的一些配置文件 [mysqld] server_id = 1 log-bin = mysql-bin binlog-format = ROW #binlog-format 的三种模式 #ROW 记录每一行数据的信息 #Statement 记录sql语句 #Mixed 上面两种的混合 # 重启 MySQL 使配置生效 systemctl restart mysqld
-- 创建用户并授权 -- 设置这些参数可以使得mysql的密码简单化 set global validate_password_length=4; set global validate_password_policy=0; -- 新增一个canal的用户,让他监听所有库中的所有表,并且设置密码为canal GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ; -- 刷新一下权限 FLUSH PRIVILEGES; -- 准备测试表 CREATE TABLE `user_doris2` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(11) DEFAULT NULL, `gender` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
Canal 是属于阿里巴巴 otter 项目下的一个子项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,用于解决跨机房同步的业务场景,建议使用 canal 1.1.5及以上版本。
下载地址:https://github.com/alibaba/canal/releases
# 上传并解压 canal deployer压缩包 mkdir /opt/apps/canal tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/apps/canal # 在 conf 文件夹下新建目录并重命名 # 一个 canal 服务中可以有多个 instance,conf/下的每一个目录即是一个实例,每个实例下面都有独立的配置文件 mkdir /opt/apps/canel/conf/doris # 拷贝配置文件模板 cp /opt/apps/canal/conf/example/instance.properties /opt/apps/canal/conf/doris/ # 修改 conf/canal.properties 的配置 vi canal.properties # 进入找到canal.destinations = example # 将其修改为 我们自己配置的目录 canal.destinations = doris # 修改 instance 配置文件 vi instance.properties # 修改: canal.instance.master.address=doitedu01:3306 # 启动 sh bin/startup.sh
注意:canal client 和 canal instance 是一一对应的,Binlog Load 已限制多个数据同步作 业不能连接到同一个 destination。
基本语法: CREATE SYNC [db.]job_name ( channel_desc, channel_desc ... ) binlog_desc -- 参数说明: -- job_name:是数据同步作业在当前数据库内的唯一标识 -- channel_desc :用来定义任务下的数据通道,可表示 MySQL 源表到 doris 目标表的映射关系。在设置此项时,如果存在多个映射关系,必须满足 MySQL 源表应该与 doris 目标表是一一对应关系,其他的任何映射关系(如一对多关系),检查语法时都被视为不合法。 -- column_mapping:主要指MySQL源表和doris目标表的列之间的映射关系,如果不指定,FE 会默认源表和目标表的列按顺序一一对应。但是我们依然建议显式的指定列的映射关系,这样当目标表的结构发生变化(比如增加一个 nullable 的列),数据同步作业依然可以进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。 -- binlog_desc:定义了对接远端 Binlog 地址的一些必要信息,目前可支持的对接类型只有 canal 方式,所有的配置项前都需要加上 canal 前缀。 -- canal.server.ip: canal server 的地址 -- canal.server.port: canal server 的端口 -- canal.destination: 前文提到的 instance 的字符串标识 -- canal.batchSize: 每批从 canal server 处获取的 batch 大小的最大值,默认 8192 -- canal.username: instance 的用户名 -- canal.password: instance 的密码 -- canal.debug: 设置为 true 时,会将 batch 和每一行数据的详细信息都打印出来,会影响性能。 -- Doris 创建与 Mysql 对应的目标表 CREATE TABLE `binlog_mysql` ( `id` int(11) NOT NULL COMMENT "", `name` VARCHAR(50) NOT NULL COMMENT "", `age` int(11) NOT NULL COMMENT "" , `gender` VARCHAR(50) NOT NULL COMMENT "" ) ENGINE=OLAP UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1; CREATE SYNC test.job20221228 ( FROM test.binlog_test INTO binlog_test ) FROM BINLOG ( "type" = "canal", "canal.server.ip" = "linux01", "canal.server.port" = "11111", "canal.destination" = "doris", "canal.username" = "canal", "canal.password" = "canal" ); -- 查看作业状态 -- 展示当前数据库的所有数据同步作业状态。 SHOW SYNC JOB; -- 展示数据库 `test_db` 下的所有数据同步作业状态。 SHOW SYNC JOB FROM `test`; -- 停止名称为 `job_name` 的数据同步作业 STOP SYNC JOB [db.]job_name -- 暂停名称为 `job_name` 的数据同步作业 PAUSE SYNC JOB [db.]job_name -- 恢复名称为 `job_name` 的数据同步作业 RESUME SYNC JOB `job_name`
数据导出(Export)是 Doris 提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据,以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS / 对象存储(支持S3协议) 等。
原理
Export 作业会生成多个查询计划,每个查询计划负责扫描一部分 Tablet。每个查询计划扫描的 Tablet 个数由 FE 配置参数 export_tablet_num_per_task 指定,默认为 5。即假设一共 100 个 Tablet,则会生成 20 个查询计划。用户也可以在提交作业时,通过作业属性 tablet_num_per_task 指定这个数值。
一个作业的多个查询计划顺序执行
一个查询计划扫描多个分片,将读取的数据以行的形式组织,每 1024 行为一个 batch,调用 Broker 写入到远端存储上。
查询计划遇到错误会整体自动重试 3 次。如果一个查询计划重试 3 次依然失败,则整个作业失败。
Doris 会首先在指定的远端存储的路径中,建立一个名为 __doris_export_tmp_12345 的临时目录(其中 12345 为作业 id)。导出的数据首先会写入这个临时目录。每个查询计划会生成一个文件,文件名示例:
export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822
其中 c69fcf2b6db5420f-a96b94c1ff8bccef 为查询计划的 query id。1561453713822 为文件生成的时间戳。当所有数据都导出后,Doris 会将这些文件 rename 到用户指定的路径中
示例:导出到hdfs
EXPORT TABLE test.event_info_log1 -- 库名.表名 to "hdfs://linux01:8020/event_info_log1" -- 导出到那里去 PROPERTIES ( "label" = "event_info_log1", "column_separator"=",", "exec_mem_limit"="2147483648", "timeout" = "3600" ) WITH BROKER "broker_name" ( "username" = "root", "password" = "" ); -- 1.label:本次导出作业的标识。后续可以使用这个标识查看作业状态。 -- 2.column_separator:列分隔符。默认为 \t。支持不可见字符,比如 '\x07'。 -- 3.columns:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。 -- 4.line_delimiter:行分隔符。默认为 \n。支持不可见字符,比如 '\x07'。 -- 5.exec_mem_limit: 表示 Export 作业中,一个查询计划在单个 BE 上的内存使用限制。默认 2GB。单位字节。 -- 6.timeout:作业超时时间。默认 2小时。单位秒。 -- 7.tablet_num_per_task:每个查询计划分配的最大分片数。默认为 5。 -- 查看导出状态 show EXPORT \G;
注意事项
SELECT INTO OUTFILE 语句可以将查询结果导出到文件中。目前支持通过 Broker进程, 通过 S3 协议, 或直接通过 HDFS 协议,导出到远端存储,如 HDFS,S3,BOS,COS (腾讯云)上。
-- 语法 query_stmt -- 查询语句 INTO OUTFILE "file_path" --导出文件的路劲 [format_as] -- 指定文件存储的格式 [properties] -- 一些配置文件
file_path:指向文件存储的路径以及文件前缀。如 hdfs://path/to/my_file_.最终的文件名将由 my_file_,文件序号以及文件格式后缀组成。其中文件序号由 0 开始,数量为文件被分割的数量
-- 如 my_file_abcdefg_0.csv my_file_abcdefg_1.csv my_file_abcdegf_2.csv -- [format_as]:指定导出格式。默认为 CSV -- [properties]:指定相关属性。目前支持通过 Broker 进程,hdfs协议等 -- Broker 相关属性需加前缀 broker. -- HDFS 相关属性需加前缀 hdfs. 其中hdfs.fs.defaultFS 用于填写 namenode地址和端口,属于必填项。 -- 如: ("broker.prop_key" = "broker.prop_val", ...) ("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx") -- 其他属性: -- column_separator:列分隔符,仅对 CSV 格式适用。默认为 \t。 -- line_delimiter:行分隔符,仅对 CSV 格式适用。默认为 \n。 -- max_file_size:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。 -- schema:PARQUET 文件 schema 信息。仅对 PARQUET 格式适用。导出文件格式为 PARQUET 时,必须指定 schema。
使用 broker 方式,将简单查询结果导出
select * from log_detail where id >2 INTO OUTFILE "hdfs://doitedu01:8020/doris-out/broker_a_" FORMAT AS CSV PROPERTIES ( "broker.name" = "broker_name", "column_separator" = ",", "line_delimiter" = "\n", "max_file_size" = "100MB" );
使用 HDFS 方式导出
EXPLAIN SELECT * FROM log_detail INTO OUTFILE "hdfs://doris-out/hdfs_" FORMAT AS CSV PROPERTIES ( "fs.defaultFS" = "hdfs://doitedu01:8020", "hadoop.username" = "root", "column_separator" = "," );