go 协程池 ants库分析

2024-03-06 09:32
文章标签 分析 go ants 协程池

本文主要是介绍go 协程池 ants库分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

相比于创建多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量,系统中能够创建的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创建 8GB/8KB = 1000000 个 goroutine,更何况系统还需要保留一部分内存运行日常管理任务,go 运行时需要内存运行 gc、处理 goroutine 切换等。使用的内存超过机器内存容量,系统会使用交换区(swap),导致性能急速下降,甚至直接error

另一方面,goroutine 的管理也是一个问题。goroutine 只能自己运行结束,外部没有任何手段可以强制j结束一个 goroutine。如果一个 goroutine 因为某种原因没有自行结束,就会出现 goroutine 泄露。此外,频繁创建 goroutine 也是一个开销。

所以,我们就会需要一个goroutine池,自动管理goroutine生命周期,可以按需创建,动态缩容,自动安排任务的执行

以上内容摘自Go 每日一库之 ants

使用

使用上可以参考 Go 每日一库之 ants 里面给的例子,是一个计算大量整数和的程序。如下


// 包装任务需要执行的内容
// ants支持将一个不接受任何参数的函数作为任务提交给 goroutine 运行。
// 由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,
// 否则就必须用某种方式将需要的数据传递进去,例如闭包。
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {return func() {for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {*sum += num}fmt.Printf("task:%d sum:%d\n", i+1, *sum)wg.Done()}
}// 建立协程池
p, _ := ants.NewPool(10)
defer p.Release()// 生成随机数
nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}// 提交任务,并通过waitGroup来等待所有任务的结束
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()// 
var sum int
for _, partSum := range partSums {sum += partSum
}var expect int
for _, num := range nums {expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

设计

执行流程图

 

类图如下 

整体设计比较清楚,

  1. Pool是对外提供的协程池对象,通过Options配置生成(NewPool函数)
  2. Pool里面有一个workerArray表示worker池,是一个抽象的接口,其主要就是管理goWorker
  3. goWorker就是实际运行我们任务的载体,通过调用run来执行
  4. ants提供了两种Pool,一个就是Pool,另一个是PoolWithFunc;前者接受一个不接受任何参数的函数作为任务提交给 goroutine 运行。由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,否则就必须用某种方式将需要的数据传递进去,例如闭包。(我们初始化的时候不需要提供执行函数,在需要执行的时候传入Sumit就可以了);后者在初始化的时候就要提供执行的函数体,然后在后续执行的时候,传入参数给函数体就可以了。这两种方式其实是等价的,使用前者的话,我们利用闭包传递参数就可以了;使用后者的话,我们可以把需要的参数都封装成一个结构体再传入;

优秀的设计

Options的思想

这个思想在go里面还是比较普遍的,比如 GitHub - libp2p/go-libp2p: libp2p implementation in Go 也有这种设计。这种设计的目的其实就是为了可以灵活配置我们的目标对象(在ants就是Pool),我们通过设置一个配置类,通过配置类生成我们的目标对象;那么我们如何配置这个配置类呢?一方面我们可以直接生成配置类,另一种类似堆积木的方式,我们可以传递一个函数组,这个函数组来操作我们要生成的配置类。也就是我们要提供给开发者一个传递函数的手段,如下

// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {return func(opts *Options) {opts.MaxBlockingTasks = maxBlockingTasks}
}

开发者直接调用这个函数传入Pool的初始化函数就可以了。这样的好处就在于把配置的复杂性留给了自己,使用者只需要调用意思明确的WithMaxBlockingTasks就可以了,而且我们如果添加了新的配置的话,对应使用者来说,也只是在需要使用的时候多堆叠一个函数

使用方式如下

func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)time.Sleep(1 * time.Second)wg.Done()}
}func main() {p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))defer p.Release()var wg sync.WaitGroupwg.Add(8)for i := 1; i <= 8; i++ {go func(i int) {err := p.Submit(wrapper(i, &wg))if err != nil {fmt.Printf("task:%d err:%v\n", i, err)wg.Done()}}(i)}wg.Wait()
}

我们可以看到NewPool的实现如下:


// Option represents the optional function.
type Option func(opts *Options)// 根据传入的配置函数来生成最后的配置类
func loadOptions(options ...Option) *Options {opts := new(Options)for _, option := range options {option(opts)}return opts
}// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {opts := loadOptions(options...)// 如果没有传入配置,就使用默认配置if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{capacity: int32(size),lock:     internal.NewSpinLock(),options:  opts,}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if size <= 0 {p.infinite = true}if p.options.PreAlloc {p.workers = newWorkerArray(loopQueueType, size)} else {p.workers = newWorkerArray(stackType, 0)}p.cond = sync.NewCond(p.lock)// Start a goroutine to clean up expired workers periodically.go p.periodicallyPurge()return p, nil
}

锁的设计

type spinLock uint32func (sl *spinLock) Lock() {for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {runtime.Gosched()}
}func (sl *spinLock) Unlock() {atomic.StoreUint32((*uint32)(sl), 0)
}// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {return new(spinLock)
}

我们可以看到作者再使用lock的时候,没有直接使用sync.Mutex,而是直接使用了自己实现的自旋锁,会一直等待直到获取锁,这样做可以减少协程上下文切换的开销,因为其实再协程池里面,每个任务都是等价的,谁前数后其实没多大区别,都是为了一个目的,就是完成分配的任务。

这篇关于go 协程池 ants库分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/779593

相关文章

慢sql提前分析预警和动态sql替换-Mybatis-SQL

《慢sql提前分析预警和动态sql替换-Mybatis-SQL》为防止慢SQL问题而开发的MyBatis组件,该组件能够在开发、测试阶段自动分析SQL语句,并在出现慢SQL问题时通过Ducc配置实现动... 目录背景解决思路开源方案调研设计方案详细设计使用方法1、引入依赖jar包2、配置组件XML3、核心配

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

Go语言开发实现查询IP信息的MCP服务器

《Go语言开发实现查询IP信息的MCP服务器》随着MCP的快速普及和广泛应用,MCP服务器也层出不穷,本文将详细介绍如何在Go语言中使用go-mcp库来开发一个查询IP信息的MCP... 目录前言mcp-ip-geo 服务器目录结构说明查询 IP 信息功能实现工具实现工具管理查询单个 IP 信息工具的实现服

Java程序进程起来了但是不打印日志的原因分析

《Java程序进程起来了但是不打印日志的原因分析》:本文主要介绍Java程序进程起来了但是不打印日志的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java程序进程起来了但是不打印日志的原因1、日志配置问题2、日志文件权限问题3、日志文件路径问题4、程序

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字

go 指针接收者和值接收者的区别小结

《go指针接收者和值接收者的区别小结》在Go语言中,值接收者和指针接收者是方法定义中的两种接收者类型,本文主要介绍了go指针接收者和值接收者的区别小结,文中通过示例代码介绍的非常详细,需要的朋友们下... 目录go 指针接收者和值接收者的区别易错点辨析go 指针接收者和值接收者的区别指针接收者和值接收者的

Python 迭代器和生成器概念及场景分析

《Python迭代器和生成器概念及场景分析》yield是Python中实现惰性计算和协程的核心工具,结合send()、throw()、close()等方法,能够构建高效、灵活的数据流和控制流模型,这... 目录迭代器的介绍自定义迭代器省略的迭代器生产器的介绍yield的普通用法yield的高级用法yidle

Go 语言中的select语句详解及工作原理

《Go语言中的select语句详解及工作原理》在Go语言中,select语句是用于处理多个通道(channel)操作的一种控制结构,它类似于switch语句,本文给大家介绍Go语言中的select语... 目录Go 语言中的 select 是做什么的基本功能语法工作原理示例示例 1:监听多个通道示例 2:带

C++ Sort函数使用场景分析

《C++Sort函数使用场景分析》sort函数是algorithm库下的一个函数,sort函数是不稳定的,即大小相同的元素在排序后相对顺序可能发生改变,如果某些场景需要保持相同元素间的相对顺序,可使... 目录C++ Sort函数详解一、sort函数调用的两种方式二、sort函数使用场景三、sort函数排序