本文主要是介绍golang实现延迟队列(delay queue)的两种实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的...
1 延迟队列:邮件提醒、订单自动取消
延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:
- 邮件提醒
- 订单自动取消(超过多少时间未支付,就取消订单)
- 对超时任务的处理等
由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。
2 实现
2.1 simple简单版:go自带的time包实现
思路:
定义Task结构体,包含
- ExecuteTime time.Time
- Job func()
定义DelayQueue
- TaskQueue []Task
- func AddTask
- func RemoveTawww.chinasem.cnsk
- ExecuteTask
这种方案存在的问题:
Go程序重启时,存储在slice中的延迟处理任务将全部丢失
完整代码:
package main import ( "fmt" "time" ) /* 基于go实现延迟队列 */ type Task struct { ExecuteTime time.Time Job func() } type DelayQ编程ueue struct { Tasks []*Task } func (d *DelayQueue) AddTask(t *Task) { d.Tasks = append(d.Tasks, t) } func (d *DelayQueue) RemoveTask() { //FIFO: remove the first task to enqueue d.Tasks = d.Tasks[1:] } func (d *DelayQueue) ExecuteTask() { for len(d.Tasks) > 0 { //dequeue a task currentTask := d.Tasks[0] if time.Now().Before(currentTask.ExecuteTime) { //if the task execution time is not up, wait time.Sleep(currentTask.ExecuteTime.Sub(time.Now())) } //execute the task currentTask.Job() //remove task who has been executed d.RemoveTask() } } func main() { fmt.Println("start delayQueue") delayQueue := &DelayQueue{} firstTask := &Task{ ExecuteTime: time.Now().Add(time.Second * 1), Job: func() { fmt.Println("executed task 1 after delay") }, } delayQueue.AddTask(firstTask) secondTask := &Task{ ExecuteTime: time.Now().Add(time.Second * 7), Job: func() { fmt.Println("executed task 2 after delay") }, } delayQueue.AddTask(secondTask) delayQueue.ExecuteTask() fmt.Println("all tasks have been done!!!") }
效果:
2.2 complex持久版:go+redis
为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。
思路:
初始化redis连接
延迟队列采用redis的zset(有序集合)实现
前置准备:
# 安装docker yum install -y yum-utils yum-config-manager \ --add-repo \ https://download.docker.com/linux/Centos/docker-ce.repo yum install docker systemctl start docker # docker搭建redis mkdir -p /Users/ziyi2/docker-home/redis docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis
完整代码:
package main import ( "fmt" "github.com/go-redis/redis" log "github.com/ziyifast/log" "time" ) /* 基于redis zset实现延迟队列 */ var redisdb *redis.Client var DelayQueueKey = "delay-queue" func initClient() (err error) { redisdb = redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // not set password DB: 0, //use default db }) _, err = redisdb.Ping().Result() if err != nil { log.Errorf("%v", err) return err } return nil } func main() { err := initClient() if err != nil { log.Errorf("init redis client err: %v", err) return } addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix()) addTaskToQueue("task2", time.Now().Add(time.Secondhttp://www.chinasem.cn*8).Unix()) //执行队列中的任务 getAndExecuteTask() } // executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序 func addTaskToQueue(task string, executeTime int64) { err := redisdb.ZAdd(DelayQueueKey, redis.Z{ Scorjse: float64(executeTime), Member: task, }).Err() if err != nil { panic(err) } } // 从redis中取一个task并执行 func getAndExecuteTask() { for { tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{ Min: "-inf", Max: fmt.Sprintf("%d", time.Now().Unix()), Offset: 0, Count: 1, }).Result() if err != nil { time.Sleep(time.Second * 1) continue } //处理任务 for _, task := range tasks { fmt.Println("Execute task: ", task) //执行完任务之后用 ZREM 移除该任务 redisdb.ZRem(DelayQueueKey, task) } time.Sleep(time.Second * 1) } }
效果:
redis一直从延迟队列中取数据,如果处理完一批则睡眠1s
- 具体根据大家的业务调整,此处主要介绍思路
到此这篇关于golang实现延迟队列(delay queue)的示例代码的文章就介绍到这了,更多相关golang 延迟队列(delay queue)内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于golang实现延迟队列(delay queue)的两种实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!