Go语言并发之通知退出机制的实现

2025-07-24 20:50

本文主要是介绍Go语言并发之通知退出机制的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Go语言并发之通知退出机制的实现》本文主要介绍了Go语言并发之通知退出机制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...

1、通知退出机制

读取已经关闭的通道不会引起阻塞,也不会导致 panic,而是立即返回该通道存储类型的零值。关闭 select 监听的某个通道能使 select 立即感知这种通知,然后进行相应的处理,这就是所谓的退出通知机制(close channel tobroadcast)。 context 标准库就是利用这种机制处理更复杂的通知机制的,退出通知机制是学习使用 context库的基础。

下面通过一个随机数生成器的示例演示退出通知机制,下游的消费者不需要随机数时,显式地通知生产者停止生产。

package main

import (
	"fmt"
	"math/rand"
	"runtime"
)

//GenerateIntA是一个随机数发生器
func GenerateIntA(done chan struct{}) chan int {
	ch := make(chan int)
	go func() {
	Lable:
		for {
			select {
			case ch <- rand.Int():
			//增加一路监听,就是对退出通知信号done的监听
			case <-done:
				break Lable
			}
		}
		//收到通知后关闭通道ch
		close(ch)
	}()
	return ch
}

func main() {
	done := make(chan struct{})
	ch := GenerateIntA(done)
	fmt.Println(<-ch)
	fmt.Println(<-ch)
	// 发送通知,告诉生产者停止生产
	close(done)
	fmt.Println(<-ch)
	fmt.Println(<-ch)
	//此时生产者已经退出
	println("NumGoroutine=", runtime.NumGoroutine())
}

# 程序结果
5577006791947779410
8674665223082153551 
0 // 关闭通道会输出0值
0
NumGoroutine= 1

goroutine是Go语言提供的语言级别的轻量级线程,在我们需要使用并发时,我们只需要通过 go 关键字来开启goroutine 即可。作为Go语言中的最大特色之一,goroutine在日常的工作学习中被大量使用着,但是对于它的调度处理,尤其是goroutine的退出时机和方式,很多小伙伴都没有搞的很清楚,本文就来详细讲讲Goroutine退出机制的原理及使用。

goroutine的调度是由 golang 运行时进行管理的,同一个程序中的所有 goroutine 共享同一个地址空间,goroutine设计的退出机制是由goroutine自己退出,不能在外部强制结束一个正在执行的goroutine(只有一种情况正在运行的goroutine会因为其他goroutine的结束被终止,就是main函数退出或程序停止执行)。下面介绍几种常用的退出方式。

1.1 进程/main函数退出

1.1.1 kill进程/进程crash

当进程被强制退出,所有它占有的资源都会还给操作系统,而goroutine作为进程内的线程,资源被收回了,那么

还未结束的goroutine也会直接退出。

1.1.2 main函数结束

同理,当主函数结束,goroutine的资源也会被收回,直接退出。

package main

import (
	"fmt"
	"time"
)

func routineTest() {
	time.Sleep(time.Second)
	fmt.Println("I'm alive")
}

func main() {
	fmt.Println("start test")
	go routineTest()
	fmt.Println("end test")
}

# 程序输出
start test
end test

其中go routine里需要print出来的语句是永远也不会出现的。

1.2 通过channel退出

通俗的讲,就是各个 goroutine 之间通信的"管道",有点类似于 linux 中的管道。channel 是go最推荐的goroutine 间的通信方式,同时通过 channel 来通知 goroutine 退出也是最主要的goroutine退出方式。

goroutine 虽然不能强制结束另外一个 goroutine,但是它可以通过 channel 通知另外一个 goroutine 你的表演该结束了。

package main

import (
	"fmt"
	"time"
)

func cancelByChannel(quit <-chan time.Time) {
	for {
		select {
		case <-quit:
			fmt.Println("cancel goroutine by channel!")
			return
		default:
			fmt.Println("I'm alive")
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	quit := time.After(time.Second * 10)
	go cancelChina编程ByChannel(quit)
	time.Sleep(15 * time.Second)
	fmt.Println("I'm done")
}

# 程序输出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by channel!
I'm done

在该例子中,我们用时间定义了一个channel,当10秒后,会给到goroutine一个退出信号,然后go routine就会退出,这样我们就实现了在其他线程中通知另一个线程退出的功能。

1.3 通过context退出

通过channel通知gorouChina编程tine退出还有一个更好的方法就是使用context。没错,就是我们在日常开发中接口通用的第一个参数context。它本质还是接收一个channel数据,只是是通过ctx.Done()获取。将上面的示例稍作修改即可。

package main

import (
	"context"
	"fmt"
	"time"
)

func cancelByContext(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("cancel goroutine by context!")
			return
		default:
			fmt.Println("I'm alive")
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go cancelByContext(ctx)
	time.Sleep(10 * time.Second)
	cancel()
	time.Sleep(5 * time.Second)
}

# 程序输出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!

上面的 case 中,通过 context 自带的 WithCancel 方法将 cancel 函数传递出来,然后手动调用 cancel() 函数给goroutine 传递了 ctx.Done() 信号。context 也提供了 context.WithTimeout() 和context.WithDeadline() 方法来更方便的传递特定情况下的 Done 信号。

package main

import (
	"context"
	"fmt"
	"time"
)

func cancelByContext(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("cancel goroutine by context!")
			return
		default:
			fmt.Println("I'm alive")
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
	go cancelByContext(ctx)
	time.Sleep(15 * time.Second)
}

# 程序输出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!

上述 case 中使用了 context.WithTimeout() 来设置10秒后自动退出,使用 context.WithDeadline() 的功能基本一样。区别是 context.WithDeadline() 可以指定一个固定的时间点,当然也可以使用time.Now().Add(time.Second*10) 的方式来实现同 context.WithTimeout() 相同的功能。

package main

import (
	"context"
	"fmt"
	"time"
)

func cancelByContext(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("cancel goroutine by context!")
			return
		default:
			fmt.Println("I'm alive")
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
	go cancelByContext(ctx)
	time.Sleep(15 * time.Second)
}

# 程序输出
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
I'm alive
cancel goroutine by context!

注:这里需要注意的一点是上方两个case中为了方便读者理解,我将context传回的cancel()函数抛弃掉了,实际使用中通常会加上 defer cancel() 来保证goroutine被杀死。

Context 使用原则和技巧

不要把Context放在结构体中,要以参数的方式传递,parent Context一般为Background应该要把Context作为第一个参数传递给入口请求和出口请求链路上的每一个函数,放在第一位,变量名建议都统一,如ctx。

给一个函数方法传递Context的时候,不要传递nil,否则在tarce追踪的时候,就会断了连接Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递Context是线程安全的,可以放心的在多个goroutine中传递可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行取消操作时,所有 goroutine 都会接收到取消信号。

1.4 通过Panic退出

这是一种不推荐使用的方法!!!在此给出只是提出这种操作的可能性。实际场景中尤其是生产环境请慎用!!

package main

import (
	"context"
	"fmt"
	"time"
)

func cancelByPanic(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("cancel goroutine by panic!")
		}
	}()
	for i := 0; i < 5; i++ {
		fmt.Println("hello cancelByPanic")
		timChina编程e.Sleep(1 * time.Second)
	}
	panic("panic")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	go cancelByPanic(ctx)
	time.Sleep(5 * time.Second)
}

# 程序输出
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic
hello cancelByPanic

这里我们通过在 defer 函数中使用 recover 来捕获 panic error 并从 panic 中拿回控制权,确保程序不会再panic 展开到 goroutine 调用栈顶部后崩溃。

2、阻止goroutine退出的方法

了解到goroutine的退出方式后,我们已经可以解决一类问题。那就是当你需要手动控制某个goro编程utine结束的时

候应该怎么办。但是在实际生产中关于goroutine还有一类问题需要解决,那就是当你的主进程结束时,应该如何

等待goroutine全部执行完毕后再使主进程退出。

2.1 通过sync.WaitGroup

package main

import (
	"fmt"
)

func main() {
	arr := [3]string{"a", "b", "c"}
	for _, v := range arr {
		go func(s string) {
			fmt.Println(s)
		}(v)
	}
	fmt.Println("End")
}

# 程序输出
End

以上方的 case 为例,可见我们在什么都不加的时候,不会等待 go func 执行完主程序就会退出。因此下面给出使

用 WaitGroup 的方法。

package main

import (
	"fmt"
	"sync"
)

func main() {
	// 定义 WaitGroup
	var wg sync.WaitGroup
	arr := [3]string{"a", "b", "c"}
	for _, v := range arr {
		// 增加一个 wait 任务
		wg.Add(1)
		go func(s string) {
			// 函数结束时,通知此 wait 任务已经完成
			defer wg.Done()
			fmt.Println(s)
		}(phpv)
	}
	// 等待所有任务完成
	wg.Wait()
}

# 程序输出
c
a
b

WaitGroup 可以理解为一个 goroutine 管理者。他需要知道有多少个 goroutine 在给他干活,并且在干完的时候

需要通知他干完了,否则他就会一直等,直到所有的小弟的活都干完为止。我们加上 WaitGroup 之后,程序会进

行等待,直到它收到足够数量的 Done() 信号为止。

WaitGroup 可被调用的方法只有三个:Add() 、Done()、Wait()。

1、wg.Done() 函数实际上实现的是 wg.Add(-1),因此直接使用 wg.Add(-1) 是会造成同样的结果的。在实际使

用中要注意避免误操作,使得监听的 goroutine 数量出现误差。

2、wg.Add() 函数可以一次性加n。但是实际使用时通常都设为1。但是wg本身的counter不能设为负数。假设你

在没有Add到10以前,一次性 wg.Add(-10),会出现panic !

package main

import (
	"fmt"
	"sync"
)

func main() {
	// 定义 WaitGroup
	var wg sync.WaitGroup
	arr := [3]string{"a", "b", "c"}
	for _, v := range arr {
		// 增加一个 wait 任务
		wg.Add(1)
		go func(s string) {
			// 函数结束时,通知此 wait 任务已经完成
			defer wg.Done()
			fmt.Println(s)
		}(v)
	}
	wg.Add(-10)
	// 等待所有任务完成
	wg.Wait()

}

# 程序输出
panic: sync: negative WaitGroup counter

goroutine 1 [running]:

如果你的程序写的有问题,出现了始终等待的 waitgroup 会造成死锁。

package main

import (
	"fmt"
	"sync"
)

func main() {
	// 定义 WaitGroup
	var wg sync.WaitGroup
	arr := [3]string{"a", "b", "c"}
	for _, v := range arr {
		// 增加一个 wait 任务
		wg.Add(1)
		go func(s string) {
			// 函数结束时,通知此 wait 任务已经完成
			defer wg.Done()
			fmt.Println(s)
		}(v)
	}
	wg.Add(1)
	// 等待所有任务完成
	wg.Wait()
}

# 程序输出
c
a
b
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [seMACquire]:

2.2 通过channel

package main

import "fmt"

func main() {
	arr := [3]string{"a", "b", "c"}
	ch := make(chan struct{}, len(arr))
	for _, v := range arr {
		go func(s string) {
			fmt.Println(s)
			ch <- struct{}{}
		}(v)
	}
	for i := 0; i < len(arr); i++ {
		<-ch
	}
}

# 程序输出
c
a
b

需要注意的是,channel 同样会导致死锁。

package main

import "fmt"

func main() {

	arr := [3]string{"a", "b", "c"}
	ch := make(chan struct{}, len(arr))
	for _, v := range arr {
		go func(s string) {
			fmt.Println(s)
			ch <- struct{}{}
		}(v)
	}
	for i := 0; i < len(arr); i++ {
		<-ch
	}
	<-ch
}

# 程序输出
c
a
b
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:

2.3 封装

利用 go routine 的这一特性,我们可以将 waitGroup 等方式封装起来,保证 go routine 在主进程结束时会继续执行完。

package main

import (
	"fmt"
	"sync"
)

type WaitGroupWrapper struct {
	sync.WaitGroup
}

func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {
	wg.Add(1)
	go func() {
		f(args...)
		wg.Done()
	}()
}

func printArray(args ...interface{}) {
	fmt.Println(args)
}

func main() {
	// 定义 WaitGroup
	var w WaitGroupWrapper
	arr := [3]string{"a", "b", "c"}
	for _, v := range arr {
		w.Wrap(printArray, v)
	}
	w.Wait()
}

# 程序输出
[c]
[a]
[b]

还可以加上更高端一点的功能,增加时间、事件双控制的 wrapper。

package main

import (
	"fmt"
	"sync"
	"time"
)

type WaitGroupWrapper struct {
	sync.WaitGroup
}

func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {
	wg.Add(1)
	go func() {
		f(args...)
		wg.Done()
	}()
}

func (w *WaitGroupWrapper) WaitWithTimeout(d time.Duration) bool {
	ch := make(chan struct{})
	t := time.NewTimer(d)
	defer t.Stop()
	go func() {
		w.Wait()
		ch <- struct{}{}
	}()
	select {
	case <-ch:
		fmt.Println("job is done!")
		return true
	case <-t.C:
		fmt.Println("time is out!")
		return false
	}
}

func printArray(args ...interface{}) {
	// 如果设置3秒,那么w.Wait()需要等待的时间是3秒,而超时时间的设置是2秒,所以会超时
	//3秒后会触发time is out分支
	time.Sleep(1 * time.Second)
	//如果改为time.Sleep(time.Second)即会触发job is done分支
	fmt.Println(args)
}

func main() {
	// 定义 WaitGroup
	var w WaitGroupWrapper
	arr := [3]string{"a", "b", "c"}
	for _, v := range arr {
		w.Wrap(printArray, v)
	}
	w.WaitWithTimeout(2 * time.Second)
}

# 程序输出
[b]
[a]
[c]
job is done!

到此这篇关于Go语言并发之通知退出机制的实现的文章就介绍到这了,更多相关Go 通知退出机制内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于Go语言并发之通知退出机制的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155474

相关文章

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

Python实现PDF按页分割的技术指南

《Python实现PDF按页分割的技术指南》PDF文件处理是日常工作中的常见需求,特别是当我们需要将大型PDF文档拆分为多个部分时,下面我们就来看看如何使用Python创建一个灵活的PDF分割工具吧... 目录需求分析技术方案工具选择安装依赖完整代码实现使用说明基本用法示例命令输出示例技术亮点实际应用场景扩

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

如何在Java Spring实现异步执行(详细篇)

《如何在JavaSpring实现异步执行(详细篇)》Spring框架通过@Async、Executor等实现异步执行,提升系统性能与响应速度,支持自定义线程池管理并发,本文给大家介绍如何在Sprin... 目录前言1. 使用 @Async 实现异步执行1.1 启用异步执行支持1.2 创建异步方法1.3 调用

Go语言编译环境设置教程

《Go语言编译环境设置教程》Go语言支持高并发(goroutine)、自动垃圾回收,编译为跨平台二进制文件,云原生兼容且社区活跃,开发便捷,内置测试与vet工具辅助检测错误,依赖模块化管理,提升开发效... 目录Go语言优势下载 Go  配置编译环境配置 GOPROXYIDE 设置(VS Code)一些基本

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

linux批量替换文件内容的实现方式

《linux批量替换文件内容的实现方式》本文总结了Linux中批量替换文件内容的几种方法,包括使用sed替换文件夹内所有文件、单个文件内容及逐行字符串,强调使用反引号和绝对路径,并分享个人经验供参考... 目录一、linux批量替换文件内容 二、替换文件内所有匹配的字符串 三、替换每一行中全部str1为st

Java中的xxl-job调度器线程池工作机制

《Java中的xxl-job调度器线程池工作机制》xxl-job通过快慢线程池分离短时与长时任务,动态降级超时任务至慢池,结合异步触发和资源隔离机制,提升高频调度的性能与稳定性,支撑高并发场景下的可靠... 目录⚙️ 一、调度器线程池的核心设计 二、线程池的工作流程 三、线程池配置参数与优化 四、总结:线程