上面代码看起来比较复杂,主要做如下一些操作:

    1. 调用 doLocalExport 导出服务
    2. 向注册中心注册服务
    3. 向注册中心进行订阅 override 数据
    4. 创建并返回 DestroyableExporter

    在以上操作中,除了创建并返回 DestroyableExporter 没什么难度外,其他几步操作都不是很简单。这其中,导出服务和注册服务是本章要重点分析的逻辑。 订阅 override 数据并非本文重点内容,后面会简单介绍一下。下面先来分析 doLocalExport 方法的逻辑,如下:

    1. String key = getCacheKey(originInvoker);
    2. // 访问缓存
    3. ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    4. if (exporter == null) {
    5. synchronized (bounds) {
    6. exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    7. if (exporter == null) {
    8. // 创建 Invoker 为委托类对象
    9. final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
    10. // 调用 protocol 的 export 方法导出服务
    11. exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
    12. // 写缓存
    13. bounds.put(key, exporter);
    14. }
    15. }
    16. }
    17. return exporter;
    18. }

    上面的代码是典型的双重检查锁,大家在阅读 Dubbo 的源码中,会多次见到。接下来,我们把重点放在 Protocol 的 export 方法上。假设运行时协议为 dubbo,此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。所以,接下来我们目光转移到 DubboProtocol 的 export 方法上,相关分析如下:

    1. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    2. URL url = invoker.getUrl();
    3. // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
    4. // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    5. String key = serviceKey(url);
    6. // 创建 DubboExporter
    7. DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    8. // 将 <key, exporter> 键值对放入缓存中
    9. exporterMap.put(key, exporter);
    10. // 本地存根相关代码
    11. Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    12. Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    13. if (isStubSupportEvent && !isCallbackservice) {
    14. String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
    15. if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
    16. // 省略日志打印代码
    17. } else {
    18. stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
    19. }
    20. }
    21. // 启动服务器
    22. // 优化序列化
    23. optimizeSerialization(url);
    24. return exporter;

    如上,我们重点关注 DubboExporter 的创建以及 openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。另外,DubboExporter 的代码比较简单,就不分析了。下面分析 openServer 方法。

    1. private ExchangeServer createServer(URL url) {
    2. url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
    3. // 添加心跳检测配置到 url 中
    4. url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    5. // 获取 server 参数,默认为 netty
    6. String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    7. // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
    8. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
    9. throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    10. // 添加编码解码器参数
    11. url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    12. ExchangeServer server;
    13. try {
    14. // 创建 ExchangeServer
    15. server = Exchangers.bind(url, requestHandler);
    16. } catch (RemotingException e) {
    17. throw new RpcException("Fail to start server...");
    18. }
    19. // 获取 client 参数,可指定 netty,mina
    20. str = url.getParameter(Constants.CLIENT_KEY);
    21. if (str != null && str.length() > 0) {
    22. // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
    23. Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    24. // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
    25. // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
    26. if (!supportedTypes.contains(str)) {
    27. throw new RpcException("Unsupported client type...");
    28. }
    29. }
    30. return server;
    31. }

    如上,createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码比较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看。

    1. public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    2. if (url == null) {
    3. throw new IllegalArgumentException("url == null");
    4. }
    5. if (handler == null) {
    6. throw new IllegalArgumentException("handler == null");
    7. }
    8. url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    9. // 获取 Exchanger,默认为 HeaderExchanger。
    10. // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
    11. }

    上面代码比较简单,就不多说了。下面看一下 HeaderExchanger 的 bind 方法。

    HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前我们仅需关心 Transporters 的 bind 方法逻辑即可。该方法的代码如下:

    1. public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    2. throw new IllegalArgumentException("url == null");
    3. }
    4. if (handlers == null || handlers.length == 0) {
    5. throw new IllegalArgumentException("handlers == null");
    6. }
    7. ChannelHandler handler;
    8. if (handlers.length == 1) {
    9. handler = handlers[0];
    10. } else {
    11. // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
    12. handler = new ChannelHandlerDispatcher(handlers);
    13. }
    14. // 获取自适应 Transporter 实例,并调用实例方法
    15. return getTransporter().bind(url, handler);
    16. }

    如上,getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。下面我们继续跟下去,这次分析的是 NettyTransporter 的 bind 方法。

    1. public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    2. // 创建 NettyServer
    3. return new NettyServer(url, listener);
    4. }

    上面代码多为赋值代码,不需要多讲。我们重点关注 doOpen 抽象方法,该方法需要子类实现。下面回到 NettyServer 中。

    1. protected void doOpen() throws Throwable {
    2. NettyHelper.setNettyLoggerFactory();
    3. // 创建 boss 和 worker 线程池
    4. ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    5. ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    6. ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    7. // 创建 ServerBootstrap
    8. bootstrap = new ServerBootstrap(channelFactory);
    9. final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    10. channels = nettyHandler.getChannels();
    11. bootstrap.setOption("child.tcpNoDelay", true);
    12. // 设置 PipelineFactory
    13. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    14. @Override
    15. public ChannelPipeline getPipeline() {
    16. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
    17. ChannelPipeline pipeline = Channels.pipeline();
    18. pipeline.addLast("decoder", adapter.getDecoder());
    19. pipeline.addLast("encoder", adapter.getEncoder());
    20. pipeline.addLast("handler", nettyHandler);
    21. return pipeline;
    22. }
    23. });
    24. // 绑定到指定的 ip 和端口上
    25. }

    以上就是 NettyServer 创建的过程,dubbo 默认使用的 NettyServer 是基于 netty 3.x 版本实现的,比较老了。因此 Dubbo 另外提供了 netty 4.x 版本的 NettyServer,大家可在使用 Dubbo 的过程中按需进行配置。

    到此,关于服务导出的过程就分析完了。整个过程比较复杂,大家在分析的过程中耐心一些。并且多写 Demo 进行调试,以便能够更好的理解代码逻辑。

    本节内容先到这里,接下来分析服务导出的另一块逻辑 — 服务注册。