该部分针对import table和Load Data相同的功能做命令示例演示,我们依旧以导入employees表的示例数据为例,演示MySQL Load Data的综合场景
数据自定义顺序导入
数据函数处理
自定义数据取值
[root@10-186-61-162 tmp]# cat employees_01.csv
“10001”,“1953-09-02”,“Georgi”,“Facello”,“M”,“1986-06-26”
“10003”,“1959-12-03”,“Parto”,“Bamford”,“M”,“1986-08-28”
“10002”,“1964-06-02”,“Bezalel”,“Simmel”,“F”,“1985-11-21”
“10004”,“1954-05-01”,“Chirstian”,“Koblick”,“M”,“1986-12-01”
“10005”,“1955-01-21”,“Kyoichi”,“Maliniak”,“M”,“1989-09-12”
“10006”,“1953-04-20”,“Anneke”,“Preusig”,“F”,“1989-06-02”
“10007”,“1957-05-23”,“Tzvetan”,“Zielinski”,“F”,“1989-02-10”
“10008”,“1958-02-19”,“Saniya”,“Kalloufi”,“M”,“1994-09-15”
“10009”,“1952-04-19”,“Sumant”,“Peac”,“F”,“1985-02-18”
“10010”,“1963-06-01”,“Duangkaew”,“Piveteau”,“F”,“1989-08-24”
10.186.61.162:3306 employees SQL > desc emp;
±------------±--------------±-----±----±--------±------+
| Field | Type | Null | Key | Default | Extra |
±------------±--------------±-----±----±--------±------+
| emp_no | int | NO | PRI | NULL | |
| birth_date | date | NO | | NULL | |
| first_name | varchar(14) | NO | | NULL | |
| last_name | varchar(16) | NO | | NULL | |
| full_name | varchar(64) | YES | | NULL | | – 表新增字段,导出数据文件中不存在
| gender | enum(‘M’,‘F’) | NO | | NULL | |
| hire_date | date | NO | | NULL | |
| modify_date | datetime | YES | | NULL | | – 表新增字段,导出数据文件中不存在
| delete_flag | varchar(1) | YES | | NULL | | – 表新增字段,导出数据文件中不存在
±------------±--------------±-----±----±--------±------+
2.1 用Load Data方式导入数据
具体参数含义不做说明,需要了解语法规则及含义可查看系列上一篇文章<MySQL Load Data的多种用法>
load data infile ‘/data/mysql/3306/tmp/employees_01.csv’
into table employees.emp
character set utf8mb4
fields terminated by ‘,’
enclosed by ‘"’
lines terminated by ‘\n’
(@C1,@C2,@C3,@C4,@C5,@C6)
set emp_no=@C1,
birth_date=@C2,
first_name=upper(@C3),
last_name=lower(@C4),
full_name=concat(first_name,’ ',last_name),
gender=@C5,
hire_date=@C6 ,
modify_date=now(),
delete_flag=if(hire_date<‘1988-01-01’,‘Y’,‘N’);
2.2 用import_table方式导入数据
util.import_table(
[
“/data/mysql/3306/tmp/employees_01.csv”,
],
{
“schema”: “employees”,
“table”: “emp”,
“dialect”: “csv-unix”,
“skipRows”: 0,
“showProgress”: True,
“characterSet”: “utf8mb4”,
“columns”: [1,2,3,4,5,6], ## 文件中多少个列就用多少个序号标识就行
“decodeColumns”: {
“emp_no”: “@1”, ## 对应文件中的第1列
“birth_date”: “@2”, ## 对应文件中的第2个列
“first_name”: “upper(@3)”, ## 对应文件中的第3个列,并做转为大写的处理
“last_name”: “lower(@4)”, ## 对应文件中的第4个列,并做转为大写的处理
“full_name”: “concat(@3,’ ',@4)”, ## 将文件中的第3,4列合并成一列生成表中字段值
“gender”: “@5”, ## 对应文件中的第5个列
“hire_date”: “@6”, ## 对应文件中的第6个列
“modify_date”: “now()”, ## 用函数生成表中字段值
“delete_flag”: “if(@6<‘1988-01-01’,‘Y’,‘N’)” ## 基于文件中第6列做逻辑判断,生成表中对应字段值
}
})
[root@10-186-61-162 tmp]# ls -lh
总用量 1.9G
-rw-r----- 1 mysql mysql 579 3月 24 19:07 employees_01.csv
-rw-r----- 1 mysql mysql 584 3月 24 18:48 employees_02.csv
-rw-r----- 1 mysql mysql 576 3月 24 18:48 employees_03.csv
-rw-r----- 1 mysql mysql 1.9G 3月 26 17:15 sbtest1.csv
util.import_table(
[
“/data/mysql/3306/tmp/employees_*”,
],
{
“schema”: “employees”,
“table”: “emp”,
“dialect”: “csv-unix”,
“skipRows”: 0,
“showProgress”: True,
“characterSet”: “utf8mb4”,
“columns”: [1,2,3,4,5,6], ## 文件中多少个列就用多少个序号标识就行
“decodeColumns”: {
“emp_no”: “@1”, ## 对应文件中的第1列
“birth_date”: “@2”, ## 对应文件中的第2个列
“first_name”: “upper(@3)”, ## 对应文件中的第3个列,并做转为大写的处理
“last_name”: “lower(@4)”, ## 对应文件中的第4个列,并做转为大写的处理
“full_name”: “concat(@3,’ ',@4)”, ## 将文件中的第3,4列合并成一列生成表中字段值
“gender”: “@5”, ## 对应文件中的第5个列
“hire_date”: “@6”, ## 对应文件中的第6个列
“modify_date”: “now()”, ## 用函数生成表中字段值
“delete_flag”: “if(@6<‘1988-01-01’,‘Y’,‘N’)” ## 基于文件中第6列做逻辑判断,生成表中对应字段值
}
})
util.import_table(
[
“/data/mysql/3306/tmp/employees_01.csv”,
“/data/mysql/3306/tmp/employees_02.csv”,
“/data/mysql/3306/tmp/employees_03.csv”
],
{
“schema”: “employees”,
“table”: “emp”,
“dialect”: “csv-unix”,
“skipRows”: 0,
“showProgress”: True,
“characterSet”: “utf8mb4”,
“columns”: [1,2,3,4,5,6], ## 文件中多少个列就用多少个序号标识就行
“decodeColumns”: {
“emp_no”: “@1”, ## 对应文件中的第1列
“birth_date”: “@2”, ## 对应文件中的第2个列
“first_name”: “upper(@3)”, ## 对应文件中的第3个列,并做转为大写的处理
“last_name”: “lower(@4)”, ## 对应文件中的第4个列,并做转为大写的处理
“full_name”: “concat(@3,’ ',@4)”, ## 将文件中的第3,4列合并成一列生成表中字段值
“gender”: “@5”, ## 对应文件中的第5个列
“hire_date”: “@6”, ## 对应文件中的第6个列
“modify_date”: “now()”, ## 用函数生成表中字段值
“delete_flag”: “if(@6<‘1988-01-01’,‘Y’,‘N’)” ## 基于文件中第6列做逻辑判断,生成表中对应字段值
}
})
3.2 并发导入
在实验并发导入前我们创建一张1000W的sbtest1表(大约2G数据),做并发模拟,import_table用threads参数作为并发配置, 默认为8个并发.
[root@10-186-61-162 tmp]# ls -lh
总用量 1.9G
-rw-r----- 1 mysql mysql 579 3月 24 19:07 employees_01.csv
-rw-r----- 1 mysql mysql 584 3月 24 18:48 employees_02.csv
-rw-r----- 1 mysql mysql 576 3月 24 18:48 employees_03.csv
-rw-r----- 1 mysql mysql 1.9G 3月 26 17:15 sbtest1.csv
util.import_table(
[
“/data/mysql/3306/tmp/sbtest1.csv”,
],
{
“schema”: “demo”,
“table”: “sbtest1”,
“dialect”: “csv-unix”,
“skipRows”: 0,
“showProgress”: True,
“characterSet”: “utf8mb4”,
“threads”: “8”
})
3.3 导入速率控制
可以通过maxRate和threads来控制每个并发线程的导入数据,如,当前配置线程为4个,每个线程的速率为2M/s,则最高不会超过8M/s
util.import_table(
[
“/data/mysql/3306/tmp/sbtest1.csv”,
],
{
“schema”: “demo”,
“table”: “sbtest1”,
“dialect”: “csv-unix”,
“skipRows”: 0,
“showProgress”: True,
“characterSet”: “utf8mb4”,
“threads”: “4”,
“maxRate”: “2M”
})
3.4 自定义chunk大小
默认的chunk大小为50M,我们可以调整chunk的大小,减少事务大小,如我们将chunk大小调整为1M,则每个线程每次导入的数据量也相应减少
util.import_table(
[
“/data/mysql/3306/tmp/sbtest1.csv”,
],
{
“schema”: “demo”,
“table”: “sbtest1”,
“dialect”: “csv-unix”,
“skipRows”: 0,
“showProgress”: True,
“characterSet”: “utf8mb4”,
“threads”: “4”,
“bytesPerChunk”: “1M”,
“maxRate”: “2M”
})
– import_table语句
util.import_table(
[
“/data/mysql/3306/tmp/sbtest1.csv”,
],
{
“schema”: “demo”,
“table”: “sbtest1”,
“dialect”: “csv-unix”,
“skipRows”: 0,
“showProgress”: True,
“characterSet”: “utf8mb4”
})
mosfet驱动芯片 https://www.zg886.cn