OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配

2024-01-22 16:08

本文主要是介绍OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Dispatcher功能是什么?

java doc:

Policy on when async requests are executed.Each dispatcher uses an ExecutorService to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.

简单翻译就是:

  • 控制异步请求何时执行。
  • 每个dispatcher拥有一个ExecutorService执行异步请求。
  • 用户可配置自定义ExecutorService,要求最大可执行线程至少是maxRequests。
  • maxRequests可通过dispatcher的getMaxRequests()方法获取。
     

接下来简单介绍一下我对dispatcher功能的理解,这样有利于理解后面的内容:

  • Dispatcher主要管理异步请求任务策略,负责分配异步线程资源,控制异步连接数。
  • Dispatcher只覆盖异步任务调度策略层面的逻辑,往下的执行过程对其来说是透明的。
  • 对于同步任务,Dispatcher只是简单记录当前运行的任务任务实体(RealCall),并且是由RealCall主动注册和注销。
  • dispatcher和RealCall、AsyncCall的耦合性比较高,它们之间会相互调用,所以它们的代码往往要相互结合来看。
     

Dispatcher的主要属性

//最大异步任务数,注意是异步不包括同步的。
private int maxRequests = 64;
//对同一个主机的最大异步任务数,同样是异步不包括同步。
private int maxRequestsPerHost = 5;
//请求任务结束,如果当前预执行任务队列为空,线程进入空闲状态会回调该接口。
private @Nullable Runnable idleCallback;
//执行异步任务的线程池。
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
//预执行的异步任务队列
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行的异步任务队列。
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在执行的同步任务队列。
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

maxRequests 和maxRequestsPerHost 为什么只记录异步请求数呢:

  • 如果用户使用单线程 + 同步任务请求,那么同时活跃的任务数肯定只有单个,没必要控制。
  • 如果用户使用多线程或者线程池 + 同步请求的话,那相当于用户自己定制和实现了异步请求策略,那么对于异步请求的管理肯定交给用户是最合适的,OkHttp也很难去管理用户的自定义实现。
  • 用户可以通过配置OkHttpClient来修改dispatcher的属性,从而扩展异步请求的策略。
     

Dispatcher的ExecutorService默认实现

在了解异步任务的执行流程之前,我们先来简单了解一下Dispatcher用来执行异步任务的默认线程池。
代码1:

public synchronized ExecutorService executorService() {if (executorService == null) {//0:表示没有核心线程,也就是没有常驻线程。//Integer.MAX_VALUE:表示活跃线程等同于最大整数,活跃线程不会常驻,有最大空闲存活时间限制。//60和TimeUnit.SECONDS:活跃线程的最大空闲存活时间是60秒//new SynchronousQueue<>():同步阻塞队列(大家可以网上找一下这方面资料了解一下),这个队列不存在容器属性,如果消费不及时,生成端put动作会被阻塞。<br/>在这里的效果就是,如果调用了ExecutorService.execute()后,如果没有空闲线程或者还没来得及创建线程,那么execute()会被阻塞,直到有线程来消费。//Util.threadFactory("OkHttp Dispatcher", false):线程工程,创建的线程添加名称前缀OkHttp Dispatcher;创建的线程为守护线程。//第六个参数executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}

总结重要的三点:

  • 线程池几乎不限制线程数。
  • 线程默认空闲存活60秒。
  • ExecutorService.execute()方法调用时,如果没有线程及时消费会一直阻塞。

Dispatcher异步任务调度策略

异步任务的执行策略的大概流程:

从上面可以看出涉及Dispatcher的两个关键方法:enqueue(AsyncCall)和promoteAndExecute()。下面就分别来分析这两个方法。

方法:enqueue(AsyncCall)
enqueue方法还没有对AsyncCall进行真正的资源分配和调度,只是对AsyncCall进行一些设置,真正的调度逻辑是由后面的promoteAndExecute()方法实现。
我们先来简单看一下enqueue方法的流程:
 

接着我们分析一下代码:

void enqueue(AsyncCall call) {synchronized (this) {//第一步:添加AsyncCall到预执行队列readyAsyncCalls.add(call);//第二步if (!call.get().forWebSocket) {AsyncCall existingCall = findExistingCallWithHost(call.host());if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);}}//第三步promoteAndExecute();
}

这方法就三部分,我相信第1、3步大家都是一眼就看穿了,所以就只分析一下第二步,其代码逻辑是设置同一Host的连接计数器:

2.1 同一Host的连接计数器主要是和maxRequestsPerHost属性做比较,目的是控制对同一Host服务器的连接数。

2.2 通过让具有相同Host的AsyncCall对象都共用一个计数器来实现。通过synchronized锁保证同一时间进入代码块的只有一个AsyncCall对象。

  • 通过synchronized锁保证同一时间进入代码块的只有一个AsyncCall对象。
  • 调用findExistingCallWithHost(call.host())方法:查找是否已经存在至少一个相同Host的AsyncCall对象,并且返回任意一个。
@Nullable 
private AsyncCall findExistingCallWithHost(String host) {for (AsyncCall existingCall : runningAsyncCalls) {if (existingCall.host().equals(host)) return existingCall;}for (AsyncCall existingCall : readyAsyncCalls) {if (existingCall.host().equals(host)) return existingCall;}return null;
}
  • 如果存在,就把之前AsyncCall对象的计数器也设置给当前的AsyncCall对象;如果不存在就直接使用当前AsyncCall对象的计数器。因为加了锁保护,这样就保证了,如果存在一段连续的时间段,该时间段内一直存在对某Host的异步请求在执行或者等待执行,那么对于该host,后面的AsyncCall对象都是共用第一个AsyncCall对象创建的计数器,直到在某个时间点不再存在连续的异步请求。
    final class AsyncCall extends NamedRunnable {private final Callback responseCallback;//同一Host的连接计数器private volatile AtomicInteger callsPerHost = new AtomicInteger(0);...//设置计数器void reuseCallsPerHostFrom(AsyncCall other) {this.callsPerHost = other.callsPerHost;}...

方法:promoteAndExecute()

promoteAndExecute()负责真正对AsyncCall进行资源的调度。
和上面一样,我们还是先来看一下简单的流程:

接着我们在解析一下代码:

private boolean promoteAndExecute() {assert (!Thread.holdsLock(this));//创建空的可执行AsyncCall集合List<AsyncCall> executableCalls = new ArrayList<>();boolean isRunning;//锁保护synchronized (this) {//对预执行队列进行迭代循环for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall asyncCall = i.next();//正在执行的队列size是否已经>=maxRequests,如果是跳出迭代循环。if (runningAsyncCalls.size() >= maxRequests,) break; //判断同一Host的连接计数器的值是否>=maxRequestsPerHost,如果是跳出迭代循环。if (asyncCall.callsPerHost().get() >= maxRequestsPerHost,) continue; //从迭代器弹出,也就是从readyAsyncCalls删除了。i.remove();//同一Host的连接计数器自增1asyncCall.callsPerHost().incrementAndGet();//添加到可执行集合。executableCalls.add(asyncCall);//添加到正在执行队列,也就是这时候asyncCall对象已经是被当作执行中状态的了。runningAsyncCalls.add(asyncCall);}isRunning = runningCallsCount() > 0;}//遍历可执行集合for (int i = 0, size = executableCalls.size(); i < size; i++) {AsyncCall asyncCall = executableCalls.get(i);//调用asyncCall.executeOn方法。asyncCall.executeOn(executorService());}return isRunning;
}

代码的重要步骤的解析我都加在上面注释里面了,相信也不难看懂。

但是最后还是要简单介绍一下“asyncCall.executeOn(executorService())”调用的执行逻辑。其实异步任务在线程资源层面的策略,是有OkHttpClient、Dispatcher和Call之间相互协作完成的,所以你单单只看Dispatcher的代码,你可能有点难以勾勒出一个相对清晰和完整的功能流程。

asyncCall.executeOn(executorService())执行流程

在开始理解AsyncCall#executeOn(ExecutorService)执行流程之前,先简单了解AsyncCall的一些基本性质:

  • AsyncCall是NamedRunnable的子类,NamedRunnable实现了Runnable接口,因此AsyncCall对象可以直接作为参数让方法“ExecutorService#execute(Runnable)”执行。
  • NamedRunnable实现了run()方法,run()方法的具体任务逻辑委派给子类execute()方法,因此“executorService#execute(Runnable)”主要执行的是AsyncCall的execute()方法。
     

下面我们来看两个具体的代码片段:

void executeOn(ExecutorService executorService) {assert (!Thread.holdsLock(client.dispatcher()));boolean success = false;try {//Call任务被线程池执行executorService.execute(this);success = true;} catch (RejectedExecutionException e) {...} finally {if (!success) {//重点是这里client.dispatcher().finished(this);}}
}@Override 
protected void execute() {boolean signalledCallback = false;transmitter.timeoutEnter();try {Response response = getResponseWithInterceptorChain();...} catch (IOException e) {...} catch (Throwable t) {...} finally {//其他都不看,先看这client.dispatcher().finished(this);}
}

因此接着上面Call.executeOn的流程继续画:

 

Dispatcher#finished(AsyncCall)功能:

  • 同一host连接计数器递减1。
  • 把当前asyncCall对象移出正在执行队列(runningAsyncCalls)。其实到这一步当前的AsyncCall对象的使命就已经完全结束了,后面是Dispatcher自身循环调用的逻辑。
  • 再次调用promoteAndExecute(),从预执行任务队列中拉取任务执行。
  • 如果预执行任务队列已经为空,调用线程空闲回调。

Dispatcher异步任务调度策略小结

到这里异步任务请求AsyncCall在Dispatcher中的整个生命周期就已经理清楚了。Dispatcher只覆盖AsyncCall在线程资源层面的执行策略,再往下的执行过程对其来说是透明的。
AsyncCall到底是从哪里进入Dispatcher的世界的,又在里面发生了什么,最后又是怎么样离它而去的?

Dispatcher同步任务策略

相对于异步任务,Dispatcher对于同步任务的管理是非常简单的,就只有两步:

第一步,同步请求任务RealCall对象在发起请求之前,由RealCall对象主动调用Dispatcher#executed(RealCall)方法,把当前RealCall对象添加到同步任务执行中队列。

注意:同步任务执行中队列是runningSyncCalls,不是runningAsyncCalls,后者是异步任务执行中队列。

synchronized void executed(RealCall call) {runningSyncCalls.add(call);
}

第二步,同步请求任务结束后,再由RealCall对象主动调用Dispatcher#finished(RealCall)方法,把当前RealCall对象从是runningSyncCalls中移除。

void finished(RealCall call) {finished(runningSyncCalls, call);
}

上面的流程都是由RealCall发起的,Dispatcher不存在发起执行的入口,这个和异步是不一样的。

@Override 
public Response execute() throws IOException {...try {//调用Dispatcher#executed(RealCall)client.dispatcher().executed(this);//真正执行请求动作return getResponseWithInterceptorChain();} finally {//调用Dispatcher#finished(RealCall)client.dispatcher().finished(this);}
}

总结

Dispatcher主要管理异步请求任务策略,负责分配异步线程资源,控制异步连接数,只覆盖策略层面的逻辑,往下的执行过程对其来说是透明的。而对于同步任务,Dispatcher只是简单记录当前运行的任务任务实体(RealCall),并且是由RealCall主动注册和注销。

这篇关于OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/633491

相关文章

JDK21对虚拟线程的几种用法实践指南

《JDK21对虚拟线程的几种用法实践指南》虚拟线程是Java中的一种轻量级线程,由JVM管理,特别适合于I/O密集型任务,:本文主要介绍JDK21对虚拟线程的几种用法,文中通过代码介绍的非常详细,... 目录一、参考官方文档二、什么是虚拟线程三、几种用法1、Thread.ofVirtual().start(

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

Java JUC并发集合详解之线程安全容器完全攻略

《JavaJUC并发集合详解之线程安全容器完全攻略》Java通过java.util.concurrent(JUC)包提供了一整套线程安全的并发容器,它们不仅是简单的同步包装,更是基于精妙并发算法构建... 目录一、为什么需要JUC并发集合?二、核心并发集合分类与详解三、选型指南:如何选择合适的并发容器?在多

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe