golang实现延迟队列(delay queue)的两种实现

2025-05-26 03:50

本文主要是介绍golang实现延迟队列(delay queue)的两种实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的...

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

定义Task结构体,包含

  • ExecuteTime time.Time
  • Job func()

定义DelayQueue

  • TaskQueue []Task
  • func AddTask
  • func RemoveTawww.chinasem.cnsk
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package main

import (
	"fmt"
	"time"
)

/*
基于go实现延迟队列
*/
type Task struct {
	ExecuteTime time.Time
	Job         func()
}

type DelayQ编程ueue struct {
	Tasks []*Task
}

func (d *DelayQueue) AddTask(t *Task) {
	d.Tasks = append(d.Tasks, t)
}

func (d *DelayQueue) RemoveTask() {
	//FIFO: remove the first task to enqueue
	d.Tasks = d.Tasks[1:]
}

func (d *DelayQueue) ExecuteTask() {
	for len(d.Tasks) > 0 {
		//dequeue a task
		currentTask := d.Tasks[0]
		if time.Now().Before(currentTask.ExecuteTime) {
			//if the task execution time is not up, wait
			time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
		}
		//execute the task
		currentTask.Job()
		//remove task who has been executed
		d.RemoveTask()
	}

}

func main() {
	fmt.Println("start delayQueue")
	delayQueue := &DelayQueue{}
	firstTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 1),
		Job: func() {
			fmt.Println("executed task 1 after delay")
		},
	}
	delayQueue.AddTask(firstTask)
	secondTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 7),
		Job: func() {
			fmt.Println("executed task 2 after delay")
		},
	}
	delayQueue.AddTask(secondTask)
	delayQueue.ExecuteTask()
	fmt.Println("all tasks have been done!!!")
}

效果:

golang实现延迟队列(delay queue)的两种实现

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

初始化redis连接

延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/Centos/docker-ce.repo
yum install docker
systemctl start docker

# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	log "github.com/ziyifast/log"
	"time"
)

/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"

func initClient() (err error) {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // not set password
		DB:       0,  //use default db
	})
	_, err = redisdb.Ping().Result()
	if err != nil {
		log.Errorf("%v", err)
		return err
	}
	return nil
}

func main() {
	err := initClient()
	if err != nil {
		log.Errorf("init redis client err: %v", err)
		return
	}
	addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
	addTaskToQueue("task2", time.Now().Add(time.Secondhttp://www.chinasem.cn*8).Unix())
	//执行队列中的任务
	getAndExecuteTask()
}

// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
	err := redisdb.ZAdd(DelayQueueKey, redis.Z{
		Scorjse:  float64(executeTime),
		Member: task,
	}).Err()
	if err != nil {
		panic(err)
	}
}

// 从redis中取一个task并执行
func getAndExecuteTask() {
	for {
		tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
			Min:    "-inf",
			Max:    fmt.Sprintf("%d", time.Now().Unix()),
			Offset: 0,
			Count:  1,
		}).Result()
		if err != nil {
			time.Sleep(time.Second * 1)
			continue
		}
		//处理任务
		for _, task := range tasks {
			fmt.Println("Execute task: ", task)
			//执行完任务之后用 ZREM 移除该任务
			redisdb.ZRem(DelayQueueKey, task)
		}
		time.Sleep(time.Second * 1)
	}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

golang实现延迟队列(delay queue)的两种实现

到此这篇关于golang实现延迟队列(delay queue)的示例代码的文章就介绍到这了,更多相关golang 延迟队列(delay queue)内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)! 

这篇关于golang实现延迟队列(delay queue)的两种实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154777

相关文章

c++ 类成员变量默认初始值的实现

《c++类成员变量默认初始值的实现》本文主要介绍了c++类成员变量默认初始值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录C++类成员变量初始化c++类的变量的初始化在C++中,如果使用类成员变量时未给定其初始值,那么它将被

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

基于Python实现一个图片拆分工具

《基于Python实现一个图片拆分工具》这篇文章主要为大家详细介绍了如何基于Python实现一个图片拆分工具,可以根据需要的行数和列数进行拆分,感兴趣的小伙伴可以跟随小编一起学习一下... 简单介绍先自己选择输入的图片,默认是输出到项目文件夹中,可以自己选择其他的文件夹,选择需要拆分的行数和列数,可以通过

Python中将嵌套列表扁平化的多种实现方法

《Python中将嵌套列表扁平化的多种实现方法》在Python编程中,我们常常会遇到需要将嵌套列表(即列表中包含列表)转换为一个一维的扁平列表的需求,本文将给大家介绍了多种实现这一目标的方法,需要的朋... 目录python中将嵌套列表扁平化的方法技术背景实现步骤1. 使用嵌套列表推导式2. 使用itert

Python使用pip工具实现包自动更新的多种方法

《Python使用pip工具实现包自动更新的多种方法》本文深入探讨了使用Python的pip工具实现包自动更新的各种方法和技术,我们将从基础概念开始,逐步介绍手动更新方法、自动化脚本编写、结合CI/C... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

在Linux中改变echo输出颜色的实现方法

《在Linux中改变echo输出颜色的实现方法》在Linux系统的命令行环境下,为了使输出信息更加清晰、突出,便于用户快速识别和区分不同类型的信息,常常需要改变echo命令的输出颜色,所以本文给大家介... 目python录在linux中改变echo输出颜色的方法技术背景实现步骤使用ANSI转义码使用tpu

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Python使用OpenCV实现获取视频时长的小工具

《Python使用OpenCV实现获取视频时长的小工具》在处理视频数据时,获取视频的时长是一项常见且基础的需求,本文将详细介绍如何使用Python和OpenCV获取视频时长,并对每一行代码进行深入解析... 目录一、代码实现二、代码解析1. 导入 OpenCV 库2. 定义获取视频时长的函数3. 打开视频文

golang版本升级如何实现

《golang版本升级如何实现》:本文主要介绍golang版本升级如何实现问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录golanwww.chinasem.cng版本升级linux上golang版本升级删除golang旧版本安装golang最新版本总结gola

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具