在分析完Dubbo的整体架构之后,我们对每个层次来单独分析下。
我们的消费者在启动时,会去查询其所有对应的provider,并将URL转换为Invoker保存到当前内存,并启动对provider的监听,当其发生变动时,可以及时反馈到当前,对Invoker列表进行更新。
那么以上是如何实现的呢?
作为注册中心层,我们可以看到结构如下图:
从RegistryProtocol开始,在RegistryFactory中获取到对应的Registry(示例中采取的是ZookeeperRegistry)
new一个RegistryDirectory,其总负责对注册中心的监听,当有provider发生变动时,可以及时反馈到consumer。
本文就从RegistryProtocol.refer()方法开始聊起。
public class RegistryProtocol implements Protocol { public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 获取注册url,本例中以zookeeper:// 开头 url = getRegistryUrl(url); // 所以从registryFactory中获取到的最终为ZookeeperRegistry Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } // 继续调用doRefer()方法 return doRefer(cluster, registry, type, url); } }
通过注册url zookeeper://... 来确定最终使用的注册中心类型为:ZookeeperRegistry。
public class RegistryProtocol implements Protocol { private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (directory.isShouldRegister()) { directory.setRegisteredConsumerUrl(subscribeUrl); // 调用ZookeeperRegistry.register()方法,将当前consumer_url注册到Zookeeper上(本质上就是创建一个临时节点) // 具体见2.1 registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); // RegistryDirectory订阅url变更 directory.subscribe(toSubscribeUrl(subscribeUrl)); Invoker<T> invoker = cluster.join(directory); List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl); for (RegistryProtocolListener listener : listeners) { listener.onRefer(this, registryInvokerWrapper); } return registryInvokerWrapper; } }
registry()方法在父类FailbackRegistry.java中,最终还是调用到子类的doRegistry()方法
public class ZookeeperRegistry extends FailbackRegistry { public void doRegister(URL url) { try { // 创建临时节点 zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } }
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); // 调用ZookeeperRegistry.subscribe()订阅方法 registry.subscribe(url, this); } }
这里需要注意的是将当前this 也就是RegistryDirectory本身当做listener传入,所以最终监听被触发时,还是会调用到RegistryDirectory
public class ZookeeperRegistry extends FailbackRegistry { public void doSubscribe(final URL url, final NotifyListener listener) { try { // 匹配所有接口 if (ANY_VALUE.equals(url.getServiceInterface())) { ... } else { List<URL> urls = new ArrayList<>(); // 获取到的path,在本例中即为:/dubbo/org.apache.dubbo.demo.DemoService/providers // 也就是provider的路径 for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); // 创建对该provider_path的监听,监听器本身为RegistryDirectory List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 最后触发一次notify,调用RegistryDirectory.notify() notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } }
总结:消费者启动时,创建对provider_path(本例中为/dubbo/org.apache.dubbo.demo.DemoService/providers)的监听,监听器为RegistryDirectory。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(this::judgeCategory)); List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); // router相关,非本文重点 List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); // providers List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class); List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null); if (supportedListeners != null && !supportedListeners.isEmpty()) { for (AddressListener addressListener : supportedListeners) { providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this); } } // 在这里将URL转换为Invoker,保存到RegistryDirectory.urlInvokerMap中 refreshOverrideAndInvoker(providerURLs); } }
我们可以把RegistryDirectory当做注册中心的操作层,所需要的provider信息都存放在RegistryDirectory中。
而具体的操作则交由ZookeeperRegistry来实现。