关于服务目录这里就先介绍这些,大家先有个大致印象。接下来我们通过继承体系图来了解一下服务目录的家族成员都有哪些。

服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory,它们均是 AbstractDirectory 的子类。AbstractDirectory 实现了 Directory 接口,这个接口包含了一个重要的方法定义,即 list(Invocation),用于列举 Invoker。下面我们来看一下他们的继承体系图。

如上,Directory 继承自 Node 接口,Node 这个接口继承者比较多,像 Registry、Monitor、Invoker 等均继承了这个接口。这个接口包含了一个获取配置信息的方法 getUrl,实现该接口的类可以向外提供配置信息。另外,大家注意看 RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,RegistryDirectory 可以通过此接口方法得到变更信息,并根据变更信息动态调整内部 Invoker 列表。

本章将分析 AbstractDirectory 和它两个子类的源码。AbstractDirectory 封装了 Invoker 列举流程,具体的列举逻辑则由子类实现,这是典型的模板模式。所以,接下来我们先来看一下 AbstractDirectory 的源码。

上面就是 AbstractDirectory 的 list 方法源码,这个方法封装了 Invoker 的列举过程。如下:

  • 调用 doList 获取 Invoker 列表
  • 根据 Router 的 getUrl 返回值为空与否,以及 runtime 参数决定是否进行服务路由
    以上步骤中,doList 是模板方法,需由子类实现。Router 的 runtime 参数这里简单说明一下,这个参数决定了是否在每次调用服务时都执行路由规则。如果 runtime 为 true,那么每次调用服务前,都需要进行服务路由。这个对性能造成影响,配置时需要注意。

关于 AbstractDirectory 就分析这么多,下面开始分析子类的源码。

  1. // Invoker 列表
  2. private final List<Invoker<T>> invokers;
  3. // 省略构造方法
  4. @Override
  5. public Class<T> getInterface() {
  6. // 获取接口类
  7. return invokers.get(0).getInterface();
  8. }
  9. // 检测服务目录是否可用
  10. @Override
  11. public boolean isAvailable() {
  12. if (isDestroyed()) {
  13. return false;
  14. }
  15. for (Invoker<T> invoker : invokers) {
  16. if (invoker.isAvailable()) {
  17. // 只要有一个 Invoker 是可用的,就认为当前目录是可用的
  18. return true;
  19. }
  20. }
  21. return false;
  22. }
  23. @Override
  24. public void destroy() {
  25. if (isDestroyed()) {
  26. return;
  27. }
  28. // 调用父类销毁逻辑
  29. super.destroy();
  30. // 遍历 Invoker 列表,并执行相应的销毁逻辑
  31. for (Invoker<T> invoker : invokers) {
  32. invoker.destroy();
  33. }
  34. invokers.clear();
  35. }
  36. @Override
  37. protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
  38. // 列举 Inovker,也就是直接返回 invokers 成员变量
  39. return invokers;
  40. }
  41. }

以上就是 StaticDirectory 的代码逻辑,很简单,就不多说了。下面来看看 RegistryDirectory,这个类的逻辑比较复杂。

3.2 RegistryDirectory

RegistryDirectory 是一种动态服务目录,实现了 NotifyListener 接口。当注册中心服务配置发生变化后,RegistryDirectory 可收到与当前服务相关的变化。收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表。RegistryDirectory 中有几个比较重要的逻辑,第一是 Invoker 的列举逻辑,第二是接收服务配置变更的逻辑,第三是 Invoker 列表的刷新逻辑。接下来按顺序对这三块逻辑。

3.2.1 列举 Invoker

Invoker 列举逻辑封装在 doList 方法中,相关代码如下:

  1. public List<Invoker<T>> doList(Invocation invocation) {
  2. if (forbidden) {
  3. // 服务提供者关闭或禁用了服务,此时抛出 No provider 异常
  4. throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
  5. "No provider available from registry ...");
  6. }
  7. List<Invoker<T>> invokers = null;
  8. // 获取 Invoker 本地缓存
  9. Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
  10. if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
  11. // 获取方法名和参数列表
  12. String methodName = RpcUtils.getMethodName(invocation);
  13. Object[] args = RpcUtils.getArguments(invocation);
  14. // 检测参数列表的第一个参数是否为 String 或 enum 类型
  15. if (args != null && args.length > 0 && args[0] != null
  16. && (args[0] instanceof String || args[0].getClass().isEnum())) {
  17. // 通过 方法名 + 第一个参数名称 查询 Invoker 列表,具体的使用场景暂时没想到
  18. invokers = localMethodInvokerMap.get(methodName + "." + args[0]);
  19. }
  20. if (invokers == null) {
  21. // 通过方法名获取 Invoker 列表
  22. invokers = localMethodInvokerMap.get(methodName);
  23. }
  24. if (invokers == null) {
  25. // 通过星号 * 获取 Invoker 列表
  26. invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
  27. }
  28. // 冗余逻辑,pull request #2861 移除了下面的 if 分支代码
  29. if (invokers == null) {
  30. Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
  31. if (iterator.hasNext()) {
  32. invokers = iterator.next();
  33. }
  34. }
  35. }
  36. // 返回 Invoker 列表
  37. return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
  38. }

以上代码进行多次尝试,以期从 localMethodInvokerMap 中获取到 Invoker 列表。一般情况下,普通的调用可通过方法名获取到对应的 Invoker 列表,泛化调用可通过 * 获取到 Invoker 列表。localMethodInvokerMap 源自 RegistryDirectory 类的成员变量 methodInvokerMap。doList 方法可以看做是对 methodInvokerMap 变量的读操作,至于对 methodInvokerMap 变量的写操作,下一节进行分析。

3.2.2 接收服务变更通知

RegistryDirectory 是一个动态服务目录,会随注册中心配置的变化进行动态调整。因此 RegistryDirectory 实现了 NotifyListener 接口,通过这个接口获取注册中心变更通知。下面我们来看一下具体的逻辑。

如上,notify 方法首先是根据 url 的 category 参数对 url 进行分门别类存储,然后通过 toRouters 和 toConfigurators 将 url 列表转成 Router 和 Configurator 列表。最后调用 refreshInvoker 方法刷新 Invoker 列表。这里的 toRouters 和 toConfigurators 方法逻辑不复杂,大家自行分析。接下来,我们把重点放在 refreshInvoker 方法上。

3.2.3 刷新 Invoker 列表

refreshInvoker 方法是保证 RegistryDirectory 随注册中心变化而变化的关键所在。这一块逻辑比较多,接下来一一进行分析。

  1. private void refreshInvoker(List<URL> invokerUrls) {
  2. // invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
  3. if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
  4. && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
  5. // 设置 forbidden 为 true
  6. this.methodInvokerMap = null;
  7. // 销毁所有 Invoker
  8. destroyAllInvokers();
  9. } else {
  10. Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
  11. if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
  12. // 添加缓存 url 到 invokerUrls 中
  13. invokerUrls.addAll(this.cachedInvokerUrls);
  14. } else {
  15. this.cachedInvokerUrls = new HashSet<URL>();
  16. // 缓存 invokerUrls
  17. this.cachedInvokerUrls.addAll(invokerUrls);
  18. }
  19. if (invokerUrls.isEmpty()) {
  20. return;
  21. }
  22. // 将 url 转成 Invoker
  23. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
  24. // 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
  25. Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
  26. // 转换出错,直接打印异常,并返回
  27. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
  28. logger.error(new IllegalStateException("urls to invokers error ..."));
  29. return;
  30. }
  31. // 合并多个组的 Invoker
  32. this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
  33. this.urlInvokerMap = newUrlInvokerMap;
  34. try {
  35. // 销毁无用 Invoker
  36. destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
  37. } catch (Exception e) {
  38. logger.warn("destroyUnusedInvokers error. ", e);
  39. }
  40. }
  41. }

接下来对 refreshInvoker 方法中涉及到的调用一一进行分析。按照顺序,先来分析 url 到 Invoker 的转换过程。

  1. private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
  2. Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
  3. if (urls == null || urls.isEmpty()) {
  4. return newUrlInvokerMap;
  5. }
  6. Set<String> keys = new HashSet<String>();
  7. // 获取服务消费端配置的协议
  8. String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
  9. for (URL providerUrl : urls) {
  10. if (queryProtocols != null && queryProtocols.length() > 0) {
  11. boolean accept = false;
  12. String[] acceptProtocols = queryProtocols.split(",");
  13. // 检测服务提供者协议是否被服务消费者所支持
  14. for (String acceptProtocol : acceptProtocols) {
  15. if (providerUrl.getProtocol().equals(acceptProtocol)) {
  16. accept = true;
  17. break;
  18. }
  19. }
  20. if (!accept) {
  21. // 若服务消费者协议头不被消费者所支持,则忽略当前 providerUrl
  22. continue;
  23. }
  24. }
  25. // 忽略 empty 协议
  26. if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
  27. continue;
  28. }
  29. // 通过 SPI 检测服务端协议是否被消费端支持,不支持则抛出异常
  30. if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
  31. logger.error(new IllegalStateException("Unsupported protocol..."));
  32. continue;
  33. }
  34. // 合并 url
  35. URL url = mergeUrl(providerUrl);
  36. String key = url.toFullString();
  37. if (keys.contains(key)) {
  38. // 忽略重复 url
  39. continue;
  40. }
  41. keys.add(key);
  42. // 将本地 Invoker 缓存赋值给 localUrlInvokerMap
  43. Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
  44. // 获取与 url 对应的 Invoker
  45. Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
  46. // 缓存未命中
  47. if (invoker == null) {
  48. try {
  49. boolean enabled = true;
  50. if (url.hasParameter(Constants.DISABLED_KEY)) {
  51. // 获取 disable 配置,取反,然后赋值给 enable 变量
  52. enabled = !url.getParameter(Constants.DISABLED_KEY, false);
  53. } else {
  54. // 获取 enable 配置,并赋值给 enable 变量
  55. enabled = url.getParameter(Constants.ENABLED_KEY, true);
  56. }
  57. if (enabled) {
  58. // 调用 refer 获取 Invoker
  59. invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
  60. }
  61. } catch (Throwable t) {
  62. logger.error("Failed to refer invoker for interface...");
  63. }
  64. newUrlInvokerMap.put(key, invoker);
  65. }
  66. // 缓存命中
  67. } else {
  68. // 将 invoker 存储到 newUrlInvokerMap 中
  69. newUrlInvokerMap.put(key, invoker);
  70. }
  71. }
  72. keys.clear();
  73. return newUrlInvokerMap;
  74. }

toInvokers 方法一开始会对服务提供者 url 进行检测,若服务消费端的配置不支持服务端的协议,或服务端 url 协议头为 empty 时,toInvokers 均会忽略服务提供方 url。必要的检测做完后,紧接着是合并 url,然后访问缓存,尝试获取与 url 对应的 invoker。如果缓存命中,直接将 Invoker 存入 newUrlInvokerMap 中即可。如果未命中,则需新建 Invoker。

toInvokers 方法返回的是 <url, Invoker> 映射关系表,接下来还要对这个结果进行进一步处理,得到方法名到 Invoker 列表的映射关系。这个过程由 toMethodInvokers 方法完成,如下:

上面方法主要做了三件事情, 第一是对入参进行遍历,然后从 Invoker 的 url 成员变量中获取 methods 参数,并切分成数组。随后以方法名为键,Invoker 列表为值,将映射关系存储到 newMethodInvokerMap 中。第二是分别基于类和方法对 Invoker 列表进行路由操作。第三是对 Invoker 列表进行排序,并转成不可变列表。关于 toMethodInvokers 方法就先分析到这,我们继续向下分析,这次要分析的多组服务的合并逻辑。

  1. private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) {
  2. Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>();
  3. // 遍历入参
  4. for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) {
  5. String method = entry.getKey();
  6. List<Invoker<T>> invokers = entry.getValue();
  7. // group -> Invoker 列表
  8. Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>();
  9. // 遍历 Invoker 列表
  10. for (Invoker<T> invoker : invokers) {
  11. // 获取分组配置
  12. String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, "");
  13. List<Invoker<T>> groupInvokers = groupMap.get(group);
  14. if (groupInvokers == null) {
  15. groupInvokers = new ArrayList<Invoker<T>>();
  16. // 缓存 <group, List<Invoker>> 到 groupMap 中
  17. groupMap.put(group, groupInvokers);
  18. }
  19. // 存储 invoker 到 groupInvokers
  20. groupInvokers.add(invoker);
  21. }
  22. if (groupMap.size() == 1) {
  23. // 如果 groupMap 中仅包含一组键值对,此时直接取出该键值对的值即可
  24. result.put(method, groupMap.values().iterator().next());
  25. // groupMap.size() > 1 成立,表示 groupMap 中包含多组键值对,比如:
  26. // {
  27. // "dubbo": [invoker1, invoker2, invoker3, ...],
  28. // "hello": [invoker4, invoker5, invoker6, ...]
  29. // }
  30. } else if (groupMap.size() > 1) {
  31. List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();
  32. for (List<Invoker<T>> groupList : groupMap.values()) {
  33. // 通过集群类合并每个分组对应的 Invoker 列表
  34. groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));
  35. }
  36. // 缓存结果
  37. result.put(method, groupInvokers);
  38. } else {
  39. result.put(method, invokers);
  40. }
  41. }
  42. return result;
  43. }

上面方法首先是生成 group 到 Invoker 类比的映射关系表,若关系表中的映射关系数量大于1,表示有多组服务。此时通过集群类合并每组 Invoker,并将合并结果存储到 groupInvokers 中。之后将方法名与 groupInvokers 存到到 result 中,并返回,整个逻辑结束。

接下来我们再来看一下 Invoker 列表刷新逻辑的最后一个动作 — 删除无用 Invoker。如下:

  1. private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
  2. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
  3. destroyAllInvokers();
  4. return;
  5. }
  6. List<String> deleted = null;
  7. if (oldUrlInvokerMap != null) {
  8. // 获取新生成的 Invoker 列表
  9. Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
  10. // 遍历老的 <url, Invoker> 映射表
  11. for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
  12. // 检测 newInvokers 中是否包含老的 Invoker
  13. if (!newInvokers.contains(entry.getValue())) {
  14. if (deleted == null) {
  15. deleted = new ArrayList<String>();
  16. }
  17. // 若不包含,则将老的 Invoker 对应的 url 存入 deleted 列表中
  18. deleted.add(entry.getKey());
  19. }
  20. }
  21. }
  22. if (deleted != null) {
  23. // 遍历 deleted 集合,并到老的 <url, Invoker> 映射关系表查出 Invoker,销毁之
  24. for (String url : deleted) {
  25. if (url != null) {
  26. // 从 oldUrlInvokerMap 中移除 url 对应的 Invoker
  27. Invoker<T> invoker = oldUrlInvokerMap.remove(url);
  28. if (invoker != null) {
  29. try {
  30. // 销毁 Invoker
  31. invoker.destroy();
  32. } catch (Exception e) {
  33. logger.warn("destroy invoker...");
  34. }
  35. }
  36. }
  37. }
  38. }

destroyUnusedInvokers 方法的主要逻辑是通过 newUrlInvokerMap 找出待删除 Invoker 对应的 url,并将 url 存入到 deleted 列表中。然后再遍历 deleted 列表,并从 oldUrlInvokerMap 中移除相应的 Invoker,销毁之。整个逻辑大致如此,不是很难理解。

  • 检测入参是否仅包含一个 url,且 url 协议头为 empty
  • 若第一步检测结果为 true,表示禁用所有服务,此时销毁所有的 Invoker
  • 若第一步检测结果为 false,此时将入参转为 Invoker 列表
  • 对将上一步逻辑生成的结果进行进一步处理,得到方法名到 Invoker 的映射关系表
  • 合并多组 Invoker
  • 销毁无用 Invoker
    Invoker 的刷新逻辑还是比较复杂的,大家在看的过程中多写点 demo 进行调试,以加深理解。

本篇文章对 Dubbo 服务目录进行了较为详细的分析,篇幅主要集中在 RegistryDirectory 的源码分析上。从代码量上可以看出,想让本地服务目录和注册中心保持一致还是需要做很多事情的,并不简单。服务目录是 Dubbo 集群容错的一部分,也是比较基础的部分,所以大家应尽量搞懂。