dubbo 服务消费原理分析之引用服务配置

2024-09-07 06:28

本文主要是介绍dubbo 服务消费原理分析之引用服务配置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、服务监听ContextRefreshedEvent
    • 1、AbstractApplicationContext.refresh
    • 2、AbstractApplicationContext.finishRefresh
    • 3、DubboDeployApplicationListener.onApplicationEvent
    • 4、DefaultModuleDeployer .referServices
    • 5、SimpleReferenceCache.get
  • 二、引用服务 ReferenceConfig
    • 1、时序图
    • 2、ReferenceConfig.get
    • 3、ReferenceConfig.init
    • 4、ReferenceConfig.createProxy
    • 5、ReferenceConfig.createInvokerForRemote
  • 三、注册协议 RegistryProtocol
    • 1、RegistryProtocol.refer
    • 2、RegistryProtocol.doRefer
    • 3、RegistryProtocol.interceptInvoker


前言

文章基于3.1.0版本进行分析

		<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.0</version></dependency>

一、服务监听ContextRefreshedEvent

在springboot 中,refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作是进行,refresh的最后处理了 finishRefresh 方法,改方法会广播一个ContextRefreshedEvent容器刷新完成事件,所有监听了该事件的bean都会去执行相关逻辑处理。

1、AbstractApplicationContext.refresh

public void refresh() throws BeansException, IllegalStateException {synchronized(this.startupShutdownMonitor) {// 省略无关代码***// 初始化生命周期处理器,调用生命周期处理器onRefresh方法,发布ContextRefreshedEvent事件,JMX相关处理this.finishRefresh();// 省略无关代码***}}

2、AbstractApplicationContext.finishRefresh

	protected void finishRefresh() {// 清除上下文资源缓存(如扫描中的ASM元数据) scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// 广播刷新完成事件publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.if (!NativeDetector.inNativeImage()) {LiveBeansView.registerApplicationContext(this);}}

3、DubboDeployApplicationListener.onApplicationEvent

dubbo很好的结合了spring的这一个拓展点,在这个拓展点开始实现服务的发布。可以看到,DubboDeployApplicationListener实现了ContextRefreshedEvent的消息监听

public class DubboDeployApplicationListener implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, Ordered {private static final Logger logger = LoggerFactory.getLogger(DubboDeployApplicationListener.class);private ApplicationContext applicationContext;private ApplicationModel applicationModel;private ModuleModel moduleModel;@Overridepublic void onApplicationEvent(ApplicationContextEvent event) {if (nullSafeEquals(applicationContext, event.getSource())) {if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}}private void onContextRefreshedEvent(ContextRefreshedEvent event) {// 获取配置的deployer 进行发布,默认是 DefaultModuleDeployer ModuleDeployer deployer = moduleModel.getDeployer();Assert.notNull(deployer, "Module deployer is null");// start moduleFuture future = deployer.start();// if the module does not start in background, await finishif (!deployer.isBackground()) {try {// 等待发布结束future.get();} catch (InterruptedException e) {logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());} catch (Exception e) {logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);}}}}

4、DefaultModuleDeployer .referServices

DefaultModuleDeployer 中,真正核心的是ReferenceConfig,ReferenceConfig才是去实际发布的动作

public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {// DefaultApplicationDeployerprivate ApplicationDeployer applicationDeployer;@Overridepublic synchronized Future start() throws IllegalStateException {...// 不管是 DefaultApplicationDeployer 还是DefaultModuleDeployer的initialize方法,都是处理相关配置文件// 其功能等价于监听器 DubboConfigApplicationListenerapplicationDeployer.initialize();initialize();// 真正触发 服务注册功能exportServices();// prepare application instance// exclude internal module to avoid wait itselfif (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {applicationDeployer.prepareInternalModule();}// 服务消费// refer servicesreferServices();...return startFuture;}private void referServices() {configManager.getReferences().forEach(rc -> {try {ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;// 刷新配置if (!referenceConfig.isRefreshed()) {referenceConfig.refresh();}// 如果还没初始化if (rc.shouldInit()) {// 是否异步注入if (referAsync || rc.shouldReferAsync()) {ExecutorService executor = executorRepository.getServiceReferExecutor();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {referenceCache.get(rc);} catch (Throwable t) {logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);}}, executor);asyncReferringFutures.add(future);} else {// 查询缓存中是否存在代理对象 对应的实现类SimpleReferenceCachereferenceCache.get(rc);}}} catch (Throwable t) {logger.error(getIdentifier() + " refer catch error.");referenceCache.destroy(rc);throw t;}});}
}

5、SimpleReferenceCache.get

SimpleReferenceCache,一个用于缓存引用ReferenceConfigBase的util工具类。
ReferenceConfigBase是一个重对象,对于频繁创建ReferenceConfigBase的框架来说,有必要缓存这些对象。
如果需要使用复杂的策略,可以实现并使用自己的ReferenceConfigBase缓存
这个Cache是引用服务的开始如果我们想在代码中自定义一些服务引用的逻辑,可以直接创建SimpleReferenceCache类型对象然后调用其get方法进行引用服务。

	public <T> T get(ReferenceConfigBase<T> rc) {String key = generator.generateKey(rc);// 服务类型 如果是泛化调用则这个类型为GenericServiceClass<?> type = rc.getInterfaceClass();boolean singleton = rc.getSingleton() == null || rc.getSingleton();T proxy = null;// Check existing proxy of the same 'key' and 'type' first.// 单例if (singleton) {// 缓存数据找proxy = get(key, (Class<T>) type);} else {logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");}// 不存在消费的代理对象,创建rc.get(),最后走到ReferenceConfig.get()if (proxy == null) {// 获取或者创建值,为引用类型referencesOfType对象(类型为Map<Class<?>, List<ReferenceConfigBase<?>>>)缓存对象生成值(值不存咋时候会生成一个)List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));// 每次走到这里都会添加一个ReferenceConfigBase 引用配置对象(单例的从缓存中拿到就可以直接返回了)referencesOfType.add(rc);// 与前面一样 前面是类型映射,这里是key映射List<ReferenceConfigBase<?>> referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>()));referenceConfigList.add(rc);// 开始引用服务proxy = rc.get();}return proxy;}

二、引用服务 ReferenceConfig

服务发现和引用就是从这里开始的

1、时序图

2、ReferenceConfig.get

获取引用的代理对象

	public T get() {if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}if (ref == null) {// ensure start module, compatible with old api usage// 如果使用方直接调用了ReferenceConfigBase的get方法或者缓存对象SimpleReferenceCache类型的对象的get方法来引用服务端的时候就会造成很多配置没有初始化// 这个代码其实就是启动模块进行一些基础配置的初始化操作,比如元数据中心默认配置选择,注册中心默认配置选择这些都是比较重要的getScopeModel().getDeployer().start();synchronized (this) {if (ref == null) {init();}}}return ref;}

主要包括

  • checkAndUpdateSubConfigs() – 检查并更新缺省配置
  • init() – 消费者服务的初始化,核心逻辑。

3、ReferenceConfig.init

初始化代理对象

    protected synchronized void init() {// 初始化标记变量保证只初始化一次,这里又是加锁又是加标记变量的if (initialized && ref != null) {return;}try {// 刷新配置if (!this.isRefreshed()) {this.refresh();}// init serviceMetadata// 初始化元数据信息 如版本号,分组,服务接口名initServiceMetadata(consumer);// //继续初始化元数据信息 服务接口类型和keyserviceMetadata.setServiceType(getServiceInterfaceClass());// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));// 参数配置转成MAPMap<String, String> referenceParameters = appendConfig();// init service-application mapping// 初始化路径,参数转化成url,dubbo主要从url上面读取参数initServiceAppsMapping(referenceParameters);// 获取Module级别的服务存储仓库,其内部保存着服务提供者和服务消费者的缓存ModuleServiceRepository repository = getScopeModel().getServiceRepository();ServiceDescriptor serviceDescriptor;if (CommonConstants.NATIVE_STUB.equals(getProxy())) {serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);repository.registerService(serviceDescriptor);} else {serviceDescriptor = repository.registerService(interfaceClass);}// 创建消费者模型对象consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.consumerModel.setConfig(this);repository.registerConsumer(consumerModel);serviceMetadata.getAttachments().putAll(referenceParameters);// 创建引用的代理对象 核心代码ref = createProxy(referenceParameters);serviceMetadata.setTarget(ref);serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);consumerModel.setDestroyCaller(getDestroyRunner());consumerModel.setProxyObject(ref);consumerModel.initMethodModels();checkInvokerAvailable();} catch (Throwable t) {// *** 省略部分代码throw t;}initialized = true;}

主要流程

  • 服务引用前初始化serviceMetadata服务元数据
  • 获取服务仓库ModuleServiceRepository,并注册service 和 consumer
  • 调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象,并赋值给ref,核心逻辑

4、ReferenceConfig.createProxy

创建代理对象,实际调用需要通过这个代理对象进行调用

	private T createProxy(Map<String, String> referenceParameters) {// 本地引用if (shouldJvmRefer(referenceParameters)) {createInvokerForLocal(referenceParameters);} else {urls.clear();meshModeHandleUrl(referenceParameters);if (StringUtils.isNotEmpty(url)) {// user specified URL, could be peer-to-peer address, or register center's address.// url参数不为空,且可以配置多个。可以是点对点,也可以是注册中心,然后添加到urlsparseUrl(referenceParameters);} else {// if protocols not in jvm checkRegistryif (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {// 从注册表中获取URL并将其聚合。这个其实就是初始化一下注册中心的url配置aggregateUrlFromRegistry(referenceParameters);}}// 创建远程引用,创建远程引用调用器,承担服务调用的核心逻辑createInvokerForRemote();}if (logger.isInfoEnabled()) {logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +(Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?" it's GenericService reference" : " it's not GenericService reference"));}URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,referenceParameters.get(INTERFACE_KEY), referenceParameters);consumerUrl = consumerUrl.setScopeModel(getScopeModel());consumerUrl = consumerUrl.setServiceModel(consumerModel);MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());// create service proxy// 构建一个代理对象,代理客户端的请求return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}

5、ReferenceConfig.createInvokerForRemote

远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。对于一个注册中心url和多个注册中心url的处理是不一样的,一个注册中心对应一个invoker,最后封装到集群路由invoker

	private void createInvokerForRemote() {// 只有一个,秩序创建一个invokerif (urls.size() == 1) {URL curUrl = urls.get(0);// 自适应扩展类,这里和服务发布类似,先进入了RegistryProtocol.referinvoker = protocolSPI.refer(interfaceClass, curUrl);if (!UrlUtils.isRegistry(curUrl)) {List<Invoker<?>> invokers = new ArrayList<>();invokers.add(invoker);invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);}} else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;// 创建多个invokerfor (URL url : urls) {// For multi-registry scenarios, it is not checked whether each referInvoker is available.// Because this invoker may become available later.// invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}// 创建一个集群路由invokerif (registryUrl != null) {// registry url is available// for multi-subscription scenario, use 'zone-aware' policy by defaultString cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker// (RegistryDirectory, routing happens here) -> Invokerinvoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);} else {// not a registry url, must be direct invoke.// 直连if (CollectionUtils.isEmpty(invokers)) {throw new IllegalArgumentException("invokers == null");}URL curUrl = invokers.get(0).getUrl();String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);}}}

服务发布的时候讲到 protocolSPI,protocolSPI.refer形成的调用链为
-Protocol$Adaptie -> ProtocolSerializationWrapper -> ProtocolFilterWrapper -> QosProtocolWrapper -> ProtocolListenerWrapper -> RegistryProtocol -> RegistryProtocol

三、注册协议 RegistryProtocol

应用级服务远程协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol

1、RegistryProtocol.refer

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 组装配置中心的地址url = getRegistryUrl(url);// 获取用于操作Zookeeper的Registry类型 Registry registry = getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);String group = qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}// 降级容错的逻辑处理对象 类型为Cluster 实际类型为MockClusterWrapper 内部包装的是FailoverCluster// 后续调用服务失败时候会先失效转移再降级Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 主要干活的地方 生成引用 invokerreturn doRefer(cluster, registry, type, url, qs);}

生成配置中心的url

zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-springboot-start-consumer
&dubbo=2.0.2
&pid=8896
&qos.enable=false
&release=3.1.0
&timestamp=1725379058285

2、RegistryProtocol.doRefer

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());consumerAttribute.remove(REFER_KEY);String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);// 消费者url信息URL consumerUrl = new ServiceConfigURL (p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute);url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);// 带迁移性质的Invoker对象ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);// 执行迁移规则创建应用级优先的服务发现Invoker对象return interceptInvoker(migrationInvoker, url, consumerUrl);}

consumerUrl如下

consumer://192.168.0.101/org.sjl.dubbo.AsyncProvider?application=dubbo-springboot-start-consumer
&background=false
&dubbo=2.0.2
&interface=org.sjl.dubbo.AsyncProvider
&methods=sayHiAsync,sayHello,sayHelloAsync&pid=33100
&qos.enable=false
&register.ip=192.168.0.101
&release=3.1.0
&side=consumer
&sticky=false
&timeout=30000
&timestamp=1725379319215

3、RegistryProtocol.interceptInvoker

    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {// 获取激活的注册协议监听器扩展里面registry.protocol.listener,这里激活的类型为MigrationRuleListenerList<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);if (CollectionUtils.isEmpty(listeners)) {return invoker;}for (RegistryProtocolListener listener : listeners) {// MigrationRuleListener// 迁移规则应用级引用listener.onRefer(this, invoker, consumerUrl, url);}return invoker;}

接下来就进入了迁移规则的应用级服务发现了,参考 dubbo 服务消费原理分析之应用级服务发现

这篇关于dubbo 服务消费原理分析之引用服务配置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1144310

相关文章

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

SpringBoot3.4配置校验新特性的用法详解

《SpringBoot3.4配置校验新特性的用法详解》SpringBoot3.4对配置校验支持进行了全面升级,这篇文章为大家详细介绍了一下它们的具体使用,文中的示例代码讲解详细,感兴趣的小伙伴可以参考... 目录基本用法示例定义配置类配置 application.yml注入使用嵌套对象与集合元素深度校验开发

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

Java Spring 中 @PostConstruct 注解使用原理及常见场景

《JavaSpring中@PostConstruct注解使用原理及常见场景》在JavaSpring中,@PostConstruct注解是一个非常实用的功能,它允许开发者在Spring容器完全初... 目录一、@PostConstruct 注解概述二、@PostConstruct 注解的基本使用2.1 基本代

IntelliJ IDEA 中配置 Spring MVC 环境的详细步骤及问题解决

《IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决》:本文主要介绍IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决,本文分步骤结合实例给大... 目录步骤 1:创建 Maven Web 项目步骤 2:添加 Spring MVC 依赖1、保存后执行2、将新的依赖

Spring 中的循环引用问题解决方法

《Spring中的循环引用问题解决方法》:本文主要介绍Spring中的循环引用问题解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录什么是循环引用?循环依赖三级缓存解决循环依赖二级缓存三级缓存本章来聊聊Spring 中的循环引用问题该如何解决。这里聊

Golang HashMap实现原理解析

《GolangHashMap实现原理解析》HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持高效的插入、查找和删除操作,:本文主要介绍GolangH... 目录HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾

如何为Yarn配置国内源的详细教程

《如何为Yarn配置国内源的详细教程》在使用Yarn进行项目开发时,由于网络原因,直接使用官方源可能会导致下载速度慢或连接失败,配置国内源可以显著提高包的下载速度和稳定性,本文将详细介绍如何为Yarn... 目录一、查询当前使用的镜像源二、设置国内源1. 设置为淘宝镜像源2. 设置为其他国内源三、还原为官方

CentOS7更改默认SSH端口与配置指南

《CentOS7更改默认SSH端口与配置指南》SSH是Linux服务器远程管理的核心工具,其默认监听端口为22,由于端口22众所周知,这也使得服务器容易受到自动化扫描和暴力破解攻击,本文将系统性地介绍... 目录引言为什么要更改 SSH 默认端口?步骤详解:如何更改 Centos 7 的 SSH 默认端口1