在使用dubbo的时候,发现当消费者启动的时候,如果提供者没有启动,即使提供者后来启动了,消费者也调不通提供者提供的接口了。
注册中心使用的是nacos
dubbo版本是3.0.4
接口
/**
 * Demo service contract used by both the provider and the consumer in this article.
 */
public interface DemoService {

    /**
     * Returns a string to the caller; the provider implementation shown in this
     * article returns {@code "hello"}.
     */
    String sayHello();
}
提供者
@DubboService public class DemoServiceImpl implements DemoService { @Override public String sayHello() { return "hello"; } } @EnableDubbo @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) public class ReferenceCheckProviderStarter { public static void main(String[] args) { new SpringApplicationBuilder(ReferenceCheckProviderStarter.class) .web(WebApplicationType.NONE) // .REACTIVE, .SERVLET .run(args); System.out.println("dubbo service started"); } }
消费者
/**
 * Spring Boot entry point of the consumer application; it also acts as the REST
 * controller that exposes the test endpoint.
 */
@EnableDubbo
@RestController
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class ReferenceCheckConsumerStarter {

    /** Remote reference to the provider's {@link DemoService}. */
    @DubboReference
    private DemoService demoService;

    /**
     * Starts the consumer application.
     *
     * @param args command line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ReferenceCheckConsumerStarter.class, args);
    }

    /**
     * Calls the remote service and returns its answer as the HTTP response body.
     *
     * @return whatever {@code DemoService#sayHello()} returns
     */
    @GetMapping("/dubbo/nacos/test")
    public Object test() {
        final String reply = demoService.sayHello();
        return reply;
    }
}
1. 先启动provider,再启动consumer
a. 启动provider
nacos出现provider的服务
b. 启动consumer
nacos出现consumer的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 hello
c. 终止provider
nacos上provider的服务消失了
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 No provider available from registry
d. 重新启动provider
nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 hello
可以看出:先启动provider,再启动consumer,整个过程是没问题的。
2. 先启动consumer,再启动provider
a. 启动consumer
nacos出现consumer的服务,但立即又消失了
b. 启动provider
nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 Directory already destroyed .
可以看出:当consumer先启动时,如果provider此时没有启动,consumer就再也访问不到provider的服务了。
3. 先启动consumer,再启动provider (check=false)
修改一下注解 @DubboReference 的参数:
// check = false: the reference is not checked for an available provider at startup,
// which is the configuration used for scenario 3 of this article.
@DubboReference(check = false)
private DemoService demoService;
a. 启动consumer
nacos出现consumer的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 No provider available from registry
b. 启动provider
nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 hello
可以看出:即使是consumer先启动,当provider启动后,consumer还是能够访问到provider的服务的。
org.apache.dubbo.rpc.RpcException: No provider available from registry
/**
 * Excerpt of the registry-backed directory: only the part of {@code doList} that
 * produces the "No provider available from registry" error is shown.
 */
public class RegistryDirectory<T> extends DynamicDirectory<T> {

    /**
     * Lists the invokers for the given invocation (the remainder of the method is elided).
     *
     * @param invocation the invocation being routed
     * @throws RpcException with code {@code FORBIDDEN_EXCEPTION} when the {@code forbidden} flag is set
     */
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                    getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                    NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                    ", please check status of providers(disabled, not registered or in blacklist).");
        }
        // ...... (rest of the method omitted in this excerpt)
    }
}
/**
 * Excerpt showing how the directory rebuilds its invoker list and toggles the
 * {@code forbidden} flag when a new address list arrives.
 */
public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {

    // Constant shown next to the excerpt; it is not referenced by the code visible here.
    String EMPTY_PROTOCOL = "empty";

    /**
     * Replaces the current invokers with invokers built from {@code invokerUrls}.
     * <p>
     * An empty list sets {@code forbidden = true}, clears the invoker list and destroys all
     * invokers. A non-empty list sets {@code forbidden = false}, converts the URLs into
     * invokers, publishes them to the router chain and destroys invokers that are no longer used.
     * {@code invokersChanged()} is called at the end unless one of the early returns is taken.
     *
     * @param invokerUrls the latest address list; asserted to be non-null
     */
    private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty url list to clear address.");
        this.originalUrls = invokerUrls;

        if (invokerUrls.size() == 0) {
            logger.info("Received empty url list...");
            this.forbidden = true; // Forbid to access  // <-- here
            this.invokers = Collections.emptyList();
            routerChain.setInvokers(this.invokers);
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow accessing  // <-- here
            // NOTE(review): this branch is only entered when invokerUrls.size() != 0, so this
            // emptiness check looks unreachable here — confirm against CollectionUtils.isEmpty.
            if (CollectionUtils.isEmpty(invokerUrls)) {
                return;
            }

            // can't use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently.
            Map<String, Invoker<T>> oldUrlInvokerMap = null;
            if (this.urlInvokerMap != null) {
                // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
                this.urlInvokerMap.forEach(oldUrlInvokerMap::put);
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map  // <-- here
            logger.info("Refreshed invoker size " + newUrlInvokerMap.size());

            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                // Nothing could be built from the URLs: log and keep the previous invokers untouched.
                logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
                return;
            }
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
            // pre-route and build cache, notice that route cache should build on original Invoker list.
            // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
            routerChain.setInvokers(newInvokers);
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap; // <-- here

            if (oldUrlInvokerMap != null) {
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    // A failure while closing old invokers is only logged; the refresh itself stays in effect.
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }

        // notify invokers refreshed
        this.invokersChanged();
    }

    /**
     * Optionally re-applies the directory URL overrides (when {@code enableConfigurationListen}
     * is set) and then refreshes the invokers from {@code instanceUrls}.
     *
     * @param instanceUrls the latest address list
     */
    private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) {
        // mock zookeeper://xxx?mock=return null
        if (enableConfigurationListen) {
            overrideDirectoryUrl();
        }
        refreshInvoker(instanceUrls); // <-- here
    }
}
/**
 * Excerpt of the registry-backed directory: the entry point that receives URL
 * notifications from the registry.
 */
public class RegistryDirectory<T> extends DynamicDirectory<T> {

    /**
     * Handles a notification from the registry.
     * <p>
     * Returns immediately when the directory is destroyed. Otherwise the URLs are grouped by
     * category, configurators and routers are updated, the provider URLs are passed through
     * every activated {@code AddressListener} extension, and finally
     * {@code refreshOverrideAndInvoker} is called with the resulting provider URLs.
     *
     * @param urls the URLs pushed by the registry
     */
    @Override
    public synchronized void notify(List<URL> urls) {
        if (isDestroyed()) {
            return;
        }

        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        // Keep the current configurators when the conversion yields nothing.
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());

        // 3.x added for extend URL address
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                // Each listener may replace the provider URL list.
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
            }
        }

        refreshOverrideAndInvoker(providerURLs); // <-- here
    }
}

/**
 * Excerpt of the registry base class: dispatches a URL notification to a listener,
 * one category at a time.
 */
public abstract class AbstractRegistry implements Registry {

    /**
     * Notify changes from the Provider side.
     * <p>
     * An empty {@code urls} list is ignored (with a warning) unless the subscribed service
     * interface equals {@code ANY_VALUE}. Matching URLs are grouped by category; for every
     * category the listener is notified once and, when the local cache is enabled, the
     * properties are saved.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     * @throws IllegalArgumentException if {@code url} or {@code listener} is null
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((CollectionUtils.isEmpty(urls))
                && !ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
        }
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>(); // <-- here
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getCategory(DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); // <-- here
                categoryList.add(u); // <-- here
            }
        }
        if (result.size() == 0) {
            // None of the pushed URLs matches the subscription: nothing to deliver.
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            listener.notify(categoryList); // <-- here
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            if (localCacheEnabled) {
                saveProperties(url);
            }
        }
    }
}
public class NacosRegistry extends FailbackRegistry { private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) { List<Instance> enabledInstances = new LinkedList<>(instances); if (enabledInstances.size() > 0) { // Instances filterEnabledInstances(enabledInstances); } List<URL> urls = toUrlWithEmpty(url, enabledInstances); NacosRegistry.this.notify(url, listener, urls); // 这里 } String EMPTY_PROTOCOL = "empty"; private List<URL> toUrlWithEmpty(URL consumerURL, Collection<Instance> instances) { List<URL> urls = buildURLs(consumerURL, instances); if (urls.size() == 0) { // 这里 URL empty = URLBuilder.from(consumerURL) .setProtocol(EMPTY_PROTOCOL) .addParameter(CATEGORY_KEY, DEFAULT_CATEGORY) .build(); urls.add(empty); } return urls; } }
当没有可用的服务时,instances是空的
当有可用的服务时,instances是不为空的
那么,Nacos 是怎么把实例变化通知到 Dubbo 的呢?
public class ServiceInfoHolder implements Closeable { public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (isEmptyOrErrorPush(serviceInfo)) { //empty or error push, just ignore return oldService; } serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); boolean changed = isChangedServiceInfo(oldService, serviceInfo); if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) { serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo)); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { // 这里 NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); // 这里 DiskCache.write(serviceInfo, cacheDir); } return serviceInfo; } } public class DefaultPublisher extends Thread implements EventPublisher { private BlockingQueue<Event> queue; @Override public void init(Class<? extends Event> type, int bufferSize) { setDaemon(true); setName("nacos.publisher-" + type.getName()); this.eventType = type; this.queueMaxSize = bufferSize; this.queue = new ArrayBlockingQueue<>(bufferSize); // 这里 start(); } @Override public boolean publish(Event event) { checkIsStart(); boolean success = this.queue.offer(event); // 这里 if (!success) { LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); receiveEvent(event); return true; } return true; } @Override public void run() { openEventHandler(); } void openEventHandler() { try { // This variable is defined to resolve the problem which message overstock in the queue. 
int waitTimes = 60; // To ensure that messages are not lost, enable EventHandler when // waiting for the first Subscriber to register for (; ; ) { if (shutdown || hasSubscriber() || waitTimes <= 0) { break; } ThreadUtils.sleep(1000L); waitTimes--; } for (; ; ) { if (shutdown) { break; } final Event event = queue.take(); // 这里 receiveEvent(event); // 这里 UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); } } catch (Throwable ex) { LOGGER.error("Event listener exception : ", ex); } } void receiveEvent(Event event) { final long currentEventSequence = event.sequence(); if (!hasSubscriber()) { LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber."); return; } // Notification single event listener for (Subscriber subscriber : subscribers) { // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass()); continue; } // Because unifying smartSubscriber and subscriber, so here need to think of compatibility. // Remove original judge part of codes. 
notifySubscriber(subscriber, event); // 这里 } } @Override public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); final Runnable job = () -> subscriber.onEvent(event); final Executor executor = subscriber.executor(); if (executor != null) { executor.execute(job); // 这里 } else { try { job.run(); // 这里 } catch (Throwable e) { LOGGER.error("Event callback exception: ", e); } } } } public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> { @Override public void onEvent(InstancesChangeEvent event) { String key = ServiceInfo .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters()); ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); if (CollectionUtils.isEmpty(eventListeners)) { return; } for (final EventListener listener : eventListeners) { final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event); if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) { ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent)); // 这里 } else { listener.onEvent(namingEvent); // 这里 } } } } public class NacosRegistry extends FailbackRegistry { @Override public void onEvent(Event event) { if (event instanceof NamingEvent) { NamingEvent e = (NamingEvent) event; notifier.notify(e.getInstances()); // 这里 } } } public abstract class RegistryNotifier { public synchronized void notify(Object rawAddresses) { this.rawAddresses = rawAddresses; long notifyTime = System.currentTimeMillis(); this.lastEventTime = notifyTime; long delta = (System.currentTimeMillis() - lastExecuteTime) - delayTime; // more than 10 calls && next execute time is in the future boolean delay = shouldDelay.get() && delta < 0; if (delay) { scheduler.schedule(new NotificationTask(this, notifyTime), -delta, 
TimeUnit.MILLISECONDS); // 这里 } else { // check if more than 10 calls if (!shouldDelay.get() && executeTime.incrementAndGet() > DEFAULT_DELAY_EXECUTE_TIMES) { shouldDelay.set(true); } scheduler.submit(new NotificationTask(this, notifyTime)); // 这里 } } public static class NotificationTask implements Runnable { private final RegistryNotifier listener; private final long time; public NotificationTask(RegistryNotifier listener, long time) { this.listener = listener; this.time = time; } @Override public void run() { try { if (this.time == listener.lastEventTime) { listener.doNotify(listener.rawAddresses); // 这里 listener.lastExecuteTime = System.currentTimeMillis(); synchronized (listener) { if (this.time == listener.lastEventTime) { listener.rawAddresses = null; } } } } catch (Throwable t) { logger.error("Error occurred when notify directory. ", t); } } }} } public class NacosRegistry extends FailbackRegistry { private class RegistryChildListenerImpl implements EventListener { private RegistryNotifier notifier; public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) { notifier = new RegistryNotifier(getUrl(), NacosRegistry.this.getDelay()) { @Override protected void doNotify(Object rawAddresses) { List<Instance> instances = (List<Instance>) rawAddresses; if (isServiceNamesWithCompatibleMode(consumerUrl)) { /** * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899 */ NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances); instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName); } NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances); // 这里 } }; } }
然后就调用了上面的 NacosRegistry#notifySubscriber 方法 👆🏻
什么时候添加监听器的?
/**
 * Excerpt of the Dubbo Nacos registry: shows where the Nacos event listener is
 * registered during subscription.
 */
public class NacosRegistry extends FailbackRegistry {

    /**
     * Creates a {@code RegistryChildListenerImpl} for the service and subscribes it at the
     * naming service, using the group taken from the registry URL.
     *
     * @param serviceName the Nacos service name to watch
     * @param url         the subscription URL passed to the listener
     * @param listener    the listener to notify on changes
     * @throws NacosException declared by the method; presumably raised by the subscribe call
     */
    private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
            throws NacosException {
        EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener); // <-- here
        namingService.subscribe(serviceName,
                getUrl().getGroup(Constants.DEFAULT_GROUP),
                eventListener); // <-- here
    }

    /**
     * Subscribes to the given service names: first fetches the current instances and notifies
     * the listener with them, then registers an event listener per service name for later changes.
     * <p>
     * In compatible mode all instances of all service names are collected and delivered in one
     * notification; otherwise each service name is handled separately with a subscriber URL
     * whose path/interface is derived from the service name and whose {@code check} parameter
     * is set to {@code false}.
     *
     * @param url          the subscription URL
     * @param listener     the listener to notify
     * @param serviceNames the Nacos service names to subscribe to
     * @throws RpcException wrapping any throwable raised during subscription
     */
    private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
        try {
            if (isServiceNamesWithCompatibleMode(url)) {
                List<Instance> allCorrespondingInstanceList = Lists.newArrayList();

                /**
                 * Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
                 * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                 *
                 * namingService.getAllInstances with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
                 * default {@link DEFAULT_GROUP}
                 *
                 * in https://github.com/apache/dubbo/issues/5978
                 */
                for (String serviceName : serviceNames) {
                    List<Instance> instances = namingService.getAllInstances(serviceName,
                            getUrl().getGroup(Constants.DEFAULT_GROUP));
                    NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                    allCorrespondingInstanceList.addAll(instances);
                }
                // Initial notification with whatever instances exist right now (possibly none).
                notifySubscriber(url, listener, allCorrespondingInstanceList);
                for (String serviceName : serviceNames) {
                    subscribeEventListener(serviceName, url, listener); // <-- here
                }
            } else {
                for (String serviceName : serviceNames) {
                    List<Instance> instances = new LinkedList<>();
                    instances.addAll(namingService.getAllInstances(serviceName
                            , getUrl().getGroup(Constants.DEFAULT_GROUP)));
                    String serviceInterface = serviceName;
                    String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1);
                    // A name with exactly four segments carries the interface at SERVICE_INTERFACE_INDEX.
                    if (segments.length == 4) {
                        serviceInterface = segments[SERVICE_INTERFACE_INDEX];
                    }
                    URL subscriberURL = url.setPath(serviceInterface).addParameters(INTERFACE_KEY, serviceInterface,
                            CHECK_KEY, String.valueOf(false));
                    notifySubscriber(subscriberURL, listener, instances);
                    subscribeEventListener(serviceName, subscriberURL, listener);
                }
            }
        } catch (Throwable cause) {
            throw new RpcException("Failed to subscribe " + url + " to nacos " + getUrl() + ", cause: " + cause.getMessage(), cause);
        }
    }
}
org.apache.dubbo.rpc.RpcException: Directory already destroyed
public abstract class AbstractDirectory<T> implements Directory<T> { @Override public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } return doList(invocation); } @Override public void destroy() { destroyed = true; // 这里 } }
/**
 * Excerpt of the reference configuration: the startup availability check that
 * destroys the invoker when no provider is available.
 */
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    /**
     * Destroys the invoker and fails when the check is enabled and the invoker is not available.
     *
     * @throws IllegalStateException if {@code shouldCheck()} is true and the invoker is unavailable
     */
    private void checkInvokerAvailable() throws IllegalStateException {
        if (shouldCheck() && !invoker.isAvailable()) {
            invoker.destroy(); // <-- here
            throw new IllegalStateException("Should has at least one way to know which services this interface belongs to," +
                    " subscription url: " + invoker.getUrl());
        }
    }

    /**
     * Initializes the reference (most of the method is elided in this excerpt) and runs the
     * availability check.
     */
    protected synchronized void init() {
        // ...... (earlier initialization steps omitted in this excerpt)

        checkInvokerAvailable(); // <-- here
    }
}
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig { public boolean shouldCheck() { checkDefault(); Boolean shouldCheck = isCheck(); // 这里 if (shouldCheck == null && getConsumer() != null) { shouldCheck = getConsumer().isCheck(); } if (shouldCheck == null) { // default true // 这里 shouldCheck = true; } return shouldCheck; } }
public class RegistryDirectory<T> extends DynamicDirectory<T> { @Override public boolean isAvailable() { if (isDestroyed() || this.forbidden) { // 这里 return false; } Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; // 这里 return CollectionUtils.isNotEmptyMap(localUrlInvokerMap) && localUrlInvokerMap.values().stream().anyMatch(Invoker::isAvailable); } }
如果没有设置 check 字段(默认值为 true),那么消费者就会在启动的时候检查提供方是否可用;如果不可用,就会销毁对应的 invoker(Directory 被标记为 destroyed),之后即使提供方启动了,调用也只会得到 Directory already destroyed。