go 协程池 ants库分析

2024-03-06 09:32
文章标签 分析 go ants 协程池

本文主要是介绍go 协程池 ants库分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

相比于创建多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量,系统中能够创建的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创建 8GB/8KB = 1000000 个 goroutine,更何况系统还需要保留一部分内存运行日常管理任务,go 运行时需要内存运行 gc、处理 goroutine 切换等。使用的内存超过机器内存容量,系统会使用交换区(swap),导致性能急速下降,甚至直接error

另一方面,goroutine 的管理也是一个问题。goroutine 只能自己运行结束,外部没有任何手段可以强制j结束一个 goroutine。如果一个 goroutine 因为某种原因没有自行结束,就会出现 goroutine 泄露。此外,频繁创建 goroutine 也是一个开销。

所以,我们就会需要一个goroutine池,自动管理goroutine生命周期,可以按需创建,动态缩容,自动安排任务的执行

以上内容摘自Go 每日一库之 ants

使用

使用上可以参考 Go 每日一库之 ants 里面给的例子,是一个计算大量整数和的程序。如下


// 包装任务需要执行的内容
// ants支持将一个不接受任何参数的函数作为任务提交给 goroutine 运行。
// 由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,
// 否则就必须用某种方式将需要的数据传递进去,例如闭包。
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {return func() {for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {*sum += num}fmt.Printf("task:%d sum:%d\n", i+1, *sum)wg.Done()}
}// 建立协程池
p, _ := ants.NewPool(10)
defer p.Release()// 生成随机数
nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}// 提交任务,并通过waitGroup来等待所有任务的结束
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()// 
var sum int
for _, partSum := range partSums {sum += partSum
}var expect int
for _, num := range nums {expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

设计

执行流程图

 

类图如下 

整体设计比较清楚,

  1. Pool是对外提供的协程池对象,通过Options配置生成(NewPool函数)
  2. Pool里面有一个workerArray表示worker池,是一个抽象的接口,其主要就是管理goWorker
  3. goWorker就是实际运行我们任务的载体,通过调用run来执行
  4. ants提供了两种Pool,一个就是Pool,另一个是PoolWithFunc;前者接受一个不接受任何参数的函数作为任务提交给 goroutine 运行。由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,否则就必须用某种方式将需要的数据传递进去,例如闭包。(我们初始化的时候不需要提供执行函数,在需要执行的时候传入Sumit就可以了);后者在初始化的时候就要提供执行的函数体,然后在后续执行的时候,传入参数给函数体就可以了。这两种方式其实是等价的,使用前者的话,我们利用闭包传递参数就可以了;使用后者的话,我们可以把需要的参数都封装成一个结构体再传入;

优秀的设计

Options的思想

这个思想在go里面还是比较普遍的,比如 GitHub - libp2p/go-libp2p: libp2p implementation in Go 也有这种设计。这种设计的目的其实就是为了可以灵活配置我们的目标对象(在ants就是Pool),我们通过设置一个配置类,通过配置类生成我们的目标对象;那么我们如何配置这个配置类呢?一方面我们可以直接生成配置类,另一种类似堆积木的方式,我们可以传递一个函数组,这个函数组来操作我们要生成的配置类。也就是我们要提供给开发者一个传递函数的手段,如下

// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {return func(opts *Options) {opts.MaxBlockingTasks = maxBlockingTasks}
}

开发者直接调用这个函数传入Pool的初始化函数就可以了。这样的好处就在于把配置的复杂性留给了自己,使用者只需要调用意思明确的WithMaxBlockingTasks就可以了,而且我们如果添加了新的配置的话,对应使用者来说,也只是在需要使用的时候多堆叠一个函数

使用方式如下

func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)time.Sleep(1 * time.Second)wg.Done()}
}func main() {p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))defer p.Release()var wg sync.WaitGroupwg.Add(8)for i := 1; i <= 8; i++ {go func(i int) {err := p.Submit(wrapper(i, &wg))if err != nil {fmt.Printf("task:%d err:%v\n", i, err)wg.Done()}}(i)}wg.Wait()
}

我们可以看到NewPool的实现如下:


// Option represents the optional function.
type Option func(opts *Options)// 根据传入的配置函数来生成最后的配置类
func loadOptions(options ...Option) *Options {opts := new(Options)for _, option := range options {option(opts)}return opts
}// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {opts := loadOptions(options...)// 如果没有传入配置,就使用默认配置if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{capacity: int32(size),lock:     internal.NewSpinLock(),options:  opts,}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if size <= 0 {p.infinite = true}if p.options.PreAlloc {p.workers = newWorkerArray(loopQueueType, size)} else {p.workers = newWorkerArray(stackType, 0)}p.cond = sync.NewCond(p.lock)// Start a goroutine to clean up expired workers periodically.go p.periodicallyPurge()return p, nil
}

锁的设计

type spinLock uint32func (sl *spinLock) Lock() {for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {runtime.Gosched()}
}func (sl *spinLock) Unlock() {atomic.StoreUint32((*uint32)(sl), 0)
}// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {return new(spinLock)
}

我们可以看到作者再使用lock的时候,没有直接使用sync.Mutex,而是直接使用了自己实现的自旋锁,会一直等待直到获取锁,这样做可以减少协程上下文切换的开销,因为其实再协程池里面,每个任务都是等价的,谁前数后其实没多大区别,都是为了一个目的,就是完成分配的任务。

这篇关于go 协程池 ants库分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/779593

相关文章

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

Go语言中json操作的实现

《Go语言中json操作的实现》本文主要介绍了Go语言中的json操作的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录 一、jsOChina编程N 与 Go 类型对应关系️ 二、基本操作:编码与解码 三、结构体标签(Struc

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

使用Go调用第三方API的方法详解

《使用Go调用第三方API的方法详解》在现代应用开发中,调用第三方API是非常常见的场景,比如获取天气预报、翻译文本、发送短信等,Go作为一门高效并发的编程语言,拥有强大的标准库和丰富的第三方库,可以... 目录引言一、准备工作二、案例1:调用天气查询 API1. 注册并获取 API Key2. 代码实现3

基于Go语言开发一个 IP 归属地查询接口工具

《基于Go语言开发一个IP归属地查询接口工具》在日常开发中,IP地址归属地查询是一个常见需求,本文将带大家使用Go语言快速开发一个IP归属地查询接口服务,有需要的小伙伴可以了解下... 目录功能目标技术栈项目结构核心代码(main.go)使用方法扩展功能总结在日常开发中,IP 地址归属地查询是一个常见需求: