Dubbo源码分析----发起请求

2024-08-30 09:58

本文主要是介绍Dubbo源码分析----发起请求,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

从如下代码中还是分析

String sayHello = demoService.sayHello("123123");

我们知道demoService实际上是一个代理对象,那么假设使用的是JDK的代码,看看获取代理的地方

public class JdkProxyFactory extends AbstractProxyFactory {@SuppressWarnings("unchecked")public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces,new InvokerInvocationHandler(invoker));}public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {Method method = proxy.getClass().getMethod(methodName, parameterTypes);return method.invoke(proxy, arguments);}};}}

由动态代理的知识,可以知道代理对象调用方法的时候会经过InvocationHandler的invoke方法

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();//....return invoker.invoke(new RpcInvocation(method, args)).recreate();}

其中委托了invoker进行调用,这个invoker是什么呢? 要弄清楚这个问题,要回顾服务引用中的流程(com.alibaba.dubbo.config.ReferenceConfig#createProxy方法)

    private T createProxy(Map<String, String> map) {
//....if (urls.size() == 1) {invoker = refprotocol.refer(interfaceClass, urls.get(0));} else {//....if (registryURL != null) { URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));}  else { invoker = cluster.join(new StaticDirectory(invokers));}}}// 创建服务代理return (T) proxyFactory.getProxy(invoker);}

主要是通过refprotocol.refer构造的invoker,从protocol的refer返回的是MockClusterInvoker,其装饰了FailoverClusterInvoker(默认,如果cluster配置了其他,则是其他实现),主要做mock用,忽略。
那么InvokerInvocationHandler中的invoker就是FailoverClusterInvoker,invoke方法会调用到FailoverClusterInvoker的doInvoke方法

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {//....从多个invoker中选出一个进行调用Result result = invoker.invoke(invocation);
//....return result;}

这时候的invoker结构如下:
image.png
第二层的invoker是ProtocolFilterWrapper的匿名内部类,其持有一个过滤器,这一层主要一层层调用,然后最后调用到DubboInvoker

public class DubboInvoker<T> extends AbstractInvoker<T> {@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());inv.setAttachment(Constants.VERSION_KEY, version);ExchangeClient currentClient;// 获取连接Client对象,默认为1,可通过connections配置if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {// 是否异步boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);RpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {ResponseFuture future = currentClient.request(inv, timeout) ;RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));return new RpcResult();} else {RpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout).get();}} catch () {//....}}
}

根据选择的模式分为3种:
1. 同步
2. 异步
3. 不需要返回值

同步

这种情况下,调用的是Client的request方法,底层是通过channel将请求发送出去

    public ResponseFuture request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion("2.0.0");req.setTwoWay(true);req.setData(request);DefaultFuture future = new DefaultFuture(channel, req, timeout);try{channel.send(req);}catch (RemotingException e) {future.cancel();throw e;}return future;}

由于是同步,而Netty发送是异步的,当时取不到返回结果,所以返回一个Future之后,需要等待结果返回,这时候调用的是get方法等待返回

异步

异步的情况和同步差不多,调用request方法发送请求得到future,返回放到RpcContext中,然后返回一个结果,使用如下:

            Future<Result> future = RpcContext.getContext().getFuture();result = future.get();

从源码上可以看出,如果同时异步调用了两个服务,那么后者的setFuture会覆盖前者的

不需要返回值

这种情况调用了send方法,底层类似,isSent表示是否等待消息发出

注意:
假设有这种情况,A–异步–>B–同步–>C
那么,A->B这种情况,会走上面异步的流程,因为配置了async属性,所以URL中存在这个属性,而当B->C,这个async属性被附带到B->C的调用附加参数中,导致走了异步的流程,但是其实应该是同步的
出现这种问题的原因如下,先看下A->B的时候,调用的ContextFilter

@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {Map<String, String> attachments = invocation.getAttachments();if (attachments != null) {attachments = new HashMap<String, String>(attachments);attachments.remove(Constants.PATH_KEY);attachments.remove(Constants.GROUP_KEY);attachments.remove(Constants.VERSION_KEY);attachments.remove(Constants.DUBBO_VERSION_KEY);attachments.remove(Constants.TOKEN_KEY);attachments.remove(Constants.TIMEOUT_KEY);}RpcContext.getContext().setInvoker(invoker).setInvocation(invocation).setAttachments(attachments).setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());//....}
}

这个invocation是A带过来的参数,那么attachments中自然有async=true的属性,而下面,会把attachments放到当前的RpcContext中

当B->C时,调用DubboInvoker方法前调用了AbstractInvoker的invoke方法

    public Result invoke(Invocation inv) throws RpcException {//....Map<String, String> context = RpcContext.getContext().getAttachments();if (context != null) {invocation.addAttachmentsIfAbsent(context);}//....}

这里,把context的attachments有设置回了invocation,导致B->C附带了async=true的属性

这篇关于Dubbo源码分析----发起请求的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120493

相关文章

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

使用Python的requests库来发送HTTP请求的操作指南

《使用Python的requests库来发送HTTP请求的操作指南》使用Python的requests库发送HTTP请求是非常简单和直观的,requests库提供了丰富的API,可以发送各种类型的HT... 目录前言1. 安装 requests 库2. 发送 GET 请求3. 发送 POST 请求4. 发送

Android 缓存日志Logcat导出与分析最佳实践

《Android缓存日志Logcat导出与分析最佳实践》本文全面介绍AndroidLogcat缓存日志的导出与分析方法,涵盖按进程、缓冲区类型及日志级别过滤,自动化工具使用,常见问题解决方案和最佳实... 目录android 缓存日志(Logcat)导出与分析全攻略为什么要导出缓存日志?按需过滤导出1. 按

Linux中的HTTPS协议原理分析

《Linux中的HTTPS协议原理分析》文章解释了HTTPS的必要性:HTTP明文传输易被篡改和劫持,HTTPS通过非对称加密协商对称密钥、CA证书认证和混合加密机制,有效防范中间人攻击,保障通信安全... 目录一、什么是加密和解密?二、为什么需要加密?三、常见的加密方式3.1 对称加密3.2非对称加密四、

MySQL中读写分离方案对比分析与选型建议

《MySQL中读写分离方案对比分析与选型建议》MySQL读写分离是提升数据库可用性和性能的常见手段,本文将围绕现实生产环境中常见的几种读写分离模式进行系统对比,希望对大家有所帮助... 目录一、问题背景介绍二、多种解决方案对比2.1 原生mysql主从复制2.2 Proxy层中间件:ProxySQL2.3

python使用Akshare与Streamlit实现股票估值分析教程(图文代码)

《python使用Akshare与Streamlit实现股票估值分析教程(图文代码)》入职测试中的一道题,要求:从Akshare下载某一个股票近十年的财务报表包括,资产负债表,利润表,现金流量表,保存... 目录一、前言二、核心知识点梳理1、Akshare数据获取2、Pandas数据处理3、Matplotl

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致