在微服务和云原生愈发流行的今天,数据的分布也愈发脱离单库单机而更加复杂,使用的数据库类型也会更多,但业务的复杂依然会带来了大量的数据查询和导出需求,而很多时候我们很难为数据量的大部分系统创建完整的BI数仓系统,这时候你是不是觉得为这些需求查询和导出数据就会是一个十分困难且耗时的工作?Syncany-SQL就是这样一个工具,来在不依赖数据库的情况下完成不同库表、不同机器和不同数据库类型间直接关联查询和聚合计算后直接导出到常用文件的工具。
直接在本地运行MySQL语法结构的SQL的执行引擎,只使用检查查询从常用MySQL、MongoDB、PostgreSQL、sqlserver、elasticsearch、influxdb、clickhouse、sqlite数据库加载数据及读取execl、csv、json和普通文本文件后,在内存中完成join和聚合运算,单条SQL的执行结果可以临时保存在内存中以便作为后续SQL的输入表继续处理,并且INSERT INTO语句执行指定”仅插入 I“、”存在更新否则插入 UI“、”存在更新否则插入其余删除 UDI“、”删除后插入 DI“四种合并数据模式,针对大数据量也可以分批执行。
整个程序共有两个项目组成,其中
https://github.com/snower/syncany-sql 负责解析SQL
https://github.com/snower/syncany 负责执行
可使用pip直接安装
pip3 install syncanysql # 执行成功后,执行 echo "select now(), 'hello world';" | syncany-sql # 看到正确输出即为安装成功
因大多数情况使用数据库Driver很少,所以默认不安装数据库Driver,需要依据自己使用的数据库类型自行安装数据库Driver
pip3 install pymongo>=3.6.1 pip3 install PyMySQL>=0.8.1 pip3 install openpyxl>=2.5.0 pip3 install psycopg2>=2.8.6 pip3 install elasticsearch>=6.3.1 pip3 install influxdb>=5.3.1 pip3 install clickhouse_driver>=0.1.5 pip3 install redis>=3.5.3 pip3 install pymssql>=2.2.7
配置文件支持json和yaml格式,默认加载当前目录的"config.[json|yaml]”和用户目录下"~/.syncany/config.[json|yaml]"文件,当前目录配置文件优先级高于用户目录,合并配置项后为最终加载配置。
详细介绍可查询示例配置文件信息 https://github.com/snower/syncany-sql/blob/main/docs/configure.md
配置完成后,命令行启动时会自动加载配置文件,并且添加的数据库信息将会在真正使用到的时候才会发起连接,并不会在程序启动就会尝试连接配置的数据库。
# 我们在配置文件中添加了以下数据库配置 databases: - name: test_db driver: mysql host: '127.0.0.1' port: 3306 user: 'root' passwd: '123456' db: 'test' charset: 'utf8mb4' # 编写SQL是,假如我们需要查询127.0.0.1的test库,我们应该这样编写SQL select * from test_db;
-- 查询访问量最高的3个IP SELECT seg0 AS ip, COUNT(*) AS cnt FROM `file://data/access.log?sep= ` GROUP BY seg0 ORDER BY cnt DESC LIMIT 3;
# 一个查询json文件的例子 https://github.com/snower/syncany-sql/tree/main/examples/demo SELECT a.site_id, b.name AS site_name, IF(c.site_amount > 0, c.site_amount, 0) AS site_amount, MAX(a.timeout_at) AS timeout_at, MAX(a.vip_timeout_at) AS vip_timeout_at, now() as `created_at?` FROM (SELECT YIELD_DATA(sites) AS site_id, IF(vip_type = '2', GET_VALUE(rules, 0, 'timeout_time'), '') AS timeout_at, IF(vip_type = '1', GET_VALUE(rules, 0, 'timeout_time'), '') AS vip_timeout_at FROM `data/demo.json` WHERE start_date >= '2021-01-01') a JOIN `data/sites.json` b ON a.site_id = b.site_id JOIN (SELECT site_id, SUM(amount) AS site_amount FROM `data/orders.json` WHERE status <= 0 GROUP BY site_id) c ON a.site_id = c.site_id GROUP BY a.site_id;
更多常用例子可查看 https://github.com/snower/syncany-sql/tree/main/examples
from syncanysql import ScriptEngine with ScriptEngine() as engine: # 执行SQL engine.execute(''' INSERT INTO `top_ips` SELECT ip, cnt FROM (SELECT seg0 AS ip, COUNT(*) AS cnt FROM `file:///var/log/nginx/access.log?sep= ` GROUP BY seg0) a ORDER BY cnt DESC LIMIT 3; ''') # 获取执行结果 print(engine.pop_memory_datas("top_ips"))
因为直接可以把查询结果通过insert into语句直接写入csv或excel文件,以及可以非常方便的直接在不同库表、不同机器和不同数据库类型间直接join和进行聚合运算,也可以方便的使用内存保存中间结果,所以可以大幅简化编写复杂SQL和数据导出的效率。
在指定insert into数据合并类型后可以非常方便的用于数据同步,在数据量级不高的系统中,配合superset之类的图表系统可以非常快的完成BI系统搭建,命令行的执行模式也可以使用Jenkins或者云函数之类的系统大幅提高稳定性或动态扩展能力。