Dubbo源码分析----处理请求

2024-08-30 09:58

本文主要是介绍Dubbo源码分析----处理请求,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Dubbo默认是使用Netty进行通信的,那么Netty会配置一个Handler,来处理一些事件,所以这个Handler是核心,主要找一下Dubbo初始化Netty的时候设置的Handler

回顾一下网络通信相关,在DubboProtocol中,会调用createServer方法返回一个Server对象,这是一个切入点

    private ExchangeServer createServer(URL url) {//....ExchangeServer server;try {server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}//....return server;}

bind方法内部对requestHandler进行了层层的装饰,但是这里也不是我们要找的地方,在NettyServer的doOpen才是我们要找的(具体流程可以看下网络通信的那篇文章)

    protected void doOpen() throws Throwable {//....bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();/*int idleTimeout = getIdleTimeout();if (idleTimeout > 10000) {pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));}*/pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// ....}

可以看到,这个NettyHandler是处理请求的入口,看下NettyHandler的结构图,其中最下面的handler是requestHandler
image.png
当Netty接收到请求的时候,会调用NettyServer的messageReceived方法,请求依次经过图中红色框框的Handler,不同的Handler有不同的功能,只需要看下几个关键的Handler

AllChannelHandler

这个Handler在讲Dubbo的Dispatcher的ThreadPool的时候讲过,就是将请求交给线程池处理

    public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}

那么看下ChannelEventRunnable的run方法

    public void run() {switch (state) {case CONNECTED:handler.connected(channel);break;case DISCONNECTED:handler.disconnected(channel);break;case SENT:handler.sent(channel,message);break;case RECEIVED:handler.received(channel, message);break;case CAUGHT:handler.caught(channel, exception);break;default:logger.warn("unknown state: " + state + ", message is " + message);}}

在线程中将请求交由下一个handler处理,从上面的图中可以知道AllChannelHandler下两个handler是DecodeHandler和HeaderExchangeHandler,这两个Handler做了一些特殊处理,然后将请求转发给requestHandler

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {public Object reply(ExchangeChannel channel, Object message) throws RemotingException {if (message instanceof Invocation) {Invocation inv = (Invocation) message;Invoker<?> invoker = getInvoker(channel, inv);// 通过参数信息获取本地Invoker//....RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());return invoker.invoke(inv);//调用}throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}//....};

getInvoker方法如下

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{boolean isCallBackServiceInvoke = false;boolean isStubServiceInvoke = false;int port = channel.getLocalAddress().getPort();String path = inv.getAttachments().get(Constants.PATH_KEY);//xxx.xxx.xxx.xxxService//如果是客户端的回调服务.isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));if (isStubServiceInvoke){port = channel.getRemoteAddress().getPort();}//callbackisCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;if(isCallBackServiceInvoke){path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());}String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));// port+serviceName+version+group区分的key// 暴露服务的时候会将Exporter放到map中DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);if (exporter == null)throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);return exporter.getInvoker();}

getInvoker也是比较简单的,这时候得到的Invoker相当于是本地服务的一个代理对象,invoke之后就会调用到真实的方法

这篇关于Dubbo源码分析----处理请求的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120494

相关文章

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

解决docker目录内存不足扩容处理方案

《解决docker目录内存不足扩容处理方案》文章介绍了Docker存储目录迁移方法:因系统盘空间不足,需将Docker数据迁移到更大磁盘(如/home/docker),通过修改daemon.json配... 目录1、查看服务器所有磁盘的使用情况2、查看docker镜像和容器存储目录的空间大小3、停止dock

5 种使用Python自动化处理PDF的实用方法介绍

《5种使用Python自动化处理PDF的实用方法介绍》自动化处理PDF文件已成为减少重复工作、提升工作效率的重要手段,本文将介绍五种实用方法,从内置工具到专业库,帮助你在Python中实现PDF任务... 目录使用内置库(os、subprocess)调用外部工具使用 PyPDF2 进行基本 PDF 操作使用

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

Python异常处理之避免try-except滥用的3个核心原则

《Python异常处理之避免try-except滥用的3个核心原则》在Python开发中,异常处理是保证程序健壮性的关键机制,本文结合真实案例与Python核心机制,提炼出避免异常滥用的三大原则,有需... 目录一、精准打击:只捕获可预见的异常类型1.1 通用异常捕获的陷阱1.2 精准捕获的实践方案1.3

Java中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例解析

《Java中的分布式系统开发基于Zookeeper与Dubbo的应用案例解析》本文将通过实际案例,带你走进基于Zookeeper与Dubbo的分布式系统开发,本文通过实例代码给大家介绍的非常详... 目录Java 中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例一、分布式系统中的挑战二