golang实现延迟队列(delay queue)的两种实现

2025-05-26 03:50

本文主要是介绍golang实现延迟队列(delay queue)的两种实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的...

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

定义Task结构体,包含

  • ExecuteTime time.Time
  • Job func()

定义DelayQueue

  • TaskQueue []Task
  • func AddTask
  • func RemoveTawww.chinasem.cnsk
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package main

import (
	"fmt"
	"time"
)

/*
基于go实现延迟队列
*/
type Task struct {
	ExecuteTime time.Time
	Job         func()
}

type DelayQ编程ueue struct {
	Tasks []*Task
}

func (d *DelayQueue) AddTask(t *Task) {
	d.Tasks = append(d.Tasks, t)
}

func (d *DelayQueue) RemoveTask() {
	//FIFO: remove the first task to enqueue
	d.Tasks = d.Tasks[1:]
}

func (d *DelayQueue) ExecuteTask() {
	for len(d.Tasks) > 0 {
		//dequeue a task
		currentTask := d.Tasks[0]
		if time.Now().Before(currentTask.ExecuteTime) {
			//if the task execution time is not up, wait
			time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
		}
		//execute the task
		currentTask.Job()
		//remove task who has been executed
		d.RemoveTask()
	}

}

func main() {
	fmt.Println("start delayQueue")
	delayQueue := &DelayQueue{}
	firstTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 1),
		Job: func() {
			fmt.Println("executed task 1 after delay")
		},
	}
	delayQueue.AddTask(firstTask)
	secondTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 7),
		Job: func() {
			fmt.Println("executed task 2 after delay")
		},
	}
	delayQueue.AddTask(secondTask)
	delayQueue.ExecuteTask()
	fmt.Println("all tasks have been done!!!")
}

效果:

golang实现延迟队列(delay queue)的两种实现

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

初始化redis连接

延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/Centos/docker-ce.repo
yum install docker
systemctl start docker

# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	log "github.com/ziyifast/log"
	"time"
)

/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"

func initClient() (err error) {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // not set password
		DB:       0,  //use default db
	})
	_, err = redisdb.Ping().Result()
	if err != nil {
		log.Errorf("%v", err)
		return err
	}
	return nil
}

func main() {
	err := initClient()
	if err != nil {
		log.Errorf("init redis client err: %v", err)
		return
	}
	addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
	addTaskToQueue("task2", time.Now().Add(time.Secondhttp://www.chinasem.cn*8).Unix())
	//执行队列中的任务
	getAndExecuteTask()
}

// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
	err := redisdb.ZAdd(DelayQueueKey, redis.Z{
		Scorjse:  float64(executeTime),
		Member: task,
	}).Err()
	if err != nil {
		panic(err)
	}
}

// 从redis中取一个task并执行
func getAndExecuteTask() {
	for {
		tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
			Min:    "-inf",
			Max:    fmt.Sprintf("%d", time.Now().Unix()),
			Offset: 0,
			Count:  1,
		}).Result()
		if err != nil {
			time.Sleep(time.Second * 1)
			continue
		}
		//处理任务
		for _, task := range tasks {
			fmt.Println("Execute task: ", task)
			//执行完任务之后用 ZREM 移除该任务
			redisdb.ZRem(DelayQueueKey, task)
		}
		time.Sleep(time.Second * 1)
	}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

golang实现延迟队列(delay queue)的两种实现

到此这篇关于golang实现延迟队列(delay queue)的示例代码的文章就介绍到这了,更多相关golang 延迟队列(delay queue)内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)! 

这篇关于golang实现延迟队列(delay queue)的两种实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154777

相关文章

Python实现文件批量重命名器

《Python实现文件批量重命名器》在日常工作和学习中,我们经常需要对大量文件进行重命名操作,本文将介绍一个使用Python开发的文件批量重命名工具,提供了多种重命名模式,有需要的小伙伴可以了解下... 目录前言功能特点模块化设计1.目录路径获取模块2.文件列表获取模块3.重命名模式选择模块4.序列号参数配

Python使用python-docx实现自动化处理Word文档

《Python使用python-docx实现自动化处理Word文档》这篇文章主要为大家展示了Python如何通过代码实现段落样式复制,HTML表格转Word表格以及动态生成可定制化模板的功能,感兴趣的... 目录一、引言二、核心功能模块解析1. 段落样式与图片复制2. html表格转Word表格3. 模板生

SpringBoot实现多环境配置文件切换

《SpringBoot实现多环境配置文件切换》这篇文章主要为大家详细介绍了如何使用SpringBoot实现多环境配置文件切换功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 示例代码结构2. pom文件3. application文件4. application-dev文

Python FastAPI实现JWT校验的完整指南

《PythonFastAPI实现JWT校验的完整指南》在现代Web开发中,构建安全的API接口是开发者必须面对的核心挑战之一,本文将深入探讨如何基于FastAPI实现JWT(JSONWebToken... 目录一、JWT认证的核心原理二、项目初始化与环境配置三、安全密码处理机制四、JWT令牌的生成与验证五、

Python使用Turtle实现精确计时工具

《Python使用Turtle实现精确计时工具》这篇文章主要为大家详细介绍了Python如何使用Turtle实现精确计时工具,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以参考一下... 目录功能特点使用方法程序架构设计代码详解窗口和画笔创建时间和状态显示更新计时器控制逻辑计时器重置功能事件

Linux给磁盘扩容(LVM方式)的方法实现

《Linux给磁盘扩容(LVM方式)的方法实现》本文主要介绍了Linux给磁盘扩容(LVM方式)的方法实现,涵盖PV/VG/LV概念及操作步骤,具有一定的参考价值,感兴趣的可以了解一下... 目录1 概念2 实战2.1 相关基础命令2.2 开始给LVM扩容2.3 总结最近测试性能,在本地打数据时,发现磁盘空

Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)

《Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)》本文主要介绍了Golang分布式锁实现,采用Redis+Lua脚本确保原子性,持可重入和自动续期,用于防止超卖及重复下单,具有一定... 目录1 概念应用场景分布式锁必备特性2 思路分析宕机与过期防止误删keyLua保证原子性可重入锁自动

golang 对象池sync.Pool的实现

《golang对象池sync.Pool的实现》:本文主要介绍golang对象池sync.Pool的实现,用于缓存和复用临时对象,以减少内存分配和垃圾回收的压力,下面就来介绍一下,感兴趣的可以了解... 目录sync.Pool的用法原理sync.Pool 的使用示例sync.Pool 的使用场景注意sync.

IDEA实现回退提交的git代码(四种常见场景)

《IDEA实现回退提交的git代码(四种常见场景)》:本文主要介绍IDEA实现回退提交的git代码(四种常见场景),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.已提交commit,还未push到远端(Undo Commit)2.已提交commit并push到

Kotlin Compose Button 实现长按监听并实现动画效果(完整代码)

《KotlinComposeButton实现长按监听并实现动画效果(完整代码)》想要实现长按按钮开始录音,松开发送的功能,因此为了实现这些功能就需要自己写一个Button来解决问题,下面小编给大... 目录Button 实现原理1. Surface 的作用(关键)2. InteractionSource3.