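The client sends heartbeats by calling the /nacos/v1/ns/instance/beat endpoint. The main logic is:
1. If Nacos Server cannot find the corresponding Instance, it constructs a new Instance and registers it. The source is as follows: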
// com.alibaba.nacos.naming.controllers.InstanceController#beat
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    // ... other code omitted ...

    // Look up the Instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    // If the Instance is null, build one from the beat and register it
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);

        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        // Register the client with Nacos
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);

    // ... other code omitted ...

    // Process the client's heartbeat
    service.processClientBeat(clientBeat);

    // ... other code omitted ...
    return result;
}
2. If Nacos Server already holds the instance, it updates fields such as lastBeat and healthy. The source is as follows:
// com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor#run
@Override
public void run() {
    // ... other code omitted ...

    for (Instance instance : instances) {
        // Find the Instance in the Service that needs to be updated
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Record the current time as the Instance's lastBeat
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    // Restore the health status
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
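The two branches above can be condensed into a small in-memory model. This is only a sketch to illustrate the flow, not Nacos code: `BeatSketch`, `Inst`, and `onBeat` are hypothetical names, the registry is a plain map keyed by `ip:port`, and details such as the `marked` flag, clusters, and push notifications are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of server-side beat handling (not Nacos classes).
class BeatSketch {

    // Minimal stand-in for an instance: address, last beat time, health flag.
    static final class Inst {
        final String ip;
        final int port;
        volatile long lastBeat;
        volatile boolean healthy;

        Inst(String ip, int port, long now) {
            this.ip = ip;
            this.port = port;
            this.lastBeat = now;
            this.healthy = true;
        }
    }

    private final Map<String, Inst> registry = new ConcurrentHashMap<>();

    // Handles one beat. Returns true if the beat caused a (re-)registration.
    boolean onBeat(String ip, int port, long now) {
        String key = ip + ":" + port;
        Inst existing = registry.get(key);
        if (existing == null) {
            // Branch 1: unknown instance, so build one from the beat and register it.
            registry.put(key, new Inst(ip, port, now));
            return true;
        }
        // Branch 2: known instance, so refresh lastBeat and restore health if needed.
        existing.lastBeat = now;
        if (!existing.healthy) {
            existing.healthy = true;
        }
        return false;
    }

    Inst get(String ip, int port) {
        return registry.get(ip + ":" + port);
    }

    public static void main(String[] args) {
        BeatSketch server = new BeatSketch();
        System.out.println("registered: " + server.onBeat("10.0.0.1", 8080, System.currentTimeMillis()));
        System.out.println("registered: " + server.onBeat("10.0.0.1", 8080, System.currentTimeMillis()));
    }
}
```

In the real controller, a newly registered instance still goes through `processClientBeat` afterwards; the sketch folds that into the constructor by initializing `lastBeat` at registration time.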
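The server also runs a scheduled task that inspects client heartbeats to decide whether each client is still alive. How does Nacos start this heartbeat check, and how does the check work?
See the source: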
// com.alibaba.nacos.naming.core.Service#init
// Called when an Instance is registered
public void init() {
    // Schedule the client heartbeat check task for this service
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

// com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck
public static void scheduleCheck(ClientBeatCheckTask task) {
    // Start a scheduled task that checks heartbeats every 5s.
    // Under the hood this ends up in the JDK's scheduleWithFixedDelay,
    // so the 5s is the delay between runs rather than a strict fixed rate.
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

// com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run
@Override
public void run() {
    try {
        // Only the Nacos node responsible for this service runs the check
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        for (Instance instance : instances) {
            // Check whether the heartbeat has timed out (default timeout: 15s)
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        // On timeout, first mark the instance as unhealthy
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        // Publish a service-changed event
                        getPushService().serviceChanged(service);
                        // Publish a heartbeat-timeout event
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances (default: 30s without a beat):
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }

            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
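The scheduling pattern in `scheduleCheck` (one periodic task per key, deduplicated through `computeIfAbsent`) can be reproduced with plain JDK classes. The sketch below assumes a single-threaded scheduler and uses hypothetical names (`CheckScheduler`, `scheduleCheck`, `shutdown`); it stands in for `GlobalExecutor.scheduleNamingHealth`, whose internals are not shown in the source above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the scheduling part of HealthCheckReactor.
class CheckScheduler {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Schedules the task at most once per key: 5s initial delay, then 5s between
    // the end of one run and the start of the next (fixed delay, not fixed rate).
    ScheduledFuture<?> scheduleCheck(String taskKey, Runnable task) {
        return futureMap.computeIfAbsent(taskKey,
                k -> executor.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

    // Stops the scheduler so the JVM can exit.
    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        CheckScheduler scheduler = new CheckScheduler();
        Runnable check = () -> System.out.println("checking heartbeats");
        ScheduledFuture<?> first = scheduler.scheduleCheck("demo-service", check);
        ScheduledFuture<?> second = scheduler.scheduleCheck("demo-service", check);
        System.out.println("same future reused: " + (first == second));
        scheduler.shutdown();
    }
}
```

Because `computeIfAbsent` only runs the lambda when the key is missing, calling `scheduleCheck` again for the same key does not start a second periodic task.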
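The default heartbeat timeout is 15s. If an Instance's lastBeat is more than 15s earlier than the current time, the Instance is judged unhealthy: its healthy flag is first set to false, then a service-changed event is published, followed by a heartbeat-timeout event. If instance expiry is enabled, an Instance whose lastBeat is more than 30s old (by default) is then deleted.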
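The threshold logic can be expressed as a small pure function. This is a sketch under the defaults quoted above (15s heartbeat timeout, 30s delete timeout); `BeatTimeoutSketch`, `State`, and `classify` are hypothetical names, and the `marked` flag and the expiry switch checked by the real task are ignored.

```java
// Hypothetical helper that classifies an instance by how long it has been silent.
class BeatTimeoutSketch {

    enum State { HEALTHY, UNHEALTHY, EXPIRED }

    // Defaults taken from the text above; the real values are read per instance.
    static final long HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long DELETE_TIMEOUT_MS = 30_000L;

    // Uses strict '>' comparisons, matching the checks in ClientBeatCheckTask#run.
    static State classify(long lastBeat, long now) {
        long silentFor = now - lastBeat;
        if (silentFor > DELETE_TIMEOUT_MS) {
            return State.EXPIRED;
        }
        if (silentFor > HEARTBEAT_TIMEOUT_MS) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(classify(now - 5_000L, now));
        System.out.println(classify(now - 20_000L, now));
        System.out.println(classify(now - 40_000L, now));
    }
}
```

Note that an instance silent for exactly 15s is still treated as healthy, because the comparison is strictly greater-than.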