业务线与系统越来越多,系统或业务间数据同步需求也越频繁。当前互联网业务系统大多MySQL数据存储与处理方案:
因此,公司亟需一套灵活易用的系统间数据同步与处理方案,让特定业务数据可很方便在其他业务或组件间流转,助推业务快速迭代。
当前业界针对系统数据同步较常见的方案有同步双写、异步双写、侦听binlog等方式,各有优劣。本文以MySQL同步到ES案例讲解。
最简单方案,在将数据写到MySQL时,同时将数据写到ES,实现数据双写。
在同步双写基础加个MQ,实现异步写。
第二种方案基础上,主要解决业务耦合问题,所以引入数据变动自动监测与处理机制。
基础组件的设计主要考虑尽量做到对业务无侵入,业务接入无感知,同时系统耦合度低,综上选型方案三,同时考虑该方案在可复用和可扩展还存在短板,所以在此基础又做优化。
需求数据源都是MySQL,所以先考虑选择组件对MySQL数据变动做实时监听,业界成熟方案最熟悉的就是[canal],功能完善度,社区活跃度,稳定性等都符合。所以,基于canal对方案三优化,以满足多系统数据同步,达到业务解耦、可复用、可扩展。
通过统一的“消息分发服务”实现与Canal Client对接,并将消息按统一格式分发到不同MQ集群,通过统一的“消息消费服务”去消费消息并回调业务接口,业务系统无需关注数据流转,只需关注特定业务的数据处理和数据组装。
“消息分发服务”和“消息消费服务”对各业务线,实现了数据流转过程中的功能复用。“消息消费服务”中的可分发到不同的MQ集群,和“消息消费服务”中的配置指定数据源输出实现了功能扩展。
将数据的订阅与数据的消费通过MQ进行解耦,“数据订阅消息分发服务”的职责是对接Canal Client,解析数据变更消息,转换为常用的JSON格式的消息报文,按照业务配置规则分发到不同的MQ集群、路由。
Canal主要是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费:
从Canal服务高可用设计可见,Canal Client当有多个实例启动时,会保证只有一个实例在运行,消费binlog消息。而承载Canal Client的"数据订阅消息分发服务"会部署在多台服务器,由于服务发布时每台服务器启动时间不同,所有Canal Client活跃实例都会集中在先启动的那台服务器运行,消费binlog消息。
其余服务器运行的Canal Client都处备用状态,不能充分利用每台服务器资源。因此希望不同destination分摊在不同服务器执行,但所在服务器宕机时会自动转移到其他服务器执行,这样充分利用每一台服务器,提供binlog消息消费性能。
为此,引入elasticjob-lite组件,利用分片特性二次封装,实现侦听destination在某台服务器中上下线的变更事件。
ElasticJob 中任务分片项的概念,使任务可在分布式环境运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更。
若作业分 4 片,用两台服务器执行,则每个服务器分到 2 片:
新增Job服务器时,ElasticJob 会通过注册中心的临时节点的变化感知到新服务器,并在下次任务调度时重分片,新服务器会承载一部分作业分片:
当作业服务器在运行中宕机时,注册中心同样会通过临时节点感知,并将在下次运行时将分片转移至仍存活的服务器,以达到作业高可用。本次由于服务器宕机而未执行完的作业,则可以通过失效转移的方式继续执行。
该系统使用方包含公司各业务线,如何保障线上问题后,各业务不相互影响。
在MQ集群和队列级别都支持基于业务的资源隔离;将从canal中拉取出来的变更消息,按规则分发到不同MQ集群,设置统一路由键规则, 以便各业务在对接时申请自己业务的MQ队列,按需绑定对应MQ集群和消息路由。
通过配置将不同的destination映射到不同的MQ集群和ZK集群,可达到性能横向扩展。
canal从binlog中获取消息后,将批量消息拆分成单条消息,进行分片规则运算后发送到指定rabbitmq交换机和路由键,以便根据不同业务场景,按不同业务规则绑定到不同队列,通过消费服务进行消息消费处理,同时会建立一个名为“exchange.canal”的exchange,类型为 topic,路由键规则:key.canal.{destination}.{database}.{table}.{sharding},sharding按pkName-value排序后的hashcode取模分片,队列命名规则约定:queue.canal.{appId}.{bizName} 如:
queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_item.0 queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_extend.0 ...
为实现消息的消费与业务系统解耦,独立出"数据订阅消费服务”。消费从”数据订阅消息分发服务“中投递的数据变更MQ消息,根据业务配置回调指定的业务回调接口。业务回调接口负责接收数据变更消息,组装需要执行的ES文档信息,返回给消费服务进行ES数据操作。
从binlog订阅的消息有3类操作:INSERT,UPDATE,DELETE,这里新增一个SELECT指令,作用是业务回调接口在收到该指令后,从数据库中重新获取最新的数据组装成需要执行的ES文档信息,返回给消费服务进行ES数据操作。
主要应用在全量同步,部分同步,文档刷新,消息补偿等场景。
新的业务功能上线时,会配置对应的队列绑定相关的路由键,订阅到业务场景需要的数据变更的消息。为避免每次有新业务接入需要重新更新消费服务代码,重新发布服务,需实现能定时加载配置表数据,实现动态添加MQ队列侦听的功能。
使用SimpleMessageListenerContainer容器设置消费队列的动态监听。为每个MQ集群创建一个SimpleMessageListenerContainer实例,并动态注册到Spring容器。
一个业务通常对应一个ES索引,一或多个MQ队列(队列绑定路由键的规则见: MQ消息分片规则):
一个queue,有多个consumer去消费, 因为无法保证先读到消息的 consumer 一定先完成操作,所以可能导致顺序错乱。因为不同消息都发到了一个queue,然后多个消费者又消费同一个queue的消息。为此,可创建多个queue,每个消费者只消费一个queue, 生产者按规则把消息放入同一queue(见:3.4.4.2 MQ消息分片规则),这样同一个消息就只会被同一个消费者顺序消费。
服务通常集群部署,天然每个queue就会有多个consumer。为解决这问题引入elasticjob-lite对MQ分片,如有2个服务实例,5个队列,可让实例1消费队列1、2、3,让实例2消费队列4、5。当其中有一个实例1挂掉时会自动将队列1、2、3的消费转移到实例2上,当实例1重启启动后队列1、2、3的消费会重新转移到实例1。
RabbitMQ消费顺序错乱原因通常是队列消费是单机多线程消费或消费者是集群部署,由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。如消费者A执行增加,消费者B执行修改,消费者C执行删除,但消费者C执行比消费者B快,消费者B又比消费者A快,导致消费 binlog 执行到ES时顺序错乱,本该增加、修改、删除,变成删除、修改、增加。
对此,可给 RabbitMQ 创建多个 queue,每个消费者单线程固定消费一个 queue 的消息,生产者发送消息的时候,同一个单号的消息发送到同一个 queue 中,由于同一个 queue 的消息有序,那同一单号的消息就只会被一个消费者顺序消费,从而保证消息顺序性:
但如何保证集群模式下,一个队列只在一台机器上进行单线程消费,若这台机器宕机如何进行故障转移。 对此,引入elasticjob-lite对MQ分片,如有2个服务实例,5个队列,我们可以让实例1消费队列1、2、3,让实例2消费队列4、5。当其中有一个实例1挂掉时会自动将队列1、2、3的消费转移到实例2上,当实例1重启启动后队列1、2、3的消费会重新转移到实例1。
对消息顺序消费敏感的业务场景,通过队列分片提升整体并发度。对消息顺序消费不敏感业务场景也可配置成某队列集群消费或单机并发消费。针对不同的业务场景合理选择不同的配置方案,提升整体性能。
通过Canal获取的变更消息只能满足增量订阅数据的业务场景,然而我们通常我们还需要进行一次全量的历史数据同步后增量数据的订阅才会有意义。对于业务数据表的id是自增模式时,可以通过给定一个最小id值,最大id值,然后进行切片,如100个一片,生成MQ报文,发送到MQ中。消费MQ消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
有的时候我们需要修复指定的数据,或业务表的id是非自增模式的,需要进行全量同步。可以通过部分同步的接口,指定一组需要同步的id列表,生成分片MQ报文,发送到MQ中。消费服务接收到同步MQ消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
当我们ES索引中有大批量的数据异常,需要重新刷新ES索引数据时,可以通过生成一个全量同步的任务,分页获取指定ES索引的文档ID列表,模拟生成部分同步消息报文,发送到MQ中。消费MQ消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
将同步失败的消息存储到消息重试表中,通过Job执行补偿,便于监控。补偿时将消息重置为 SELECT 类型的MQ报文。业务回调接口接收到消息后会从数据库中获取最新的数据更新ES文档。
目前ES官方推荐使用的客户端是RestHighLevelClient,我们在此基础上进行了二次封装开发,主要从扩展性和易用性方面考虑。
对于一些有数据隔离需求的业务场景,我们提供了一个ES数据隔离插件。在ES SDK中设计了一个搜索过滤器的接口,采用拦截器的方式对统计文档,搜索文档等方法的搜索条件参数进行拦截过滤。
/** * 搜索过滤器 */ public interface SearchSourceBuilderFilter { String getFilterName(); void filter(SearchSourceBuilder searchSourceBuilder); }
如果发现Canal Client 长时间获取不到binlog消息,可以去Canal Admin 后台去看一下Instance管理中的日志。大概率会出现“could not find first log file name in binary log index file”,这个是因为zk集群中缓存了binlog信息导致拉取的数据不对,包括定义了binlog position但是启动服务后不对也是同样的原因。
解决:
ES的Update By Query对应的就是关系型数据库的update set … where…语句;该命令在ES上支持得不是很成熟,可能会导致的问题有:批量更新时非事务模式执行(允许部分成功部分失败)、大批量操作会超时、频繁更新会报错(版本冲突)、脚本执行太频繁时又会触发断路器等。我们的解决办法也比较简单,直接在生产环境放弃使用updateByQuery方法,配置成使用先查询出符合条件的数据,然后分发到MQ中单条分别更新的模式。
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都国企技术专家兼架构,多家大厂后台研发和架构经验,负责复杂度极高业务系统的模块化、服务化、平台化研发工作。具有丰富带团队经验,深厚人才识别和培养的积累。
参考: