go-resiliency源码解析之-batcher

2023-10-27 19:59

本文主要是介绍go-resiliency源码解析之-batcher,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

go-resiliency源码解析之-batcher

源代码地址 : https://github.com/eapache/go-resiliency/blob/master/batcher/batcher.go

1.batcher定义

创建一个batch对象需要2个参数:

Timeout:超时,这是一个batch对象收集输入参数的时间。

work函数变量:在timeout超时后,会调用一次work函数,来处理每一个输入参数。

整体处理流程如下图:

请添加图片描述

2.核心源码解析

核心结构定义

type work struct {//收集的一个参数param  interface{}//参数处理返回future chan error
}type Batcher struct {//收集参数的超时时间timeout   time.Duration//过滤器函数prefilter func(interface{}) error//互斥量,用于参数收集并发控制lock   sync.Mutex//存储收集到参数的chansubmit chan *work//批处理函数,超时后,调用该函数一次,处理全部参数//[]interface{}doWork func([]interface{}) errordone   chan bool
}

Run函数

//param是timeout内可收集参数,业务方调用Run函数传入参数
func (b *Batcher) Run(param interface{}) error {//先判断是否有过滤器函数。 prefilter相当于一个数据清洗函数,对无效param参数返回err,这样//在dowork里就不会处理这个输入参数if b.prefilter != nil {if err := b.prefilter(param); err != nil {return err}}//timeout==0表示无收集参数时间,需要立刻执行doWork函数if b.timeout == 0 {return b.doWork([]interface{}{param})}//当timeout > 0 ,就构造一个work对象放入到chan里w := &work{param:  param,future: make(chan error, 1),}b.submitWork(w)return <-w.future
}func (b *Batcher) Prefilter(filter func(interface{}) error) {b.prefilter = filter
}

submitWork函数:在Run函数里,当timeout > 0会调用submitWork函数

func (b *Batcher) submitWork(w *work) {//这里为什么要加一个互斥锁?//对,主要是防止下面if里的代码被并发执行b.lock.Lock()defer b.lock.Unlock()//创建submit的chan, 开启一个batch协程if b.submit == nil {b.done = make(chan bool)b.submit = make(chan *work, 4)go b.batch()}b.submit <- w
}func (b *Batcher) batch() {//params为收集参数集合var params []interface{}var futures []chan errorinput := b.submitgo b.timer()//for读取input这个chan,input在没有close前,这个for不会退出//所以这里就是在等待timeout时间,把输入的参数收集到params这个切片//?? 那input chan什么时候被close了?? 就是	go b.timer()这一句for work := range input {params = append(params, work.param)futures = append(futures, work.future)}//这里就是把收集到的参数传入到你设置的函数,执行业务逻辑ret := b.doWork(params)//把doWork执行结果写回到future,这样调用线程就可以读取到执行结果for _, future := range futures {future <- retclose(future)}close(b.done)
}func (b *Batcher) timer() {//阻塞协程timeout时间,然后调用flush函数time.Sleep(b.timeout)//主要就是关闭submit这个chan,让batch里收集参数for循环退出b.flush()
}func (b *Batcher) flush() {b.lock.Lock()defer b.lock.Unlock()if b.submit == nil {return}close(b.submit)b.submit = nil
}
3.测试用例

这个测试用例实现,在1s内收集传入的整形,然后求和

func TestBatcher(t *testing.T) {wg := &sync.WaitGroup{}b := New(time.Second, func(params []interface{}) error {sum := 0for _, p := range params {sum += p.(int)}t.Logf("sum %d", sum)return nil})b.Prefilter(func(param interface{}) error {// do some sort of sanity check on the parameter, and return an error if it failsreturn nil})for i := 1; i <= 10; i++ {wg.Add(1)go func(param interface{}) {go b.Run(i)wg.Done()}(i)}wg.Wait()time.Sleep(5 * time.Second)
}

这篇关于go-resiliency源码解析之-batcher的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/288030

相关文章

深度解析Python中递归下降解析器的原理与实现

《深度解析Python中递归下降解析器的原理与实现》在编译器设计、配置文件处理和数据转换领域,递归下降解析器是最常用且最直观的解析技术,本文将详细介绍递归下降解析器的原理与实现,感兴趣的小伙伴可以跟随... 目录引言:解析器的核心价值一、递归下降解析器基础1.1 核心概念解析1.2 基本架构二、简单算术表达

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

Maven中生命周期深度解析与实战指南

《Maven中生命周期深度解析与实战指南》这篇文章主要为大家详细介绍了Maven生命周期实战指南,包含核心概念、阶段详解、SpringBoot特化场景及企业级实践建议,希望对大家有一定的帮助... 目录一、Maven 生命周期哲学二、default生命周期核心阶段详解(高频使用)三、clean生命周期核心阶

GO语言短变量声明的实现示例

《GO语言短变量声明的实现示例》在Go语言中,短变量声明是一种简洁的变量声明方式,使用:=运算符,可以自动推断变量类型,下面就来具体介绍一下如何使用,感兴趣的可以了解一下... 目录基本语法功能特点与var的区别适用场景注意事项基本语法variableName := value功能特点1、自动类型推

GO语言中函数命名返回值的使用

《GO语言中函数命名返回值的使用》在Go语言中,函数可以为其返回值指定名称,这被称为命名返回值或命名返回参数,这种特性可以使代码更清晰,特别是在返回多个值时,感兴趣的可以了解一下... 目录基本语法函数命名返回特点代码示例命名特点基本语法func functionName(parameters) (nam

深入解析C++ 中std::map内存管理

《深入解析C++中std::map内存管理》文章详解C++std::map内存管理,指出clear()仅删除元素可能不释放底层内存,建议用swap()与空map交换以彻底释放,针对指针类型需手动de... 目录1️、基本清空std::map2️、使用 swap 彻底释放内存3️、map 中存储指针类型的对象

Java Scanner类解析与实战教程

《JavaScanner类解析与实战教程》JavaScanner类(java.util包)是文本输入解析工具,支持基本类型和字符串读取,基于Readable接口与正则分隔符实现,适用于控制台、文件输... 目录一、核心设计与工作原理1.底层依赖2.解析机制A.核心逻辑基于分隔符(delimiter)和模式匹

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装