之前写过一篇文章聊聊在集群环境中本地缓存如何进行同步,今天聊的话题看着和那篇文章有点雷同,不过我们今天重点会放在方法论上,也不会拘泥于具体实现。在聊这个话题之前,大家可以思考一下,如果要实现多个实例数据同步触发,大家会怎么做?脑海里,是会浮现,我可以用消息队列或者定时器来实现?这种已经具象化的技术细节?还是进一步进行拆解?
假设大家已经思考好,我来说下我个人的思考逻辑。今天标题的内容,主要讲同步如何触发?内容已经圈定死,因此就不谈数据同步涉及的一致性,只谈如何触发这个动作。多节点实例触发的关键是,一旦触发,各个节点都要通知到位。那如何进行多个节点通知呢?答案就是通过广播。那如何感知是否通知到位呢?这个还真不好搞,那我们换个思路,如果通知不到位,我们的措施会是啥?正常我们的思路,会是通过补偿机制。
今天我们聚焦在广播这个动作,补偿机制暂不在本文讨论。下面通过一个案例实操下
从图中,我们会发现本案例是通过一个中间件来实现。那这个中间件是啥?是rocketmq、kafka还是其他具有广播功能的组件或者服务?答案是也不是。怎么说?我们这个中间件,其实是一层高层广播抽象,而非具体实现
1、定义高层广播抽象接口
@FunctionalInterface public interface DataSyncTrigger { void broadcast(Object data); }
2、定义通知事件类
注: 本文会采用spring的事件监听模式实现
public class DataSyncTriggerEvent extends ApplicationEvent { /** * Create a new ApplicationEvent. * * @param source the object on which the event initially occurred (never {@code null}) */ public DataSyncTriggerEvent(Object source) { super(source); } }
3、定义高层抽象广播的模板基类
@RequiredArgsConstructor public abstract class BaseDataSyncTrigger implements DataSyncTrigger, ApplicationContextAware { protected ApplicationContext applicationContext; protected final DataSyncTriggerProperty dataSyncTriggerProperty; @Override public void broadcast(Object data) { DataSyncTriggerEvent dataSyncTriggerEvent = new DataSyncTriggerEvent(data); applicationContext.publishEvent(dataSyncTriggerEvent); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } private Collection<DataSyncTriggerCallBack> listDataSyncTriggerCallBacks(){ try { Map<String, DataSyncTriggerCallBack> dataSyncTriggerCallBackMap = applicationContext.getBeansOfType(DataSyncTriggerCallBack.class); return Collections.unmodifiableList(dataSyncTriggerCallBackMap.values().stream().collect(Collectors.toList())); } catch (BeansException e) { } return Collections.emptyList(); } public void callBack(Object data){ Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks = listDataSyncTriggerCallBacks(); if(CollectionUtil.isNotEmpty(dataSyncTriggerCallBacks)){ if(dataSyncTriggerProperty.isTriggerCallBackAsync()){ callbackAsync(data, dataSyncTriggerCallBacks); }else{ callbackSync(data, dataSyncTriggerCallBacks); } } } private void callbackSync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) { for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) { dataSyncTriggerCallBack.execute(data); } } private void callbackAsync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) { for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) { ThreadUtil.execAsync(()->{ dataSyncTriggerCallBack.execute(data); }); } } }
4、定义抽象回调接口【扩展点】
当业务收到通知,可以通过该回调接口进行具体业务操作
@FunctionalInterface public interface DataSyncTriggerCallBack { void execute(Object data); }
5、定义具体广播实现类
注: 这个广播的具体实现方案就很多了,只要天生具备广播能力或者基于原来特性扩展出广播的组件都可以,比如rocketmq的广播机制、redis的pubsub机制、zookeeper的分布式协调能力、基于注册中心服务发现能力改造出来的广播能力等。本文就以redis的pubsub机制为例
Slf4j public class RedisDataSyncTrigger extends BaseDataSyncTrigger implements CommandLineRunner { private final RedisTemplate redisTemplate; public RedisDataSyncTrigger(RedisTemplate redisTemplate, DataSyncTriggerProperty dataSyncTriggerProperty) { super(dataSyncTriggerProperty); this.redisTemplate = redisTemplate; } @EventListener public void listener(DataSyncTriggerEvent dataSyncTriggerEvent){ SyncDataDTO syncDataDTO = SyncDataDTO.builder() .data(dataSyncTriggerEvent.getSource()) .timeStamp(System.currentTimeMillis()) .build(); try { redisTemplate.convertAndSend(REDIS_CHANNEL_KEY, syncDataDTO); } catch (Exception e) { log.error("redis publish channel 【" + REDIS_CHANNEL_KEY + "】 fail,cause:" + e.getMessage(),e); } } @Override public void run(String... args) throws Exception { doSubscribe(); } @SneakyThrows private void doSubscribe() { RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); RedisMessageListener redisMessageListener = applicationContext.getBean(RedisMessageListener.class); connection.subscribe(redisMessageListener,REDIS_CHANNEL_KEY.getBytes("utf-8")); log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Register listen channel : 【{}】",REDIS_CHANNEL_KEY); } }
具体redis订阅监听实现
@RequiredArgsConstructor @Slf4j public class RedisMessageListener implements MessageListener{ private final BaseDataSyncTrigger baseDataSyncTrigger; private final RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] pattern) { byte[] body = message.getBody(); String dataJson = StrUtil.str(body, "utf-8"); if(JSONUtil.isJson(dataJson)){ try { SyncDataDTO dataDTO = (SyncDataDTO) redisTemplate.getHashValueSerializer().deserialize(body); baseDataSyncTrigger.callBack(dataDTO.getData()); } catch (Exception e) { log.error(e.getMessage(),e); } }else{ log.warn(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Data 【{}】 is not match json format !!!",dataJson); } } }
6、测试验证
a、编写业务逻辑类
@Service @RequiredArgsConstructor @Slf4j public class DataService { private List<Object> dataList = new CopyOnWriteArrayList<>(); private final RedisTemplate redisTemplate; private final BaseDataSyncTrigger dataSyncTrigger; public boolean add(String data){ try { Long count = redisTemplate.opsForList().leftPush(RedisConstant.REDIS_LIST_KEY, data); if(count > 0){ dataSyncTrigger.broadcast(data); return true; } } catch (Exception e) { log.error("add fail:" + e.getMessage(),e); } return false; } public List<Object> getDataList(){ return dataList; } }
b、编写业务控制器
@RestController @RequestMapping("data") @RequiredArgsConstructor public class DataController { private final DataService dataService; @GetMapping("add/{data}") public String syncData(@PathVariable("data") String data){ boolean isSuccess = dataService.add(data); return isSuccess ? "success" : "fail"; } @GetMapping("list") public List<Object> listData(){ return dataService.getDataList(); } }
c、编写业务回调类
@Component @RequiredArgsConstructor @Slf4j public class LocalListDataSyncTriggerCallBack implements DataSyncTriggerCallBack { private final DataService dataService; @Override public void execute(Object data) { dataService.getDataList().add(data); log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sync data:-->{}",data); } }
d、小细节
注: 当项目重启时,本地存储容器是没内容的,因此需要在项目重启时,写一个钩子,从其他缓存介质将数据刷到本地存储中
@Component @RequiredArgsConstructor @Slf4j public class DataInitTask implements CommandLineRunner { private final RedisTemplate redisTemplate; private final DataService dataService; @Override public void run(String... args) throws Exception { List redisDataList = redisTemplate.opsForList().range(RedisConstant.REDIS_LIST_KEY, 0, -1); if(CollectionUtil.isNotEmpty(redisDataList)){ dataService.getDataList().addAll(redisDataList); log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Loaded data from redis finished!!!"); } } }
e、测试
从一个节点(示例:54860端口)添加数据,如图
观察其他节点(示例:59829端口)本地存储是否接收到数据
从图可以发现已经收到数据,同时我们观察控制台
可以看出业务回调已经触发
本文介绍了通过redis pubsub实现广播效果,示例代码中也提供基于注册中心以及配置中心apollo来实现广播的效果。基于篇幅就不再论述了,感兴趣的朋友,可以查看下方demo链接。本文除了介绍多个节点实例数据同步如何触发之外,其实还有实现一个通用组件套路原则–依赖倒置原则。高层定义抽象,程序依赖高层抽象,也不依赖具体实现,这样后续才比较好扩展