go-resiliency源码解析之-batcher

2023-10-27 19:59

本文主要是介绍go-resiliency源码解析之-batcher,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

go-resiliency源码解析之-batcher

源代码地址 : https://github.com/eapache/go-resiliency/blob/master/batcher/batcher.go

1.batcher定义

创建一个batch对象需要2个参数:

Timeout:超时,这是一个batch对象收集输入参数的时间。

work函数变量:在timeout超时后,会调用一次work函数,来处理每一个输入参数。

整体处理流程如下图:

请添加图片描述

2.核心源码解析

核心结构定义

type work struct {//收集的一个参数param  interface{}//参数处理返回future chan error
}type Batcher struct {//收集参数的超时时间timeout   time.Duration//过滤器函数prefilter func(interface{}) error//互斥量,用于参数收集并发控制lock   sync.Mutex//存储收集到参数的chansubmit chan *work//批处理函数,超时后,调用该函数一次,处理全部参数//[]interface{}doWork func([]interface{}) errordone   chan bool
}

Run函数

//param是timeout内可收集参数,业务方调用Run函数传入参数
func (b *Batcher) Run(param interface{}) error {//先判断是否有过滤器函数。 prefilter相当于一个数据清洗函数,对无效param参数返回err,这样//在dowork里就不会处理这个输入参数if b.prefilter != nil {if err := b.prefilter(param); err != nil {return err}}//timeout==0表示无收集参数时间,需要立刻执行doWork函数if b.timeout == 0 {return b.doWork([]interface{}{param})}//当timeout > 0 ,就构造一个work对象放入到chan里w := &work{param:  param,future: make(chan error, 1),}b.submitWork(w)return <-w.future
}func (b *Batcher) Prefilter(filter func(interface{}) error) {b.prefilter = filter
}

submitWork函数:在Run函数里,当timeout > 0会调用submitWork函数

func (b *Batcher) submitWork(w *work) {//这里为什么要加一个互斥锁?//对,主要是防止下面if里的代码被并发执行b.lock.Lock()defer b.lock.Unlock()//创建submit的chan, 开启一个batch协程if b.submit == nil {b.done = make(chan bool)b.submit = make(chan *work, 4)go b.batch()}b.submit <- w
}func (b *Batcher) batch() {//params为收集参数集合var params []interface{}var futures []chan errorinput := b.submitgo b.timer()//for读取input这个chan,input在没有close前,这个for不会退出//所以这里就是在等待timeout时间,把输入的参数收集到params这个切片//?? 那input chan什么时候被close了?? 就是	go b.timer()这一句for work := range input {params = append(params, work.param)futures = append(futures, work.future)}//这里就是把收集到的参数传入到你设置的函数,执行业务逻辑ret := b.doWork(params)//把doWork执行结果写回到future,这样调用线程就可以读取到执行结果for _, future := range futures {future <- retclose(future)}close(b.done)
}func (b *Batcher) timer() {//阻塞协程timeout时间,然后调用flush函数time.Sleep(b.timeout)//主要就是关闭submit这个chan,让batch里收集参数for循环退出b.flush()
}func (b *Batcher) flush() {b.lock.Lock()defer b.lock.Unlock()if b.submit == nil {return}close(b.submit)b.submit = nil
}
3.测试用例

这个测试用例实现,在1s内收集传入的整形,然后求和

func TestBatcher(t *testing.T) {wg := &sync.WaitGroup{}b := New(time.Second, func(params []interface{}) error {sum := 0for _, p := range params {sum += p.(int)}t.Logf("sum %d", sum)return nil})b.Prefilter(func(param interface{}) error {// do some sort of sanity check on the parameter, and return an error if it failsreturn nil})for i := 1; i <= 10; i++ {wg.Add(1)go func(param interface{}) {go b.Run(i)wg.Done()}(i)}wg.Wait()time.Sleep(5 * time.Second)
}

这篇关于go-resiliency源码解析之-batcher的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/288030

相关文章

Spring三级缓存解决循环依赖的解析过程

《Spring三级缓存解决循环依赖的解析过程》:本文主要介绍Spring三级缓存解决循环依赖的解析过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、循环依赖场景二、三级缓存定义三、解决流程(以ServiceA和ServiceB为例)四、关键机制详解五、设计约

Redis实现分布式锁全解析之从原理到实践过程

《Redis实现分布式锁全解析之从原理到实践过程》:本文主要介绍Redis实现分布式锁全解析之从原理到实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景介绍二、解决方案(一)使用 SETNX 命令(二)设置锁的过期时间(三)解决锁的误删问题(四)Re

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

Golang HashMap实现原理解析

《GolangHashMap实现原理解析》HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持高效的插入、查找和删除操作,:本文主要介绍GolangH... 目录HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持

Go语言开发实现查询IP信息的MCP服务器

《Go语言开发实现查询IP信息的MCP服务器》随着MCP的快速普及和广泛应用,MCP服务器也层出不穷,本文将详细介绍如何在Go语言中使用go-mcp库来开发一个查询IP信息的MCP... 目录前言mcp-ip-geo 服务器目录结构说明查询 IP 信息功能实现工具实现工具管理查询单个 IP 信息工具的实现服

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Python利用ElementTree实现快速解析XML文件

《Python利用ElementTree实现快速解析XML文件》ElementTree是Python标准库的一部分,而且是Python标准库中用于解析和操作XML数据的模块,下面小编就来和大家详细讲讲... 目录一、XML文件解析到底有多重要二、ElementTree快速入门1. 加载XML的两种方式2.

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组