p-limit源码解读--30行代码,高大上解决Promise的多并发问题

2023-10-21 17:12

本文主要是介绍p-limit源码解读--30行代码,高大上解决Promise的多并发问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

提起控制并发,大家应该不陌生,我们可以先来看看多并发,再去聊聊为什么要去控制它

多并发一般是指多个异步操作同时进行,而运行的环境中资源是有限的,短时间内过多的并发,会对所运行的环境造成很大的压力,比如前端的浏览器,后端的服务器,常见的多并发操作有:

  • 前端的多个接口同时请求
  • 前端多条数据异步处理
  • Nodejs的多个数据操作同时进行
  • Nodejs对多个文件同时进行修改

 正是因为多并发会造成压力,所以我们才需要去控制他,降低这个压力,比如我可以控制最大并发数是 3,这样的话即使有100个并发,我也能保证最多同时并发的最大数量是 3

代码实现

实现思路

大致思路就是,假设现在有 9 个并发,我设置最大并发为 3,那么我将会走下面这些步骤:

  • 1、先定好三个坑位
  • 2、让前三个并发进去坑位执行
  • 3、看哪个坑位并发先执行完,就从剩余的并发中拿一个进去补坑
  • 4、一直重复第 3 步,一直到所有并发执行完

Promise.all

在进行多并发的时候,我们通常会使用Promise.all,但是Promise.all并不能控制并发,或者说它本来就没这个能力,我们可以看下面的例子

const fetchFn = (delay, index) => {return new Promise(resolve => {console.log(index)setTimeout(() => {resolve(index)}, delay);})
}const promises = [fetchFn(1000, 1),fetchFn(1000, 2),fetchFn(1000, 3),fetchFn(1000, 4),fetchFn(1000, 5),fetchFn(1000, 6)
]Promise.all(promises)

最后是同时输出,这说明这几个并发是同时发生的

 const fetchFn = (delay, index) => {

          return new Promise((resolve) => {

            console.log(index)

            setTimeout(() => {

              resolve(index)

            }, delay)

          })

        }

        const promiselist = []

        for (let i = 0; i < 100; i++) {

          const promise = fetchFn(1000, i)

          promiselist.push(promise)

        }

        console.log('promiselist', promiselist)

        const results = await Promise.all(promiselist)

        console.log('results:', results)

所以我们需要做一些改造,让Promise.all执行 promises 时支持控制并发,但是我们改造的不应该是Promise.all,而是这一个个的fetchFn

期望效果

const limitFn = (limit) => {// ...coding
}// 最大并发数 2
const generator = limitFn(2)const promises = [generator(() => fetchFn(1000, 1)),generator(() => fetchFn(1000, 2)),generator(() => fetchFn(1000, 3)),generator(() => fetchFn(1000, 4)),generator(() => fetchFn(1000, 5)),generator(() => fetchFn(1000, 6))
]Promise.all(promises)

这是一个很出名的库的源码,就是p-limit

实现 limitFn

我们需要在函数内部维护两个变量:

  • queue:队列,用来存每一个改造过的并发
  • activeCount: 用来记录正在执行的并发数

并声明函数 generator ,这个函数返回一个 Promise,因为 Promise.all 最好是接收一个 Promise 数组

const limitFn = (concurrency) => {const queue = [];let activeCount = 0;const generator = (fn, ...args) =>new Promise((resolve) => {enqueue(fn, resolve, ...args);});return generator;
};

接下来我们来实现 enqueue 这个函数做两件事:

  • 将每一个 fetchFn 放进队列里
  • 将坑位里的 fetchFn 先执行
const enqueue = (fn, resolve, ...args) => {queue.push(run.bind(null, fn, resolve, ...args));if (activeCount < limit && queue.length > 0) {queue.shift()();}
};

假如我设置最大并发数为 2,那么这一段代码在一开始的时候只会执行 2 次,因为一开始只会有 2 次符合 if 判断,大家可以思考一下为什么~

if (activeCount < limit && queue.length > 0) {queue.shift()(); // 这段代码}

一开始执行 2 次,说明这时候两个坑位已经各自有一个 fetchFn 在执行了

接下来我们实现 run 函数,这个函数是用来包装 fetch 的,他完成几件事情:

  • 1、将 activeCount++ ,这时候执行中的并发数 +1
  • 2、将 fetchFn 执行,并把结果 resolve 出去,说明这个并发执行完了
  • 3、将 activeCount--,这时候执行中的并发数 -1
  • 4、从 queue 中取一个并发,拿来补坑执行
const run = async (fn, resolve, ...args) => {activeCount++;const result = (async () => fn(...args))();try {const res = await result;resolve(res);} catch { }next();
};

其实第 3、4 步,是在 next 函数里面执行的

const next = () => {activeCount--;if (queue.length > 0) {queue.shift()();}
};

完整代码

      const limitFn = (limit) => {const queue = []let activeCount = 0const next = () => {activeCount--if (queue.length > 0) {queue.shift()()}}const run = async (fn, resolve, ...args) => {activeCount++const result = (async () => fn(...args))()try {const res = await resultresolve(res)} catch {}next()}const enqueue = (fn, resolve, ...args) => {queue.push(run.bind(null, fn, resolve, ...args))if (activeCount < limit && queue.length > 0) {queue.shift()()}}const generator = (fn, ...args) =>new Promise((resolve) => {enqueue(fn, resolve, ...args)})return generator}// 最大并发数 2const generator = limitFn(2)const fetchFn = (delay, index) => {return new Promise((resolve) => {console.log(index)setTimeout(() => {resolve(index)}, delay)})}const promises = [generator(() => fetchFn(1000, 1)),generator(() => fetchFn(1000, 2)),generator(() => fetchFn(1000, 3)),generator(() => fetchFn(1000, 4)),generator(() => fetchFn(1000, 5)),generator(() => fetchFn(1000, 6)),]Promise.all(promises)

这篇关于p-limit源码解读--30行代码,高大上解决Promise的多并发问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/255797

相关文章

Python使用Tenacity一行代码实现自动重试详解

《Python使用Tenacity一行代码实现自动重试详解》tenacity是一个专为Python设计的通用重试库,它的核心理念就是用简单、清晰的方式,为任何可能失败的操作添加重试能力,下面我们就来看... 目录一切始于一个简单的 API 调用Tenacity 入门:一行代码实现优雅重试精细控制:让重试按我

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

C语言中%zu的用法解读

《C语言中%zu的用法解读》size_t是无符号整数类型,用于表示对象大小或内存操作结果,%zu是C99标准中专为size_t设计的printf占位符,避免因类型不匹配导致错误,使用%u或%d可能引发... 目录size_t 类型与 %zu 占位符%zu 的用途替代占位符的风险兼容性说明其他相关占位符验证示

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

SpringBoot监控API请求耗时的6中解决解决方案

《SpringBoot监控API请求耗时的6中解决解决方案》本文介绍SpringBoot中记录API请求耗时的6种方案,包括手动埋点、AOP切面、拦截器、Filter、事件监听、Micrometer+... 目录1. 简介2.实战案例2.1 手动记录2.2 自定义AOP记录2.3 拦截器技术2.4 使用Fi

kkFileView启动报错:报错2003端口占用的问题及解决

《kkFileView启动报错:报错2003端口占用的问题及解决》kkFileView启动报错因office组件2003端口未关闭,解决:查杀占用端口的进程,终止Java进程,使用shutdown.s... 目录原因解决总结kkFileViewjavascript启动报错启动office组件失败,请检查of

SQL Server安装时候没有中文选项的解决方法

《SQLServer安装时候没有中文选项的解决方法》用户安装SQLServer时界面全英文,无中文选项,通过修改安装设置中的国家或地区为中文中国,重启安装程序后界面恢复中文,解决了问题,对SQLSe... 你是不是在安装SQL Server时候发现安装界面和别人不同,并且无论如何都没有中文选项?这个问题也

Linux系统之lvcreate命令使用解读

《Linux系统之lvcreate命令使用解读》lvcreate是LVM中创建逻辑卷的核心命令,支持线性、条带化、RAID、镜像、快照、瘦池和缓存池等多种类型,实现灵活存储资源管理,需注意空间分配、R... 目录lvcreate命令详解一、命令概述二、语法格式三、核心功能四、选项详解五、使用示例1. 创建逻