乐于分享
好东西不私藏

【Dubbo】服务导出源码分析

本文最后更新于2026-01-02,某些文章具有时效性,若有错误或已失效,请在下方留言或联系老夜

【Dubbo】服务导出源码分析

前言

前面讨论了Dubbo与Spring的整合原理,今天讨论一下Dubbo服务导出流程

总体调用关系

上述流程为整体导出过程的一个粗略架构图,后续会展开介绍详细流程

核心源码分析

调用时序图

时序图来自官方:https://cn.dubbo.apache.org/zh-cn/overview/mannual/java-sdk/reference-manual/architecture/code-architecture/,我本地的代码是Dubbo3.0,跟这个有一些差异,但是整体来说区别不是非常大,不影响理解。

调用入口

上文中介绍到了在启动过程中注册了两个关键的监听器DubboDeployApplicationListener,我们这里服务导出就从这个监听器开始

注册入口org.apache.dubbo.config.spring.util.DubboBeanUtils#registerCommonBeans

staticvoidregisterCommonBeans(BeanDefinitionRegistry registry) {// 省略无关代码// register ApplicationListeners,重点看这个        registerInfrastructureBean(registry, DubboDeployApplicationListener.class.getName(), DubboDeployApplicationListener.class);// 。。。。    }

既然是监听器,那我们首先是应该看看它监听了什么?

上图可以看出,它实现了Spring自带的事件ApplicationContextEvent,这个事件主要是在Spring容器启动生命周期中会发送各类事件,用于用户根据需要进行扩展。Dubbo服务的导出主要就是依赖了其中两个事件。这里补充下ApplicationContextEvent的子类:

接下来回到正文,看看监听器DubboDeployApplicationListener具体实现逻辑,重点看org.apache.dubbo.config.spring.context.DubboDeployApplicationListener#onApplicationEvent方法

publicvoidonApplicationEvent(ApplicationContextEvent event) {if (event instanceof ContextRefreshedEvent) {    onContextRefreshedEvent((ContextRefreshedEvent) event);  } elseif (event instanceof ContextClosedEvent) {    onContextClosedEvent((ContextClosedEvent) event);  }}

这个监听器主要是用了Spring的ContextRefreshedEvent事件用于导出服务,实现了ContextClosedEvent用于取消导出服务,先来看看onContextRefreshedEvent源码

privatevoidonContextRefreshedEvent(ContextRefreshedEvent event) {// 获取部署器(dubbo3.0对应用、模块都做了非常大的升级,这里先不深究)ModuleDeployerdeployer= moduleModel.getDeployer();  Assert.notNull(deployer, "Module deployer is null");// start module,导出服务(异步或者同步,根据配置),重点方法接着看CompletableFuturefuture= deployer.start();// 如果配置了不是异步导出,则等待导出完成if (!deployer.isBackground()) {try {      future.get();    } catch (InterruptedException e) {      logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());    } catch (Exception e) {      logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);    }  }}

导出&引用入口

接着看deployer.start()做了什么,代码位置

org.apache.dubbo.config.deploy.DefaultModuleDeployer#start

publicsynchronized CompletableFuture start()throws IllegalStateException {// 判断是否在启动中或者已经启动完成if (isStarting() || isStarted()) {return startFuture;        }// 标记导出状态,发布事件        onModuleStarting();        startFuture = newCompletableFuture();// 应用初始化(这个方法也做了非常多事情,很重要,后续单独讨论,比如配置中心初始化,元数据初始化。。。)        applicationDeployer.initialize();// initialize,加载最新配置,标记是否异步导出或引入        initialize();// export services,导出服务,重点来了        exportServices();// prepare application instanceif (hasExportedServices()) {            applicationDeployer.prepareApplicationInstance();        }// refer services,引用服务,这个下一步讨论        referServices();// 等待导出、引用完成,然后回收一些用于启动服务的一些线程资源        executorRepository.getSharedExecutor().submit(() -> {// wait for export finish            waitExportFinish();// wait for refer finish            waitReferFinish();            onModuleStarted(startFuture);        });return startFuture;    }

导出流程

导出主要是org.apache.dubbo.config.deploy.DefaultModuleDeployer#exportServices方法,我们重点看看:

第一步拿到所有的待导出server,循环调用

privatevoidexportServices() {// 从配置管理器拿到所有的待导出服务for (ServiceConfigBase sc : configManager.getServices()) {// 导出    exportServiceInternal(sc);  }}

接着看导出exportServiceInternal方法

privatevoidexportServiceInternal(ServiceConfigBase sc) {        ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;// 判断配置是否是最新的,不是的话重新刷新一些,这里回去环境变量,外部配置等读取最新配置,不是重点,这里不讨论if (!serviceConfig.isRefreshed()) {            serviceConfig.refresh();        }// 已经导出就直接返回if (sc.isExported()) {return;        }// 判断是否是异步导出if (exportAsync || sc.shouldExportAsync()) {// 线程池管理器ExecutorServiceexecutor= executorRepository.getServiceExportExecutor();            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {if (!sc.isExported()) {// 导出                        sc.exportOnly();                        exportedServices.add(sc);                    }                } catch (Throwable t) {                    logger.error(getIdentifier() + " export async catch error : " + t.getMessage(), t);                }            }, executor);            asyncExportingFutures.add(future);        } else {if (!sc.isExported()) {                sc.exportOnly();                exportedServices.add(sc);            }        }    }

继续看exportOnly方法,代码位置

org.apache.dubbo.config.ServiceConfig#exportOnly

publicsynchronizedvoidexportOnly() {if (this.exported) {return;  }// 保证配置是最新的if (!this.isRefreshed()) {this.refresh();  }// 是否需要导出if (this.shouldExport()) {// 初始化监听器this.init();if (shouldDelay()) {// 延迟导出      doDelayExport();    } else {// 同步导出      doExport();    }  }}

延迟导出org.apache.dubbo.config.ServiceConfig#doDelayExport

protectedvoiddoDelayExport() {// 实际上是起了个延迟任务,根据用于配置的延迟时间进行导出服务    DELAY_EXPORT_EXECUTOR.schedule(() -> {try {// 导出            doExport();        } catch (Exception e) {            logger.error("Failed to export service config: " + interfaceName, e);        }    }, getDelay(), TimeUnit.MILLISECONDS);}

接着看org.apache.dubbo.config.ServiceConfig#doExport

protectedsynchronizedvoiddoExport() {if (unexported) {thrownewIllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");   }if (exported) {return;   }if (StringUtils.isEmpty(path)) {     path = interfaceName;   }// 导出   doExportUrls();// 缓存已经导出的服务   exported(); }

接着看org.apache.dubbo.config.ServiceConfig#doExportUrls

privatevoiddoExportUrls() {// 获取模块服务仓库ModuleServiceRepositoryrepository= getScopeModel().getServiceRepository();// 获取服务描述器,一个服务里面会有很多歌方法,每个方法可能有自己的配置ServiceDescriptorserviceDescriptor= repository.registerService(getInterfaceClass());// 服务提供者模型  providerModel = newProviderModel(getUniqueServiceName(),                                    ref,                                    serviceDescriptor,this,                                    getScopeModel(),                                    serviceMetadata);// 注册到模块服务仓库  repository.registerProvider(providerModel);// 使用loadRegistries获取到所有的注册中心地址,用户可能会配置多个注册中心// registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registryConfig&application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=13403&registry=zookeeper&timestamp=1767344233494  List<URL> registryURLs = ConfigValidationUtils.loadRegistries(thistrue);// 遍历当前服务的协议配置,通过doExportUrlsFor1Protocol按照每个协议分别注册到注册中心(注意注册中心可以是多个)for (ProtocolConfig protocolConfig : protocols) {StringpathKey= URL.buildKey(getContextPath(protocolConfig)                                  .map(p -> p + "/" + path)                                  .orElse(path), group, version);// 如果用于声明了路径,则需要再次用新路径注册服务    repository.registerService(pathKey, interfaceClass);// 导出服务,重点,注意这里的protocol是registry://开头    doExportUrlsFor1Protocol(protocolConfig, registryURLs);  }}

接着看org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol

privatevoiddoExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {// 得到协议配置  Map<String, String> map = buildAttributes(protocolConfig);//init serviceMetadata attachments  serviceMetadata.getAttachments().putAll(map);// 构建url,类似这样dubbo://192.168.1.101:20880/org.apache.dubbo.demo.UserService?anyhost=true&application=dubbo-demo-annotation-provider&background=false&bind.ip=192.168.1.101&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.UserService&metadata-type=remote&methods=getUser&pid=8169&release=&side=provider&timestamp=1767277014048URLurl= buildUrl(protocolConfig, registryURLs, map);// 导出url  exportUrl(url, registryURLs);}

继续看org.apache.dubbo.config.ServiceConfig#exportUrl

privatevoidexportUrl(URL url, List<URL> registryURLs) {// 范围Stringscope= url.getParameter(SCOPE_KEY);// don't export when none is configuredif (!SCOPE_NONE.equalsIgnoreCase(scope)) {// 本地配置不用导出到远程// export to local if the config is not remote (export to remote only when config is remote)if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {         exportLocal(url);       }// export to remote if the config is not local (export to local only when config is local)if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {// 远程导出,重点方法         url = exportRemote(url, registryURLs);// 发布事件         MetadataUtils.publishServiceDefinition(url);       }     }this.urls.add(url);   }

分析org.apache.dubbo.config.ServiceConfig#exportRemote

private URL exportRemote(URL url, List<URL> registryURLs) {if (CollectionUtils.isNotEmpty(registryURLs)) {// 注册中心,这里其实有两个,// service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registryConfig&application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=8299&registry=zookeeper&timestamp=1767277773610// registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registryConfig&application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=8299&registry=zookeeper&timestamp=1767277773610for (URL registryURL : registryURLs) {if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {        url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");      }//if protocol is only injvm ,not registerif (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {continue;      }// dubbo://xxx      url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));URLmonitorUrl= ConfigValidationUtils.loadMonitor(this, registryURL);if (monitorUrl != null) {        url = url.putAttribute(MONITOR_KEY, monitorUrl);      }// For providers, this is used to enable custom proxy to generate invokerStringproxy= url.getParameter(PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {        registryURL = registryURL.addParameter(PROXY_KEY, proxy);      }            // 讲dubbo://开头的url添加到当前的注册中心URL上。然后调用doExportUrl导出      doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);    }  } else {if (MetadataService.class.getName().equals(url.getServiceInterface())) {      localMetadataService.setMetadataServiceURL(url);    }if (logger.isInfoEnabled()) {      logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);    }    doExportUrl(url, true);  }return url;}

接着看org.apache.dubbo.config.ServiceConfig#doExportUrl

privatevoiddoExportUrl(URL url, boolean withMetaData) {// 生成动态代理invoker对象,这里的代理非常重要,后续单独写一个文档,立一个flag在这里  Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);if (withMetaData) {    invoker = newDelegateProviderMetaDataInvoker(invoker, this);  }// 通过protocol进行导出操作,注意这里的protocol是一个动态代理Adaptive。底层通过SPI拿到的registryProtocol,然后调用export  Exporter<?> exporter = protocolSPI.export(invoker);  exporters.add(exporter);}

这里的导出是一个dubbo的动态代理机制,会调用到如下方法,实际逻辑即使根据url上配置的协议通过spi获取真正的protocol

通过上述的动态代理会调用到org.apache.dubbo.registry.integration.RegistryProtocol#export,因为org.apache.dubbo.rpc.Protocol文件配置的是org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol。代码如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker)throws RpcException {// 实际的注册地址 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registryConfig&application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=13600&timestamp=1767345767765URLregistryUrl= getRegistryUrl(originInvoker);// 需要导出的服务地址 dubbo://192.168.1.101:20880/org.apache.dubbo.demo.UserService?anyhost=true&application=dubbo-demo-annotation-provider&background=false&bind.ip=192.168.1.101&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.UserService&metadata-type=remote&methods=getUser&pid=13600&release=&service-name-mapping=true&side=provider&timestamp=1767345772348URLproviderUrl= getProviderUrl(originInvoker);// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call//  the same service. Because the subscribed is cached key with the name of the service, it causes the//  subscription information to cover.// 生成监听器,主要为了动态配置变化,如果有监听到就覆盖原有的finalURLoverrideSubscribeUrl= getSubscribedOverrideUrl(providerUrl);finalOverrideListeneroverrideSubscribeListener=newOverrideListener(overrideSubscribeUrl, originInvoker);        Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();        overrideListeners.put(registryUrl, overrideSubscribeListener);        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//export invoker 启动当前服务的nettyserverfinal ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);// url to registry 注册到注册中心finalRegistryregistry= getRegistry(registryUrl);// 确定最终需要保存到注册中心的地址finalURLregisteredProviderUrl= getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publish 延迟注册booleanregister= providerUrl.getParameter(REGISTER_KEY, true);if (register) {            register(registry, registeredProviderUrl);        }// register stated url on provider model        registerStatedUrl(registryUrl, registeredProviderUrl, register);        exporter.setRegisterUrl(registeredProviderUrl);        exporter.setSubscribeUrl(overrideSubscribeUrl);// Deprecated! Subscribe to override rules in 2.6.x or before.// 订阅配置中心关于注册中心的配置变更        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 发送通知        notifyExport(exporter);//Ensure that a new exporter instance is returned every time exportreturnnewDestroyableExporter<>(exporter);    }

先来看看org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {Stringkey= getCacheKey(originInvoker);// 这里继续调用protocol.export导出,这里还是一个Dubbo的Aop,是DubboProtocol的return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {     Invoker<?> invokerDelegate = newInvokerDelegate<>(originInvoker, providerUrl);returnnewExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);   }); }

这里继续调用protocol.export导出,这里还是一个Dubbo的Aop,是DubboProtocol的一一堆Warpper,大概是这样的

用一个图的话,这里的调用关系是这样,这个是在加载SPI时就根据Order进行排好序的,类似Warpper的东西都是这个套路,一层包一层的调用:

  • • ProtocolSerializationWrapper:主要做资源回收
  • • ProtocolFilterWrapper:构建过滤器链,这个比较重要,真正调用的时候会请求会过这些过滤器
  • • ProtocolListenerWrapper:构建一些监听器,这几个其实warpper没有做太多事情过完了上面的几个Wrapper,接着会走到真正注册的DubboProtocol的export,代码位置org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker)throws RpcException {URLurl= invoker.getUrl();// export service.Stringkey= serviceKey(url);        DubboExporter<T> exporter = newDubboExporter<T>(invoker, key, exporterMap);        exporterMap.put(key, exporter);// 存根验证//export an stub service for dispatching eventBooleanisStubSupportEvent= url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);BooleanisCallbackservice= url.getParameter(IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {StringstubServiceMethods= url.getParameter(STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {                    logger.warn(newIllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded."));                }            }        }// 启动server端        openServer(url);        optimizeSerialization(url);return exporter;    }

接着看org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer

privatevoidopenServer(URL url) {// find server.Stringkey= url.getAddress();//client can export a service which's only for server to invokebooleanisServer= url.getParameter(IS_SERVER_KEY, true);// 如果服务启动过就不再启动了if (isServer) {ProtocolServerserver= serverMap.get(key);if (server == null) {synchronized (this) {        server = serverMap.get(key);if (server == null) {// 创建server,          serverMap.put(key, createServer(url));        }else {          server.reset(url);        }      }    } else {// server supports reset, use together with override      server.reset(url);    }  }}

调用exchange层创建服务,org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer,离真正的启动越来越近了

private ProtocolServer createServer(URL url) {   url = URLBuilder.from(url)// send readonly event when server closes, it's enabled by default     .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())// enable heartbeat by default     .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))     .addParameter(CODEC_KEY, DubboCodec.NAME)     .build();Stringstr= url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);if (str != null && str.length() > 0 && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {thrownewRpcException("Unsupported server type: " + str + ", url: " + url);   }   ExchangeServer server;try {// 重点方法     server = Exchangers.bind(url, requestHandler);   } catch (RemotingException e) {thrownewRpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);   }   str = url.getParameter(CLIENT_KEY);if (str != null && str.length() > 0) {     Set<String> supportedTypes = url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {thrownewRpcException("Unsupported client type: " + str);     }   }DubboProtocolServerprotocolServer=newDubboProtocolServer(server);   loadServerProperties(protocolServer);return protocolServer; }

重点看Exchangers.bind(url, requestHandler)

publicstatic ExchangeServer bind(URL url, ExchangeHandler handler)throws RemotingException {if (url == null) {thrownewIllegalArgumentException("url == null");  }if (handler == null) {thrownewIllegalArgumentException("handler == null");  }  url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// 继续看return getExchanger(url).bind(url, handler);}

getExchanger(url).bind(url, handler)

publicstatic Exchanger getExchanger(URL url) {Stringtype= url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);// 通过api获取当前的exchangerreturn url.getOrDefaultFrameworkModel().getExtensionLoader(Exchanger.class).getExtension(type);}

接着到org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

public ExchangeServer bind(URL url, ExchangeHandler handler)throws RemotingException {returnnewHeaderExchangeServer(Transporters.bind(url, newDecodeHandler(newHeaderExchangeHandler(handler))));    }

接着调org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler…)

publicstatic RemotingServer bind(URL url, ChannelHandler... handlers)throws RemotingException {if (url == null) {thrownewIllegalArgumentException("url == null");  }if (handlers == null || handlers.length == 0) {thrownewIllegalArgumentException("handlers == null");  }  ChannelHandler handler;if (handlers.length == 1) {    handler = handlers[0];  } else {    handler = newChannelHandlerDispatcher(handlers);  }return getTransporter(url).bind(url, handler);}

继续调用getTransporter(url).bind(url, handler),这里回走到代理类,然后通过api获取真正的Transpoter

这里会走到org.apache.dubbo.remoting.transport.netty4.NettyServer

publicNettyServer(URL url, ChannelHandler handler)throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handlersuper(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));// read config before destroy    serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());}

这里什么都没做,看父类,重点来了

publicAbstractServer(URL url, ChannelHandler handler)throws RemotingException {super(url, handler);  executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();  localAddress = getUrl().toInetSocketAddress();// 服务端ipStringbindIp= getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());// 端口intbindPort= getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {    bindIp = ANYHOST_VALUE;  }// 声明InetSocketAddress  bindAddress = newInetSocketAddress(bindIp, bindPort);this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);try {// 真正启动服务    doOpen();if (logger.isInfoEnabled()) {      logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());    }  } catch (Throwable t) {thrownewRemotingException(url.toInetSocketAddress(), null"Failed to bind " + getClass().getSimpleName()                                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);  }// 创建线程池,为了后续回收资源  executor = executorRepository.createExecutorIfAbsent(url);}

看doOpen,熟悉的代码来了

org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen

protectedvoiddoOpen()throws Throwable {  bootstrap = newServerBootstrap();  bossGroup = NettyEventLoopFactory.eventLoopGroup(1"NettyServerBoss");  workerGroup = NettyEventLoopFactory.eventLoopGroup(    getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),"NettyServerWorker");finalNettyServerHandlernettyServerHandler=newNettyServerHandler(getUrl(), this);  channels = nettyServerHandler.getChannels();booleankeepalive= getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);  bootstrap.group(bossGroup, workerGroup)    .channel(NettyEventLoopFactory.serverSocketChannelClass())    .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)    .childOption(ChannelOption.SO_KEEPALIVE, keepalive)    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)    .childHandler(newChannelInitializer<SocketChannel>() {@OverrideprotectedvoidinitChannel(SocketChannel ch)throws Exception {// FIXME: should we use getTimeout()?intidleTimeout= UrlUtils.getIdleTimeout(getUrl());NettyCodecAdapteradapter=newNettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {          ch.pipeline().addLast("negotiation"newSslServerTlsHandler(getUrl()));        }        ch.pipeline()          .addLast("decoder", adapter.getDecoder())          .addLast("encoder", adapter.getEncoder())          .addLast("server-idle-handler"newIdleStateHandler(00, idleTimeout, MILLISECONDS))          .addLast("handler", nettyServerHandler);      }    });// bindChannelFuturechannelFuture= bootstrap.bind(getBindAddress());  channelFuture.syncUninterruptibly();  channel = channelFuture.channel();}

上面这段代码实际上就是启动了一个Netty服务,注入了Dubbo协议的序列化和反序列化编码器。这都是netty的内容了,Netty底层就是Nio了,后续慢慢在分享吧。其实到这里,服务启动就完成了。接着回到org.apache.dubbo.registry.integration.RegistryProtocol#export的代码,调用doLocalexport完成服务启动后,进行服务注册。

首先是获取注册中心

看看具体实现,代码位置

org.apache.dubbo.registry.integration.RegistryProtocol#getRegistry

protected Registry getRegistry(final URL registryUrl) {// 使用SPI获取factory,然后获取注册器,默认配置是wrapper=org.apache.dubbo.registry.RegistryFactoryWrapperRegistryFactoryregistryFactory= ScopeModelUtil.getExtensionLoader(RegistryFactory.class, registryUrl.getScopeModel()).getAdaptiveExtension();return registryFactory.getRegistry(registryUrl);    }

这里SPI,由于我们配置的注册中心为zookeeper://xxx,所以这里根据registryFactory.getRegistry获取到的注册中心工厂则为ZookeeperRegistry,由于经过了Warpper包装所依返回的是ListenerRegistryWrapper

public Registry getRegistry(URL url) {returnnewListenerRegistryWrapper(registryFactory.getRegistry(url),                Collections.unmodifiableList(url.getOrDefaultApplicationModel().getExtensionLoader(RegistryServiceListener.class)                        .getActivateExtension(url, "registry.listeners")));    }

这样就获取到了注册中心,接下来就是真正的注册了

接着看register方法,一路调用会调用到org.apache.dubbo.registry.support.FailbackRegistry#register方法,直接看这里FailbackRegistry是ZookeeperRegistry的父类

publicvoidregister(URL url) {if (!acceptable(url)) {    logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");return;  }super.register(url);  removeFailedRegistered(url);  removeFailedUnregistered(url);try {// Sending a registration request to the server side// 重点方法,这里回调用实际实现类的注册方法    doRegister(url);  } catch (Exception e) {// 异常处理先不看Throwablet= e;// If the startup detection is opened, the Exception is thrown directly.booleancheck= getUrl().getParameter(Constants.CHECK_KEY, true)      && url.getParameter(Constants.CHECK_KEY, true)      && !(url.getPort() == 0);booleanskipFailback= t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {        t = t.getCause();      }thrownewIllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);    } else {      logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);    }// Record a failed registration request to a failed list, retry regularly    addFailedRegistered(url);  }}

接着看最后的方法org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister

publicvoiddoRegister(URL url) {try {// 调用zk客户端创建临时节点     zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));   } catch (Throwable e) {thrownewRpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);   } }

到此,整个服务暴露流程就完成了。接着我们回顾一下导出全流程:

  • • 确定基础服务参数,比如ApplicationConfig,ModulConfig,ProviderConfig,RegistryConfig。。。
  • • 确定当前服务的协议列表
  • • 确定注册方式,如果有多种注册方式,则把上述的协议按照各种注册方式注册
  • • 注册过程中会判断当前服务本地是否导出,如果没有则通过DubboProtocol->Expoter->Transpoter启动NettyServer
  • • 启动成功后进行注册,如果是zk,则注册到zk
  • • 发送注册完成通知或调用相关的监听器进行回调

总结

重点讨论了服务导出的全流程代码,发现有时候在遇到Dubbo的SPI机制在全流程起作用的以后,跟断点都蒙圈,说明理解SPI机制不够深入,这个对于理解Dubbo框架其实非常核心。后续再继续学习下SPI,以及理解它在Dubbo整体生命周期中的作用。下篇讨论下服务的引用。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » 【Dubbo】服务导出源码分析

评论 抢沙发

5 + 5 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮