逐步学习Go-并发通道chan(channel)

2024-03-27 22:04

本文主要是介绍逐步学习Go-并发通道chan(channel),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概述

Go的Routines并发模型是基于CSP,如果你看过七周七并发,那么你应该了解。

什么是CSP?

"Communicating Sequential Processes"(CSP)这个词组的含义来自其英文直译以及在计算机科学中的使用环境。

CSP是 Tony Hoare 在1978年提出的,论文地址在:Communicating sequential processes | Communications of the ACM

拆字解释下

Communicating Sequential Processes(CSP)的三个单词:

  • C for Communicating: 通信,什么的通信那?进程/线程/协程的通信。

  • S for Sequenctial: 顺序的,什么的顺序?进程/线程/协程之间执行任务时应该是有顺序的,完全并行执行是理想化的,现实中就是要先指定完第一个或者第一批任务才能执行第二个或者第二批任务。

  • P for Processses: 进程,这个是进程,估计是因为这个概念提出来的时候比较早。我们这儿得抽象一下,Processes指的是进程/线程/协程。

那么我们来总结一下CSP,CSP就是多个能够进行通信,并且按照顺序执行任务的独立进程。这些进程在各自执行自己的任务的时候,还可以通过某种方式是进行通信。

在Golang中就是通过Channel进行通信。

好了,CSP解释完了,我们来看Go中的Channel,另外CSP的参与者Go Routine我在之前的文章中有提到过,大家可以去:逐步学习Go-协程goroutine

这张图就描述了CSP编程模型。

file

Go中routine代表图中的Process,Channel就是goroutine之间的连接。通道可以让一个goroutine发送信息到另一个goroutine。

Go中的channel

Go中Channel有两种类型:

  1. 无缓冲Channel(Unbuffered)
  2. 有缓冲Channel(Buffered)
    有缓冲的Channel其实就是一个环形缓冲队列;无缓冲的没有队列,因为读写都会阻塞。

Channel的定义

var channel名称 chan channel类型// 类型自动推断
channel名称 := make(chan channel类型, buffer数量(int可以为0))

COPY

比如:我们可以这样来定义:

// 定义了一个channel,还没有make,不确定是否为有缓冲和无缓冲channel
var ch chan int// 定义了一个chnnel, 容量为0,无缓冲channel
ch := make(chan int, 0)// 定义了一个channel,容量为1,有缓冲channel
ch := make(chan int, 1)

COPY

我们实际使用的时候把Channel理解为队列就可以了。

Go中的Channel有两种类型:

  1. 无缓冲channel
  2. 有缓冲channel

无缓冲和有缓冲的特性如下:

  • 无缓冲Channel
    • 无缓冲Channel没有存储数据的能力
    • 发送方向Channel中发送数据的时候,发送方会阻塞直到有接受者接受这个数据
    • 无缓冲Channel典型应用就是go协程同步通信
    • 无缓冲Channel保证通信双方都要准备好数据交换
  • 有缓冲Channel
    • 有缓冲Channel需要定义Channel的容量
    • 发送方向有缓冲Channel发送数据的时候,只有容量满的时候才会阻塞
    • 接收方只有在有缓冲Channel为空时才会阻塞
    • 有缓冲通道的典型应用场景是生产者和消费者

Channel的操作

Channel主要支持2中操作:

  1. 发送(send)
  2. 接收(recv)

这三种操作在代码中的的定义和使用:

  1. 发送和接收都使用<-

来看代码:

// 先定义一个无缓冲channel
ch := make(chan int, 0)
ch := make(chan int)// 发送数据到channel
ch <- 1// 从channel中接收数据
<- ch

COPY

我们看到发送和接收都是使用<-,差别在于:

  1. ch在<-的左边,操作为发送
  2. ch在<-的右边,操作为接收

另外,channel在使用之前都要先创建,使用完毕后要关闭,分别使用makeclose关闭。

// 创建相当于分配channel(allocation)
ch := make(chan int, 0)// 关闭channel,释放channel资源
defer close(ch)

COPY

channel创建完直接关闭了还能操作发送和接收吗?

这个问题我们通过写代码来测试,我们先来测试发送,然后再测试接收。

  • 发送数据到关闭的Channel

    func TestUnbufferedChannel_ShouldPanic_whenWriteValueToAClosedChannel(t *testing.T) {f := func() {ch := make(chan int)close(ch)ch <- 1
    }assert.Panics(t, f, "should panic")
    }
    COPY

    运行截图:

    file

我们的UT PASS了表示发生了panic,这就说明我们不能向已经关闭的channel发送数据。

  • 在已经关闭的Channel上接收

func TestUnbufferedChannel_ShouldSuccess_whenRecvValueAtAClosedChannel(t *testing.T) {ch := make(chan int)close(ch)var val = <-chassert.Equal(t, 0, val)
}func TestUnbufferedChannel_ShouldSuccess_whenRecvEmptyValueAtAClosedChannel(t *testing.T) {ch := make(chan string)close(ch)var val = <-chassert.Equal(t, "", val)
}

COPY

运行截图:

file

这两个UT都可以PASS,我只截图了一个PASS,这说明我们可以在一个关闭的channel上接收数据,只是接收到的都是0值。关于0值要特别说明一下,0值是针对不同类型的,比如:int的0值就是0,string的0值就是空字符串,指针的0值就是nil,看下面代码:

file

并非“任何后续的接收操作都将立即返回零值”,而是当channel中所有已发送的值都被接收后,接下来的接收操作会立即返回零值。

无缓冲channel

无缓冲通道顾名思义:就是没有数据缓冲能力的Channel,有goroutine向无缓冲Channel发送了数据就必须有另一个goroutine来接受,否则发送的goroutine会阻塞;反之,有goroutine从这个channel接受数据而没有另一个goroutine向这个channel发送,那么接受的goroutine也会阻塞。

应用场景:

  1. 部分任务需要同步就用无缓冲channel

来看场景代码:

有发送无接受

发送goroutine会被阻塞。


func TestUnbufferedChannel_ShouldWriteTimeout_WhenNoRoutineReadTheChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)is_timeout := falsetry_to_write_value := 1// whenselect {// 直接向channel中发送case c <- try_to_write_value:case <-time.After(3 * time.Second):// shouldis_timeout = true}assert.True(t, is_timeout)}

COPY

file

有接受无发送

接收goroutine会被阻塞


func TestUnbufferedChannel_ShouldReadTimeout_WhenNoValueWriteToChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)is_timeout := falseselect {// 直接接受channel中的数据case <-c:case <-time.After(3 * time.Second):// shouldis_timeout = true}// 三秒后超时assert.True(t, is_timeout)}

COPY

有发送有接受

有发送有接收,一切正常。

func sum(s []int, c chan int) {sum := 0for _, v := range s {sum += v}// 将累加结果发送到channelc <- sum
}func TestUnbufferedChannel_ShouldRecvValues_WhenWriteValueToChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)// givens := []int{1, 2, 3, 4, 5, 6}// when// 执行数组累加go sum(s[:], c)ret1 := <-c// should// 和应该是21assert.Equal(t, 21, ret1)
}

COPY

file

使用无缓冲Channel控制并发

// 先定义一个worker函数
// worker函数从无缓冲channel中接收
// 可以接到到数据就执行后面的打印内容
// 打印完成后退出
func worker(id int, lock chan bool) {var shouldRun = <-lockif shouldRun {fmt.Printf("time: %v Worker %d is working\n", time.Now(), id)time.Sleep(time.Second)fmt.Printf("time: %v Worker %d has finished\n", time.Now(), id)}
}func TestUnbufferedChannel_ShouldRunOneByOne_When(t *testing.T) {lock := make(chan bool, 1)// 启动5个goroutine等待释放接收for i := 0; i < 5; i++ {go worker(i, lock)}// 发送5个true到channelfor i := 0; i < 5; i++ {lock <- truetime.Sleep(time.Second)}close(lock)time.Sleep(10 * time.Second)
}

COPY

file

使用无缓冲Channel实现CompleteFuture.anyOf()

CompleteFuture.anyOf() 是 Java 中的一个函数,它返回一个新的 CompletableFuture,当给定的任何 CompletableFuture 完成时,返回的 CompletableFuture 也完成,并带有完成的 CompletableFuture 的结果。


// future函数使用time.Sleep模拟实际业务处理延迟
// 业务处理完成后将业务数据写入无缓冲Channel
func future(id int, delay time.Duration, resChan chan int) {time.Sleep(delay)fmt.Printf("Hi, I have finished my task, my id is %d\n", id)resChan <- id
}// 接收一系列上面的future, 然后使用go routine启动这些future函数并将结果写入到result channel,最后再返回result channel。
func anyOf(futures ...<-chan int) <-chan int {result := make(chan int)for _, future := range futures {go func(f <-chan int) {result <- <-f}(future)}return result
}func TestAnyOf_ShouldSuccess(t *testing.T) {// 创建无缓冲的 channelresChan1 := make(chan int)resChan2 := make(chan int)resChan3 := make(chan int)// 启动 goroutinesgo future(1, 3*time.Second, resChan1)go future(2, 2*time.Second, resChan2)go future(3, 5*time.Second, resChan3)result := anyOf(resChan1, resChan2, resChan3)assert.Equal(t, 2, <-result)
}

COPY

上面有两个比较让人纠结的语法:

  1. <-chan int
  2. result <- <-f
  • <-chan int表示只读通道,anyOf只能读取通道内的数据;有了只读就有只写,只写通道chan<- int
  • result <- <-f表示从通道f中接收数据并将数据写入到result通道。这一行相当于执行了
    v := <-f
    result <- v
    COPY

    file

有缓冲channel

有缓冲channel就是你可以暂时把数据发送到channel,如果channel的缓冲区没有被占用完就不会阻塞,缓冲区被占用完了就被阻塞了。

特性:

  1. 发送goroutine在缓冲区没有用完之前不会阻塞,缓冲区被使用完了之后发送goroutine就会被阻塞
  2. 接受goroutine在缓冲区有数据时,不会阻塞,缓冲区没有数据时会被阻塞

有缓冲channel应用场景是什么?

  1. 任务队列就是最典型的场景,生产者消费者模型
  2. 其他无缓冲channel搞不定的就用有缓冲channel

实现一个有缓冲channel的RateLimiter

import ("sync""sync/atomic""testing""fmt""time""github.com/stretchr/testify/assert"
)type RateLimiter struct {tokens       chan struct{}refillTicker *time.TickercloseCh      chan struct{}
}func NewRateLimiter(rate int) *RateLimiter {r := &RateLimiter{tokens:       make(chan struct{}, rate),refillTicker: time.NewTicker(time.Second / time.Duration(rate)),closeCh:      make(chan struct{}),}go r.refill()return r
}func (r *RateLimiter) refill() {for {select {case <-r.refillTicker.C:select {case r.tokens <- struct{}{}:default:}case <-r.closeCh:r.refillTicker.Stop()return}}
}func (r *RateLimiter) Acquire() {<-r.tokens
}func (r *RateLimiter) TryAcquire() bool {select {case <-r.tokens:return truedefault:return false}
}func (r *RateLimiter) Close() {close(r.closeCh)
}func myTask(id int) {fmt.Printf("time: %v workder %d is working\n", time.Now(), id)time.Sleep(20 * time.Millisecond)fmt.Printf("time: %v workder %d has finished\n", time.Now(), id)
}func TestRateLimiter_ShouldPermitWithBlocking_WhenRequestOnce(t *testing.T) {rateLimiter := NewRateLimiter(100)startTime := time.Now()for i := 0; i < 1; i++ {rateLimiter.TryAcquire()myTask(i)}endTime := time.Now()elapsedTime := endTime.Sub(startTime)fmt.Printf("elapsed time: %v\n", elapsedTime)fmt.Printf("explect time: %v\n", 300*time.Millisecond)assert.True(t, elapsedTime < 300*time.Millisecond)
}func TestRateLimiter_ShouldLimitPermits_WhenGivenLimitedResource(t *testing.T) {var counter int32 = 0rateLimiter := NewRateLimiter(100)wg := sync.WaitGroup{}startTime := time.Now()for i := range 1000 {wg.Add(1)go func() {rateLimiter.Acquire()myTask(i)atomic.AddInt32(&counter, 1)wg.Done()}()}wg.Wait()endTime := time.Now()elapsedTime := endTime.Sub(startTime)fmt.Printf("elapsed time: %v\n", elapsedTime)fmt.Printf("should greater than explect time: %v\n", 10*time.Second)assert.Equal(t, counter, int32(1000))assert.True(t, 10*time.Second < elapsedTime)
}

COPY

file

实现无缓冲Channel实现Java中的CyclicBarrier

CyclicBarrier 是一个同步工具,它允许一组线程互相等待,直到他们都到达了一个共同的屏障点。在涉及固定大小的线程团队必须偶尔相互等待的程序中,CyclicBarriers 非常有用。之所以称之为“循环”屏障,是因为在等待的线程被释放之后,它可以被重复使用。

  • await() 所有的参与者都调用了wait方法后返回或者被中断
    我们就实现这个await方法,暂时不支持中断,代码如下:

package mainimport ("fmt""sync""sync/atomic""time"
)// CyclicBarrier 让一组goroutine在到达某个点之后才能继续执行
type CyclicBarrier struct {// 总goroutine数量participant int// 用于等待所有goroutine准备好waitGroup sync.WaitGroup// 无缓冲channel,用于goroutine间同步barrierChan chan struct{}running     int32
}// NewCyclicBarrier 创建一个新的CyclicBarrier
func NewCyclicBarrier(participant int) *CyclicBarrier {b := &CyclicBarrier{participant: participant,barrierChan: make(chan struct{}),running:     int32(participant),}// 设置等待的goroutine数b.waitGroup.Add(participant)return b
}// 当一个goroutine调用Wait时,
// 它将在屏障处等待,
// 直到所有goroutine都到达这里
func (b *CyclicBarrier) Wait() {// 一个goroutine准备好了b.waitGroup.Done()// 等待所有goroutine都准备好b.waitGroup.Wait()// 当所有goroutine都准备好了,关闭channel进行广播通知if atomic.AddInt32(&b.running, -1) == 0 {close(b.barrierChan)} else {// 等待通知<-b.barrierChan}}// 阻塞调用goroutine直到所有goroutine都调用了Wait方法,
// 屏障开放后,重新置为待关闭状态
func (b *CyclicBarrier) Await() {// 等待屏障开放的信号<-b.barrierChan// 重置屏障状态b.barrierChan = make(chan struct{})b.waitGroup.Add(b.participant)
}func (b *CyclicBarrier) Close() {close(b.barrierChan)
}func main() {// 这里我们设置3个goroutine参与barrier := NewCyclicBarrier(100)for i := 0; i < 100; i++ {go func(i int) {fmt.Printf("Goroutine %d is working...\n", i)// 模拟工作time.Sleep(time.Duration(i+1) * time.Second)fmt.Printf("Goroutine %d reached the barrier.\n", i)barrier.Wait()fmt.Printf("Goroutine %d passed the barrier.\n", i)}(i)}// 主goroutine等待所有goroutine都到达屏障barrier.Await()fmt.Println("All goroutines have passed the barrier")
}

COPY

参考

  1. go/src/runtime/chan.go at master · golang/go · GitHub
  2. 逐步学习Go-并发通道chan(channel) – FOF编程网

编写不易,如有问题请评论告知

这篇关于逐步学习Go-并发通道chan(channel)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/853525

相关文章

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

Go语言中json操作的实现

《Go语言中json操作的实现》本文主要介绍了Go语言中的json操作的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录 一、jsOChina编程N 与 Go 类型对应关系️ 二、基本操作:编码与解码 三、结构体标签(Struc

Java JUC并发集合详解之线程安全容器完全攻略

《JavaJUC并发集合详解之线程安全容器完全攻略》Java通过java.util.concurrent(JUC)包提供了一整套线程安全的并发容器,它们不仅是简单的同步包装,更是基于精妙并发算法构建... 目录一、为什么需要JUC并发集合?二、核心并发集合分类与详解三、选型指南:如何选择合适的并发容器?在多

Java 结构化并发Structured Concurrency实践举例

《Java结构化并发StructuredConcurrency实践举例》Java21结构化并发通过作用域和任务句柄统一管理并发生命周期,解决线程泄漏与任务追踪问题,提升代码安全性和可观测性,其核心... 目录一、结构化并发的核心概念与设计目标二、结构化并发的核心组件(一)作用域(Scopes)(二)任务句柄

使用Go调用第三方API的方法详解

《使用Go调用第三方API的方法详解》在现代应用开发中,调用第三方API是非常常见的场景,比如获取天气预报、翻译文本、发送短信等,Go作为一门高效并发的编程语言,拥有强大的标准库和丰富的第三方库,可以... 目录引言一、准备工作二、案例1:调用天气查询 API1. 注册并获取 API Key2. 代码实现3

基于Go语言开发一个 IP 归属地查询接口工具

《基于Go语言开发一个IP归属地查询接口工具》在日常开发中,IP地址归属地查询是一个常见需求,本文将带大家使用Go语言快速开发一个IP归属地查询接口服务,有需要的小伙伴可以了解下... 目录功能目标技术栈项目结构核心代码(main.go)使用方法扩展功能总结在日常开发中,IP 地址归属地查询是一个常见需求:

Kotlin 协程之Channel的概念和基本使用详解

《Kotlin协程之Channel的概念和基本使用详解》文章介绍协程在复杂场景中使用Channel进行数据传递与控制,涵盖创建参数、缓冲策略、操作方式及异常处理,适用于持续数据流、多协程协作等,需注... 目录前言launch / async 适合的场景Channel 的概念和基本使用概念Channel 的

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

GO语言短变量声明的实现示例

《GO语言短变量声明的实现示例》在Go语言中,短变量声明是一种简洁的变量声明方式,使用:=运算符,可以自动推断变量类型,下面就来具体介绍一下如何使用,感兴趣的可以了解一下... 目录基本语法功能特点与var的区别适用场景注意事项基本语法variableName := value功能特点1、自动类型推

GO语言中函数命名返回值的使用

《GO语言中函数命名返回值的使用》在Go语言中,函数可以为其返回值指定名称,这被称为命名返回值或命名返回参数,这种特性可以使代码更清晰,特别是在返回多个值时,感兴趣的可以了解一下... 目录基本语法函数命名返回特点代码示例命名特点基本语法func functionName(parameters) (nam