public interface DemoService { String sayHello(); }
@DubboService public class DemoServiceImpl implements DemoService { @Override public String sayHello() { return "hello"; } } @EnableDubbo @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) public class ReferenceCheckProviderStarter { public static void main(String[] args) { new SpringApplicationBuilder(ReferenceCheckProviderStarter.class) .web(WebApplicationType.NONE) // .REACTIVE, .SERVLET .run(args); System.out.println("dubbo service started"); } }
@EnableDubbo @RestController @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) public class ReferenceCheckConsumerStarter { @DubboReference private DemoService demoService; @GetMapping("/dubbo/nacos/test") public Object test() { return demoService.sayHello(); } public static void main(String[] args) { SpringApplication.run(ReferenceCheckConsumerStarter.class, args); } }
1. 先启动provider,再启动consumer
a. 启动provider
b. 启动consumer
访问 后返回 hello
c. 终止provider
访问 后返回 No provider available from registry
d. 重新启动provider
访问 后返回 hello
2. 先启动consumer,再启动provider
a. 启动consumer
b. 启动provider
访问 后返回 Directory already destroyed .
3. 先启动consumer,再启动provider (check=false)
@DubboReference(check = false) private DemoService demoService;
a. 启动consumer
访问 后返回 No provider available from registry
b. 启动provider
访问 后返回 hello
org.apache.dubbo.rpc.RpcException: No provider available from registry
public class RegistryDirectory<T> extends DynamicDirectory<T> { @Override public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)."); } // ...... } }
public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> { String EMPTY_PROTOCOL = "empty"; private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty url list to clear address."); this.originalUrls = invokerUrls; if (invokerUrls.size() == 0) { logger.info("Received empty url list..."); this.forbidden = true; // Forbid to access // 这里 this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow accessing // 这里 if (CollectionUtils.isEmpty(invokerUrls)) { return; } // can't use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently. Map<String, Invoker<T>> oldUrlInvokerMap = null; if (this.urlInvokerMap != null) { // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing. oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR)); this.urlInvokerMap.forEach(oldUrlInvokerMap::put); } Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map // 这里 logger.info("Refreshed invoker size " + newUrlInvokerMap.size()); if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")")); return; } List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; // 这里 if (oldUrlInvokerMap != null) { try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } // notify invokers refreshed this.invokersChanged(); } private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) { // mock zookeeper://xxx?mock=return null if (enableConfigurationListen) { overrideDirectoryUrl(); } refreshInvoker(instanceUrls); // 这里 } }
public class RegistryDirectory<T> extends DynamicDirectory<T> { @Override public synchronized void notify(List<URL> urls) { if (isDestroyed()) { return; } Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(this::judgeCategory)); List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); // providers List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); // 3.x added for extend URL address ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class); List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null); if (supportedListeners != null && !supportedListeners.isEmpty()) { for (AddressListener addressListener : supportedListeners) { providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this); } } refreshOverrideAndInvoker(providerURLs); // 这里 } } public abstract class AbstractRegistry implements Registry { /** * Notify changes from the Provider side. * * @param url consumer side url * @param listener listener * @param urls provider latest urls */ protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size()); } // keep every provider's category. Map<String, List<URL>> result = new HashMap<>(); // 这里 for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getCategory(DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); // 这里 categoryList.add(u); // 这里 } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); listener.notify(categoryList); // 这里 // We will update our cache file after each notification. // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL. if (localCacheEnabled) { saveProperties(url); } } } }
public class NacosRegistry extends FailbackRegistry { private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) { List<Instance> enabledInstances = new LinkedList<>(instances); if (enabledInstances.size() > 0) { // Instances filterEnabledInstances(enabledInstances); } List<URL> urls = toUrlWithEmpty(url, enabledInstances); NacosRegistry.this.notify(url, listener, urls); // 这里 } String EMPTY_PROTOCOL = "empty"; private List<URL> toUrlWithEmpty(URL consumerURL, Collection<Instance> instances) { List<URL> urls = buildURLs(consumerURL, instances); if (urls.size() == 0) { // 这里 URL empty = URLBuilder.from(consumerURL) .setProtocol(EMPTY_PROTOCOL) .addParameter(CATEGORY_KEY, DEFAULT_CATEGORY) .build(); urls.add(empty); } return urls; } }
public class ServiceInfoHolder implements Closeable { public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (isEmptyOrErrorPush(serviceInfo)) { //empty or error push, just ignore return oldService; } serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); boolean changed = isChangedServiceInfo(oldService, serviceInfo); if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) { serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo)); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { // 这里 NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); // 这里 DiskCache.write(serviceInfo, cacheDir); } return serviceInfo; } } public class DefaultPublisher extends Thread implements EventPublisher { private BlockingQueue<Event> queue; @Override public void init(Class<? extends Event> type, int bufferSize) { setDaemon(true); setName("nacos.publisher-" + type.getName()); this.eventType = type; this.queueMaxSize = bufferSize; this.queue = new ArrayBlockingQueue<>(bufferSize); // 这里 start(); } @Override public boolean publish(Event event) { checkIsStart(); boolean success = this.queue.offer(event); // 这里 if (!success) { LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); receiveEvent(event); return true; } return true; } @Override public void run() { openEventHandler(); } void openEventHandler() { try { // This variable is defined to resolve the problem which message overstock in the queue. int waitTimes = 60; // To ensure that messages are not lost, enable EventHandler when // waiting for the first Subscriber to register for (; ; ) { if (shutdown || hasSubscriber() || waitTimes <= 0) { break; } ThreadUtils.sleep(1000L); waitTimes--; } for (; ; ) { if (shutdown) { break; } final Event event = queue.take(); // 这里 receiveEvent(event); // 这里 UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); } } catch (Throwable ex) { LOGGER.error("Event listener exception : ", ex); } } void receiveEvent(Event event) { final long currentEventSequence = event.sequence(); if (!hasSubscriber()) { LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber."); return; } // Notification single event listener for (Subscriber subscriber : subscribers) { // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass()); continue; } // Because unifying smartSubscriber and subscriber, so here need to think of compatibility. // Remove original judge part of codes. notifySubscriber(subscriber, event); // 这里 } } @Override public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); final Runnable job = () -> subscriber.onEvent(event); final Executor executor = subscriber.executor(); if (executor != null) { executor.execute(job); // 这里 } else { try { job.run(); // 这里 } catch (Throwable e) { LOGGER.error("Event callback exception: ", e); } } } } public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> { @Override public void onEvent(InstancesChangeEvent event) { String key = ServiceInfo .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters()); ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); if (CollectionUtils.isEmpty(eventListeners)) { return; } for (final EventListener listener : eventListeners) { final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event); if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) { ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent)); // 这里 } else { listener.onEvent(namingEvent); // 这里 } } } } public class NacosRegistry extends FailbackRegistry { @Override public void onEvent(Event event) { if (event instanceof NamingEvent) { NamingEvent e = (NamingEvent) event; notifier.notify(e.getInstances()); // 这里 } } } public abstract class RegistryNotifier { public synchronized void notify(Object rawAddresses) { this.rawAddresses = rawAddresses; long notifyTime = System.currentTimeMillis(); this.lastEventTime = notifyTime; long delta = (System.currentTimeMillis() - lastExecuteTime) - delayTime; // more than 10 calls && next execute time is in the future boolean delay = shouldDelay.get() && delta < 0; if (delay) { scheduler.schedule(new NotificationTask(this, notifyTime), -delta, TimeUnit.MILLISECONDS); // 这里 } else { // check if more than 10 calls if (!shouldDelay.get() && executeTime.incrementAndGet() > DEFAULT_DELAY_EXECUTE_TIMES) { shouldDelay.set(true); } scheduler.submit(new NotificationTask(this, notifyTime)); // 这里 } } public static class NotificationTask implements Runnable { private final RegistryNotifier listener; private final long time; public NotificationTask(RegistryNotifier listener, long time) { this.listener = listener; this.time = time; } @Override public void run() { try { if (this.time == listener.lastEventTime) { listener.doNotify(listener.rawAddresses); // 这里 listener.lastExecuteTime = System.currentTimeMillis(); synchronized (listener) { if (this.time == listener.lastEventTime) { listener.rawAddresses = null; } } } } catch (Throwable t) { logger.error("Error occurred when notify directory. ", t); } } }} } public class NacosRegistry extends FailbackRegistry { private class RegistryChildListenerImpl implements EventListener { private RegistryNotifier notifier; public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) { notifier = new RegistryNotifier(getUrl(), NacosRegistry.this.getDelay()) { @Override protected void doNotify(Object rawAddresses) { List<Instance> instances = (List<Instance>) rawAddresses; if (isServiceNamesWithCompatibleMode(consumerUrl)) { /** * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899 */ NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances); instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName); } NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances); // 这里 } }; } }
public class NacosRegistry extends FailbackRegistry { private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener) throws NacosException { EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener); // 这里 namingService.subscribe(serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP), eventListener); // 这里 } private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) { try { if (isServiceNamesWithCompatibleMode(url)) { List<Instance> allCorrespondingInstanceList = Lists.newArrayList(); /** * Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899 * * namingService.getAllInstances with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl} * default {@link DEFAULT_GROUP} * * in https://github.com/apache/dubbo/issues/5978 */ for (String serviceName : serviceNames) { List<Instance> instances = namingService.getAllInstances(serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP)); NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances); allCorrespondingInstanceList.addAll(instances); } notifySubscriber(url, listener, allCorrespondingInstanceList); for (String serviceName : serviceNames) { subscribeEventListener(serviceName, url, listener); // 这里 } } else { for (String serviceName : serviceNames) { List<Instance> instances = new LinkedList<>(); instances.addAll(namingService.getAllInstances(serviceName , getUrl().getGroup(Constants.DEFAULT_GROUP))); String serviceInterface = serviceName; String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1); if (segments.length == 4) { serviceInterface = segments[SERVICE_INTERFACE_INDEX]; } URL subscriberURL = url.setPath(serviceInterface).addParameters(INTERFACE_KEY, serviceInterface, CHECK_KEY, String.valueOf(false)); notifySubscriber(subscriberURL, listener, instances); subscribeEventListener(serviceName, subscriberURL, listener); } } } catch (Throwable cause) { throw new RpcException("Failed to subscribe " + url + " to nacos " + getUrl() + ", cause: " + cause.getMessage(), cause); } } }
org.apache.dubbo.rpc.RpcException: Directory already destroyed
public abstract class AbstractDirectory<T> implements Directory<T> { @Override public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } return doList(invocation); } @Override public void destroy() { destroyed = true; // 这里 } }
public class ReferenceConfig<T> extends ReferenceConfigBase<T> { private void checkInvokerAvailable() throws IllegalStateException { if (shouldCheck() && !invoker.isAvailable()) { invoker.destroy(); // 这里 throw new IllegalStateException("Should has at least one way to know which services this interface belongs to," + " subscription url: " + invoker.getUrl()); } } protected synchronized void init() { // ...... checkInvokerAvailable(); // 这里 } }
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig { public boolean shouldCheck() { checkDefault(); Boolean shouldCheck = isCheck(); // 这里 if (shouldCheck == null && getConsumer() != null) { shouldCheck = getConsumer().isCheck(); } if (shouldCheck == null) { // default true // 这里 shouldCheck = true; } return shouldCheck; } }
public class RegistryDirectory<T> extends DynamicDirectory<T> { @Override public boolean isAvailable() { if (isDestroyed() || this.forbidden) { // 这里 return false; } Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; // 这里 return CollectionUtils.isNotEmptyMap(localUrlInvokerMap) && localUrlInvokerMap.values().stream().anyMatch(Invoker::isAvailable); } }