dubbo 服务消费原理分析之引用服务配置

2024-09-07 06:28

本文主要是介绍dubbo 服务消费原理分析之引用服务配置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、服务监听ContextRefreshedEvent
    • 1、AbstractApplicationContext.refresh
    • 2、AbstractApplicationContext.finishRefresh
    • 3、DubboDeployApplicationListener.onApplicationEvent
    • 4、DefaultModuleDeployer .referServices
    • 5、SimpleReferenceCache.get
  • 二、引用服务 ReferenceConfig
    • 1、时序图
    • 2、ReferenceConfig.get
    • 3、ReferenceConfig.init
    • 4、ReferenceConfig.createProxy
    • 5、ReferenceConfig.createInvokerForRemote
  • 三、注册协议 RegistryProtocol
    • 1、RegistryProtocol.refer
    • 2、RegistryProtocol.doRefer
    • 3、RegistryProtocol.interceptInvoker


前言

文章基于3.1.0版本进行分析

		<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.0</version></dependency>

一、服务监听ContextRefreshedEvent

在springboot 中,refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作是进行,refresh的最后处理了 finishRefresh 方法,改方法会广播一个ContextRefreshedEvent容器刷新完成事件,所有监听了该事件的bean都会去执行相关逻辑处理。

1、AbstractApplicationContext.refresh

public void refresh() throws BeansException, IllegalStateException {synchronized(this.startupShutdownMonitor) {// 省略无关代码***// 初始化生命周期处理器,调用生命周期处理器onRefresh方法,发布ContextRefreshedEvent事件,JMX相关处理this.finishRefresh();// 省略无关代码***}}

2、AbstractApplicationContext.finishRefresh

	protected void finishRefresh() {// 清除上下文资源缓存(如扫描中的ASM元数据) scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// 广播刷新完成事件publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.if (!NativeDetector.inNativeImage()) {LiveBeansView.registerApplicationContext(this);}}

3、DubboDeployApplicationListener.onApplicationEvent

dubbo很好的结合了spring的这一个拓展点,在这个拓展点开始实现服务的发布。可以看到,DubboDeployApplicationListener实现了ContextRefreshedEvent的消息监听

public class DubboDeployApplicationListener implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, Ordered {private static final Logger logger = LoggerFactory.getLogger(DubboDeployApplicationListener.class);private ApplicationContext applicationContext;private ApplicationModel applicationModel;private ModuleModel moduleModel;@Overridepublic void onApplicationEvent(ApplicationContextEvent event) {if (nullSafeEquals(applicationContext, event.getSource())) {if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}}private void onContextRefreshedEvent(ContextRefreshedEvent event) {// 获取配置的deployer 进行发布,默认是 DefaultModuleDeployer ModuleDeployer deployer = moduleModel.getDeployer();Assert.notNull(deployer, "Module deployer is null");// start moduleFuture future = deployer.start();// if the module does not start in background, await finishif (!deployer.isBackground()) {try {// 等待发布结束future.get();} catch (InterruptedException e) {logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());} catch (Exception e) {logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);}}}}

4、DefaultModuleDeployer .referServices

DefaultModuleDeployer 中,真正核心的是ReferenceConfig,ReferenceConfig才是去实际发布的动作

public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {// DefaultApplicationDeployerprivate ApplicationDeployer applicationDeployer;@Overridepublic synchronized Future start() throws IllegalStateException {...// 不管是 DefaultApplicationDeployer 还是DefaultModuleDeployer的initialize方法,都是处理相关配置文件// 其功能等价于监听器 DubboConfigApplicationListenerapplicationDeployer.initialize();initialize();// 真正触发 服务注册功能exportServices();// prepare application instance// exclude internal module to avoid wait itselfif (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {applicationDeployer.prepareInternalModule();}// 服务消费// refer servicesreferServices();...return startFuture;}private void referServices() {configManager.getReferences().forEach(rc -> {try {ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;// 刷新配置if (!referenceConfig.isRefreshed()) {referenceConfig.refresh();}// 如果还没初始化if (rc.shouldInit()) {// 是否异步注入if (referAsync || rc.shouldReferAsync()) {ExecutorService executor = executorRepository.getServiceReferExecutor();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {referenceCache.get(rc);} catch (Throwable t) {logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);}}, executor);asyncReferringFutures.add(future);} else {// 查询缓存中是否存在代理对象 对应的实现类SimpleReferenceCachereferenceCache.get(rc);}}} catch (Throwable t) {logger.error(getIdentifier() + " refer catch error.");referenceCache.destroy(rc);throw t;}});}
}

5、SimpleReferenceCache.get

SimpleReferenceCache,一个用于缓存引用ReferenceConfigBase的util工具类。
ReferenceConfigBase是一个重对象,对于频繁创建ReferenceConfigBase的框架来说,有必要缓存这些对象。
如果需要使用复杂的策略,可以实现并使用自己的ReferenceConfigBase缓存
这个Cache是引用服务的开始如果我们想在代码中自定义一些服务引用的逻辑,可以直接创建SimpleReferenceCache类型对象然后调用其get方法进行引用服务。

	public <T> T get(ReferenceConfigBase<T> rc) {String key = generator.generateKey(rc);// 服务类型 如果是泛化调用则这个类型为GenericServiceClass<?> type = rc.getInterfaceClass();boolean singleton = rc.getSingleton() == null || rc.getSingleton();T proxy = null;// Check existing proxy of the same 'key' and 'type' first.// 单例if (singleton) {// 缓存数据找proxy = get(key, (Class<T>) type);} else {logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");}// 不存在消费的代理对象,创建rc.get(),最后走到ReferenceConfig.get()if (proxy == null) {// 获取或者创建值,为引用类型referencesOfType对象(类型为Map<Class<?>, List<ReferenceConfigBase<?>>>)缓存对象生成值(值不存咋时候会生成一个)List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));// 每次走到这里都会添加一个ReferenceConfigBase 引用配置对象(单例的从缓存中拿到就可以直接返回了)referencesOfType.add(rc);// 与前面一样 前面是类型映射,这里是key映射List<ReferenceConfigBase<?>> referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>()));referenceConfigList.add(rc);// 开始引用服务proxy = rc.get();}return proxy;}

二、引用服务 ReferenceConfig

服务发现和引用就是从这里开始的

1、时序图

2、ReferenceConfig.get

获取引用的代理对象

	public T get() {if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}if (ref == null) {// ensure start module, compatible with old api usage// 如果使用方直接调用了ReferenceConfigBase的get方法或者缓存对象SimpleReferenceCache类型的对象的get方法来引用服务端的时候就会造成很多配置没有初始化// 这个代码其实就是启动模块进行一些基础配置的初始化操作,比如元数据中心默认配置选择,注册中心默认配置选择这些都是比较重要的getScopeModel().getDeployer().start();synchronized (this) {if (ref == null) {init();}}}return ref;}

主要包括

  • checkAndUpdateSubConfigs() – 检查并更新缺省配置
  • init() – 消费者服务的初始化,核心逻辑。

3、ReferenceConfig.init

初始化代理对象

    protected synchronized void init() {// 初始化标记变量保证只初始化一次,这里又是加锁又是加标记变量的if (initialized && ref != null) {return;}try {// 刷新配置if (!this.isRefreshed()) {this.refresh();}// init serviceMetadata// 初始化元数据信息 如版本号,分组,服务接口名initServiceMetadata(consumer);// //继续初始化元数据信息 服务接口类型和keyserviceMetadata.setServiceType(getServiceInterfaceClass());// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));// 参数配置转成MAPMap<String, String> referenceParameters = appendConfig();// init service-application mapping// 初始化路径,参数转化成url,dubbo主要从url上面读取参数initServiceAppsMapping(referenceParameters);// 获取Module级别的服务存储仓库,其内部保存着服务提供者和服务消费者的缓存ModuleServiceRepository repository = getScopeModel().getServiceRepository();ServiceDescriptor serviceDescriptor;if (CommonConstants.NATIVE_STUB.equals(getProxy())) {serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);repository.registerService(serviceDescriptor);} else {serviceDescriptor = repository.registerService(interfaceClass);}// 创建消费者模型对象consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.consumerModel.setConfig(this);repository.registerConsumer(consumerModel);serviceMetadata.getAttachments().putAll(referenceParameters);// 创建引用的代理对象 核心代码ref = createProxy(referenceParameters);serviceMetadata.setTarget(ref);serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);consumerModel.setDestroyCaller(getDestroyRunner());consumerModel.setProxyObject(ref);consumerModel.initMethodModels();checkInvokerAvailable();} catch (Throwable t) {// *** 省略部分代码throw t;}initialized = true;}

主要流程

  • 服务引用前初始化serviceMetadata服务元数据
  • 获取服务仓库ModuleServiceRepository,并注册service 和 consumer
  • 调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象,并赋值给ref,核心逻辑

4、ReferenceConfig.createProxy

创建代理对象,实际调用需要通过这个代理对象进行调用

	private T createProxy(Map<String, String> referenceParameters) {// 本地引用if (shouldJvmRefer(referenceParameters)) {createInvokerForLocal(referenceParameters);} else {urls.clear();meshModeHandleUrl(referenceParameters);if (StringUtils.isNotEmpty(url)) {// user specified URL, could be peer-to-peer address, or register center's address.// url参数不为空,且可以配置多个。可以是点对点,也可以是注册中心,然后添加到urlsparseUrl(referenceParameters);} else {// if protocols not in jvm checkRegistryif (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {// 从注册表中获取URL并将其聚合。这个其实就是初始化一下注册中心的url配置aggregateUrlFromRegistry(referenceParameters);}}// 创建远程引用,创建远程引用调用器,承担服务调用的核心逻辑createInvokerForRemote();}if (logger.isInfoEnabled()) {logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +(Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?" it's GenericService reference" : " it's not GenericService reference"));}URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,referenceParameters.get(INTERFACE_KEY), referenceParameters);consumerUrl = consumerUrl.setScopeModel(getScopeModel());consumerUrl = consumerUrl.setServiceModel(consumerModel);MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());// create service proxy// 构建一个代理对象,代理客户端的请求return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}

5、ReferenceConfig.createInvokerForRemote

远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。对于一个注册中心url和多个注册中心url的处理是不一样的,一个注册中心对应一个invoker,最后封装到集群路由invoker

	private void createInvokerForRemote() {// 只有一个,秩序创建一个invokerif (urls.size() == 1) {URL curUrl = urls.get(0);// 自适应扩展类,这里和服务发布类似,先进入了RegistryProtocol.referinvoker = protocolSPI.refer(interfaceClass, curUrl);if (!UrlUtils.isRegistry(curUrl)) {List<Invoker<?>> invokers = new ArrayList<>();invokers.add(invoker);invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);}} else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;// 创建多个invokerfor (URL url : urls) {// For multi-registry scenarios, it is not checked whether each referInvoker is available.// Because this invoker may become available later.// invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}// 创建一个集群路由invokerif (registryUrl != null) {// registry url is available// for multi-subscription scenario, use 'zone-aware' policy by defaultString cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker// (RegistryDirectory, routing happens here) -> Invokerinvoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);} else {// not a registry url, must be direct invoke.// 直连if (CollectionUtils.isEmpty(invokers)) {throw new IllegalArgumentException("invokers == null");}URL curUrl = invokers.get(0).getUrl();String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);}}}

服务发布的时候讲到 protocolSPI,protocolSPI.refer形成的调用链为
-Protocol$Adaptie -> ProtocolSerializationWrapper -> ProtocolFilterWrapper -> QosProtocolWrapper -> ProtocolListenerWrapper -> RegistryProtocol -> RegistryProtocol

三、注册协议 RegistryProtocol

应用级服务远程协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol

1、RegistryProtocol.refer

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 组装配置中心的地址url = getRegistryUrl(url);// 获取用于操作Zookeeper的Registry类型 Registry registry = getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);String group = qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}// 降级容错的逻辑处理对象 类型为Cluster 实际类型为MockClusterWrapper 内部包装的是FailoverCluster// 后续调用服务失败时候会先失效转移再降级Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 主要干活的地方 生成引用 invokerreturn doRefer(cluster, registry, type, url, qs);}

生成配置中心的url

zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-springboot-start-consumer
&dubbo=2.0.2
&pid=8896
&qos.enable=false
&release=3.1.0
&timestamp=1725379058285

2、RegistryProtocol.doRefer

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());consumerAttribute.remove(REFER_KEY);String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);// 消费者url信息URL consumerUrl = new ServiceConfigURL (p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute);url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);// 带迁移性质的Invoker对象ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);// 执行迁移规则创建应用级优先的服务发现Invoker对象return interceptInvoker(migrationInvoker, url, consumerUrl);}

consumerUrl如下

consumer://192.168.0.101/org.sjl.dubbo.AsyncProvider?application=dubbo-springboot-start-consumer
&background=false
&dubbo=2.0.2
&interface=org.sjl.dubbo.AsyncProvider
&methods=sayHiAsync,sayHello,sayHelloAsync&pid=33100
&qos.enable=false
&register.ip=192.168.0.101
&release=3.1.0
&side=consumer
&sticky=false
&timeout=30000
&timestamp=1725379319215

3、RegistryProtocol.interceptInvoker

    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {// 获取激活的注册协议监听器扩展里面registry.protocol.listener,这里激活的类型为MigrationRuleListenerList<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);if (CollectionUtils.isEmpty(listeners)) {return invoker;}for (RegistryProtocolListener listener : listeners) {// MigrationRuleListener// 迁移规则应用级引用listener.onRefer(this, invoker, consumerUrl, url);}return invoker;}

接下来就进入了迁移规则的应用级服务发现了,参考 dubbo 服务消费原理分析之应用级服务发现

这篇关于dubbo 服务消费原理分析之引用服务配置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1144310

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Linux云服务器手动配置DNS的方法步骤

《Linux云服务器手动配置DNS的方法步骤》在Linux云服务器上手动配置DNS(域名系统)是确保服务器能够正常解析域名的重要步骤,以下是详细的配置方法,包括系统文件的修改和常见问题的解决方案,需要... 目录1. 为什么需要手动配置 DNS?2. 手动配置 DNS 的方法方法 1:修改 /etc/res

Redis中Hash从使用过程到原理说明

《Redis中Hash从使用过程到原理说明》RedisHash结构用于存储字段-值对,适合对象数据,支持HSET、HGET等命令,采用ziplist或hashtable编码,通过渐进式rehash优化... 目录一、开篇:Hash就像超市的货架二、Hash的基本使用1. 常用命令示例2. Java操作示例三

Linux创建服务使用systemctl管理详解

《Linux创建服务使用systemctl管理详解》文章指导在Linux中创建systemd服务,设置文件权限为所有者读写、其他只读,重新加载配置,启动服务并检查状态,确保服务正常运行,关键步骤包括权... 目录创建服务 /usr/lib/systemd/system/设置服务文件权限:所有者读写js,其他

Redis中Set结构使用过程与原理说明

《Redis中Set结构使用过程与原理说明》本文解析了RedisSet数据结构,涵盖其基本操作(如添加、查找)、集合运算(交并差)、底层实现(intset与hashtable自动切换机制)、典型应用场... 目录开篇:从购物车到Redis Set一、Redis Set的基本操作1.1 编程常用命令1.2 集