Go 学习笔记(二十四)channel 入门

本文原创地址:博客园骏马金龙Go 基础系列:channel 入门

channel 基础

channel 用于 goroutines 之间的通信,让它们之间可以进行数据交换。像管道一样,一个 goroutine_A 向 channel_A 中放数据,另一个 goroutine_B 从 channel_A 取数据

channel 是指针类型的数据类型,通过 make 来分配内存。例如:

ch := make(chan int)

这表示创建一个 channel,这个 channel 中只能保存 int 类型的数据。也就是说一端只能向此 channel 中放进 int 类型的值,另一端只能从此 channel 中读出 int 类型的值。

需要注意,chan TYPE才表示 channel 的类型。所以其作为参数或返回值时,需指定为xxx chan int类似的格式。

向 ch 这个 channel 放数据的操作形式为:

ch <- VALUE

从 ch 这个 channel 读数据的操作形式为:

<-ch             // 从ch中读取一个值
val = <-ch
val := <-ch      // 从ch中读取一个值并保存到val变量中
val,ok = <-ch    // 从ch读取一个值,判断是否读取成功,如果成功则保存到val变量中

其实很简单,当 ch 出现在<-的左边表示 send,当 ch 出现在<-的右边表示 recv。

例如:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)
	go sender(ch)         // sender goroutine
	go recver(ch)         // recver goroutine
	time.Sleep(1e9)
}

func sender(ch chan string) {
	ch <- "malongshuai"
	ch <- "gaoxiaofang"
	ch <- "wugui"
	ch <- "tuner"
}

func recver(ch chan string) {
	var recv string
	for {
		recv = <-ch
		fmt.Println(recv)
	}
}

输出结果:

malongshuai
gaoxiaofang
wugui
tuner

上面激活了一个 goroutine 用于执行 sender()函数,该函数每次向 channel ch 中发送一个字符串。同时还激活了另一个 goroutine 用于执行 recver() 函数,该函数每次从 channel ch 中读取一个字符串。

注意上面的recv = <-ch,当 channel 中没有数据可读时,recver goroutine 将会阻塞在此行。由于 recver 中读取 channel 的操作放在了无限 for 循环中,表示 recver goroutine 将一直阻塞,直到从 channel ch 中读取到数据,读取到数据后进入下一轮循环由被阻塞在recv = <-ch上。直到 main 中的 time.Sleep() 指定的时间到了,main 程序终止,所有的 goroutine 将全部被强制终止。

因为 receiver 要不断从 channel 中读取可能存在的数据,所以 receiver 一般都使用一个无限循环来读取 channel ,避免 sender 发送的数据被丢弃。

channel 的属性和分类

channel 的 3 种操作

每个 channel 都有 3 种操作:send、receive 和 close

  • send:表示 sender 端的 goroutine 向 channel 中投放数据
  • receive:表示 receiver 端的 goroutine 从 channel 中读取数据
  • close:表示关闭 channel
    • 关闭 channel 后,send 操作将导致 painc
    • 关闭 channel 后,recv 操作将返回对应类型的 0 值以及一个状态码 false
    • close 并非强制需要使用 close(ch) 来关闭 channel,在某些时候可以自动被关闭
    • 如果使用 close(),建议条件允许的情况下加上 defer
    • 只在 sender 端上显式使用 close() 关闭 channel。因为关闭通道意味着没有数据再需要发送

例如,判断 channel 是否被关闭:

val, ok := <-counter
if ok {
	fmt.Println(val)
}

因为关闭通道也会让 recv 成功读取 (只不过读取到的值为类型的空值),使得原本阻塞在 recv 操作上的 goroutine 变得不阻塞,借此技巧可以实现 goroutine 的执行先后顺序 。具体示例见后文:指定 goroutine 的执行顺序

channel 的两种分类

channel 分为两种:unbuffered channel 和 buffered channel

  • unbuffered channel:阻塞、同步模式
    • sender 端向 channel 中 send 一个数据,然后阻塞,直到 receiver 端将此数据 receive
    • receiver 端一直阻塞,直到 sender 端向 channel 发送了一个数据
  • buffered channel:非阻塞、异步模式
    • sender 端可以向 channel 中 send 多个数据 (只要 channel 容量未满),容量满之前不会阻塞
    • receiver 端按照队列的方式 (FIFO, 先进先出) 从 buffered channel 中按序 receive 其中数据

可以认为 阻塞和不阻塞是由 channel 控制的,无论是 send 还是 recv 操作,都是在向 channel 发送请求

  • 对于 unbuffered channel,sender 发送一个数据,channel 暂时不会向 sender 的请求返回 ok 消息,而是等到 receiver 准备接收 channel 数据了,channel 才会向 sender 和 receiver 双方发送 ok 消息。在 sender 和 receiver 接收到 ok 消息之前,两者一直处于阻塞。
  • 对于 buffered channel,sender 每发送一个数据,只要 channel 容量未满,channel 都会向 sender 的请求直接返回一个 ok 消息,使得 sender 不会阻塞,直到 channel 容量已满,channel 不会向 sender 返回 ok,于是 sender 被阻塞。对于 receiver 也一样,只要 channel 非空,receiver 每次请求 channel 时,channel 都会向其返回 ok 消息,直到 channel 为空,channel 不会返回 ok 消息,receiver 被阻塞。

buffered channel 的两个属性

buffered channel 有两个属性:容量和长度 :和 slice 的 capacity 和 length 的概念是一样的

  • capacity:表示 bufffered channel 最多可以缓冲多少个数据
  • length:表示 buffered channel 当前已缓冲多少个数据
  • 创建 buffered channel 的方式为make(chan TYPE,CAP)

unbuffered channel 可以认为是容量为 0 的 buffered channel,所以每发送一个数据就被阻塞。注意,不是容量为 1 的 buffered channel,因为容量为 1 的 channel,是在 channel 中已有一个数据,并发送第二个数据的时候才被阻塞。

换句话说,send 被阻塞的时候,其实是没有发送成功的,只有被另一端读走一个数据之后才算是 send 成功。对于 unbuffered channel 来说,这是 send/recv 的同步模式。而 buffered channel 则是在每次发送数据到通道的时候,(通道) 都向发送者返回一个消息,容量未满的时候返回成功的消息,发送者因此而不会阻塞,容量已满的时候因为已满而迟迟不返回消息,使得发送者被阻塞

实际上,当向一个 channel 进行 send 的时候,先关闭了 channel,再读取 channel 时会发现错误在 send,而不是 recv。它会提示向已经关闭了的 channel 发送数据。

func main() {
	counter := make(chan int)
	go func() {
		counter <- 32
	}()
	close(counter)
	fmt.Println(<-counter)
}

输出报错:

panic: send on closed channel

所以,在 Go 的内部行为中,send 和 recv 是一个整体行为,数据未读就表示未 send 成功

两种特殊的 channel

有两种特殊的 channel:nil channel 和 channal 类型的 channel。

当未为 channel 分配内存时,channel 就是 nil channel,例如var ch1 chan int。nil channel 会永远阻塞对该 channel 的读、写操作。

nil channel 在某些时候有些妙用,例如在 select(关于 select,见后文) 的某个 case 分支 A 将其它某 case 分支 B 所操作的 channel 突然设置为 nil,这将会禁用 case 分支 B。

当 channel 的类型为一个 channel 时,就是 channel 的 channel,也就是双层通道。例如:

var chch1 chan chan int

channel 的 channel 是指通道里的数据是通道,可以认为通道里面嵌套了一个或多个通道:只能将整个通道发送到外层通道,读取外层通道时获取到的是内层通道,然后可以操作内层通道。

1.png

// 发送通道给外层通道
chch1 <-ch1
chch1 <-ch2

// 从外层通道取出内层通道
c <-chch1

// 操作取出的内层通道
c <-123
val := <-c

channel of channel 的妙用之一是将外层通道作为通道的加工厂:在某个 goroutine 中不断生成通道,在其它 goroutine 可以不断取出通道来操作。

死锁 (deadlock)

当 channel 的某一端 (sender/receiver) 期待另一端的 (receiver/sender) 操作,另一端正好在期待本端的操作时,也就是说两端都因为对方而使得自己当前处于阻塞状态,这时将会出现死锁问题。

更通俗地说, 只要所有 goroutine 都被阻塞,就会出现死锁

比如,在 main 函数中,它有一个默认的 goroutine,如果在此 goroutine 中创建一个 unbuffered channel,并在 main goroutine 中向此 channel 中发送数据并直接 receive 数据,将会出现死锁:

package main 

import (
	"fmt"
)

func main (){
	goo(32)
}

func goo(s int) {
	counter := make(chan int)
	counter <- s
	fmt.Println(<-counter)
}

在上面的示例中,向 unbuffered channel 中 send 数据的操作counter <- s是在 main goroutine 中进行的,从此 channel 中 recv 的操作<-counter也是在 main goroutine 中进行的。send 的时候会直接阻塞 main goroutine,使得 recv 操作无法被执行,go 将探测到此问题,并报错:

fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:

要修复此问题,只需将 send 操作放在另一个 goroutine 中执行即可:

package main

import (
	"fmt"
)

func main() {
	goo(32)
}

func goo(s int) {
	counter := make(chan int)
	go func() {
		counter <- s
	}()
	fmt.Println(<-counter)
}

或者,将 counter 设置为一个容量为 1 的 buffered channel:

counter := make(chan int,1)

这样放完一个数据后 send 不会阻塞 (被 recv 之前放第二个数据才会阻塞),可以执行到 recv 操作。

unbuffered channel 同步通信示例

下面通过 sync.WaitGroup 类型来等待程序的结束,分析多个 goroutine 之间通信时状态的转换。因为创建的 channel 是 unbuffered 类型的,所以 send 和 recv 都是阻塞的。

package main

import (
	"fmt"
	"sync"
)

// wg用于等待程序执行完成
var wg sync.WaitGroup

func main() {
	count := make(chan int)

	// 增加两个待等待的goroutines
	wg.Add(2)
	fmt.Println("Start Goroutines")

	// 激活一个goroutine,label:"Goroutine-1"
	go printCounts("Goroutine-1", count)
	// 激活另一个goroutine,label:"Goroutine-2"
	go printCounts("Goroutine-2", count)

	fmt.Println("Communication of channel begins")
	// 向channel中发送初始数据
	count <- 1

	// 等待goroutines都执行完成
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("\nTerminating the Program")
}
func printCounts(label string, count chan int) {
	// goroutine执行完成时,wg的计数器减1
	defer wg.Done()
	for {
		// 从channel中接收数据
		// 如果无数据可recv,则goroutine阻塞在此
		val, ok := <-count
		if !ok {
			fmt.Println("Channel was closed:",label)
			return
		}
		fmt.Printf("Count: %d received from %s \n", val, label)
		if val == 10 {
			fmt.Printf("Channel Closed from %s \n", label)
			// Close the channel
			close(count)
			return
		}
		// 输出接收到的数据后,加1,并重新将其send到channel中
		val++
		count <- val
	}
}

上面的程序中,激活了两个 goroutine,激活这两个 goroutine 后,向 channel 中发送一个初始数据值 1,然后 main goroutine 将因为 wg.Wait() 等待 2 个 goroutine 都执行完成而被阻塞。

再看这两个 goroutine,这两个 goroutine 执行完全一样的函数代码,它们都接收 count 这个 channel 的数据,但可能是 goroutine1 先接收到 channel 中的初始值 1,也可能是 goroutine2 先接收到初始值 1。接收到数据后输出值,并在输出后对数据加 1,然后将加 1 后的数据再次 send 到 channel,每次 send 都会将自己这个 goroutine 阻塞 (因为 unbuffered channel),此时另一个 goroutine 因为等待 recv 而执行。当加 1 后发送给 channel 的数据为 10 之后,某 goroutine 将关闭 count channel,该 goroutine 将退出,wg 的计数器减 1,另一个 goroutine 因等待 recv 而阻塞的状态将因为 channel 的关闭而失败,ok 状态码将让该 goroutine 退出,于是 wg 的计数器减为 0,main goroutine 因为 wg.Wait() 而继续执行后面的代码。

使用 for range 迭代 channel

前面都是在 for 无限循环中读取 channel 中的数据,但也可以使用 range 来迭代 channel,它会返回每次迭代过程中所读取的数据,直到 channel 被关闭。必须注意,只要 channel 未关闭,range 迭代 channel 就会一直被阻塞。

例如,将上面示例中的 printCounts() 改为 for-range 的循环形式。

func printCounts(label string, count chan int) {
	defer wg.Done()
	for val := range count {
		fmt.Printf("Count: %d received from %s \n", val, label)
		if val == 10 {
			fmt.Printf("Channel Closed from %s \n", label)
			close(count)
			return
		}
		val++
		count <- val
	}
}

多个 "管道":输出作为输入

channel 是 goroutine 与 goroutine 之间通信的基础,一边产生数据放进 channel,另一边从 channel 读取放进来的数据。可以借此实现多个 goroutine 之间的数据交换,例如goroutine_1->goroutine_2->goroutine_3,就像 bash 的管道一样,上一个命令的输出可以不断传递给下一个命令的输入,只不过 golang 借助 channel 可以在多个 goroutine(如函数的执行) 之间传,而 bash 是在命令之间传。

以下是一个示例,第一个函数 getRandNum()用于生成随机整数,并将生成的整数放进第一个 channel ch1 中,第二个函数 addRandNum() 用于接收 ch1 中的数据 (来自第一个函数),将其输出,然后对接收的值加 1 后放进第二个 channel ch2 中,第三个函数 printRes 接收 ch2 中的数据并将其输出。

如果将函数认为是 Linux 的命令,则类似于下面的命令行:ch1 相当于第一个管道,ch2 相当于第二个管道

getRandNum | addRandNum | printRes

以下是代码部分:

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

var wg sync.WaitGroup

func main() {
	wg.Add(3)
	// 创建两个channel
	ch1 := make(chan int)
	ch2 := make(chan int)

	// 3个goroutine并行
	go getRandNum(ch1)
	go addRandNum(ch1, ch2)
	go printRes(ch2)

	wg.Wait()
}

func getRandNum(out chan int) {
	// defer the wg.Done()
	defer wg.Done()

	var random int
	// 总共生成10个随机数
	for i := 0; i < 10; i++ {
		// 生成[0,30)之间的随机整数并放进channel out
		random = rand.Intn(30)
		out <- random
	}
	close(out)
}

func addRandNum(in,out chan int) {
	defer wg.Done()
	for v := range in {
		// 输出从第一个channel中读取到的数据
		// 并将值+1后放进第二个channel中
		fmt.Println("before +1:",v)
		out <- (v + 1)
	}
	close(out)
}

func printRes(in chan int){
	defer wg.Done()
	for v := range in {
		fmt.Println("after +1:",v)
	}
}

指定 channel 的方向

上面通过两个 channel 将 3 个 goroutine 连接起来,其中起连接作用的是第二个函数 addRandNum()。在这个函数中使用了两个 channel 作为参数:一个 channel 用于接收、一个 channel 用于发送。

其实 channel 类的参数变量可以指定数据流向:

  • in <-chan int:表示 channel in 通道只用于接收数据
  • out chan<- int:表示 channel out 通道只用于发送数据

2.png

只用于接收数据的通道<-chan不可被关闭,因为关闭通道是针对发送数据而言的,表示无数据再需发送。对于 recv 来说,关闭通道是没有意义的。

所以,上面示例中三个函数可改写为:

func getRandNum(out chan<- int) {
	...
}

func addRandNum(in <-chan int, out chan<- int) {
	...
}

func printRes(in <-chan int){
	...
}

buffered channel 异步队列请求示例

下面是使用 buffered channel 实现异步处理请求的示例。

在此示例中:

  • 有 (最多)3 个 worker,每个 worker 是一个 goroutine,它们有 worker ID。
  • 每个 worker 都从一个 buffered channel 中取出待执行的任务,每个任务是一个 struct 结构,包含了任务 id(JobID),当前任务的队列号 (ID) 以及任务的状态(worker 是否执行完成该任务)。
  • 在 main goroutine 中将每个任务 struct 发送到 buffered channel 中,这个 buffered channel 的容量为 10,也就是最多只允许 10 个任务进行排队。
  • worker 每次取出任务后,输出任务号,然后执行任务 (run),最后输出任务 id 已完成。
  • 每个 worker 执行任务的方式很简单:随机睡眠 0-1 秒钟,并将任务标记为完成。

以下是代码部分:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Task struct {
	ID         int
	JobID      int
	Status     string
	CreateTime time.Time
}

func (t *Task) run() {
	sleep := rand.Intn(1000)
	time.Sleep(time.Duration(sleep) * time.Millisecond)
	t.Status = "Completed"
}

var wg sync.WaitGroup

// worker的数量,即使用多少goroutine执行任务
const workerNum = 3

func main() {
	wg.Add(workerNum)

	// 创建容量为10的buffered channel
	taskQueue := make(chan *Task, 10)

	// 激活goroutine,执行任务
	for workID := 0; workID <= workerNum; workID++ {
		go worker(taskQueue, workID)
	}
	// 将待执行任务放进buffered channel,共15个任务
	for i := 1; i <= 15; i++ {
		taskQueue <- &Task{
			ID:         i,
			JobID:      100 + i,
			CreateTime: time.Now(),
		}
	}
	close(taskQueue)
	wg.Wait()
}

// 从buffered channel中读取任务,并执行任务
func worker(in <-chan *Task, workID int) {
	defer wg.Done()
	for v := range in {
		fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
		v.run()
		fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
	}
}

select 多路监听

很多时候想要同时操作多个 channel,比如从 ch1、ch2 读数据。Go 提供了一个 select 语句块,它像 switch 一样工作,里面放一些 case 语句块,用来轮询每个 case 语句块的 send 或 recv 情况。

select

用法格式示例:

select {
	// ch1有数据时,读取到v1变量中
	case v1 := <-ch1:
		...
	// ch2有数据时,读取到v2变量中
	case v2 := <-ch2:
		...
	// 所有case都不满足条件时,执行default
	default:
		...
}

defalut 语句是可选的,不允许 fall through 行为,但允许 case 语句块为空块。select 会被 return、break 关键字中断:return 是退出整个函数,break 是退出当前 select。

select 的行为模式主要是对 channel 是否可读进行轮询,但也可以用来向 channel 发送数据。它的行为如下:

  • 如果所有的 case 语句块评估时都被阻塞,则阻塞直到某个语句块可以被处理
  • 如果多个 case 同时满足条件,则 随机选择 一个进行处理,对于这一次的选择,其它的 case 都不会被阻塞,而是处理完被选中的 case 后进入下一轮 select(如果 select 在循环中) 或者结束 select(如果 select 不在循环中或循环次数结束)
  • 如果存在 default 且其它 case 都不满足条件,则执行 default。所以 default 必须要可执行而不能阻塞

如果有所疑惑,后文的 "select 超时时间" 有更有助于理解 select 的说明和示例。

所有的 case 块都是按源代码书写顺序进行评估的。当 select 未在循环中时,它将只对所有 case 评估一次,这次结束后就结束 select。某次评估过程中如果有满足条件的 case,则所有其它 case 都直接结束评估,并退出此次 select

其实如果注意到 select 语句是在某一个 goroutine 中评估的,就不难理解只有所有 case 都不满足条件时,select 所在 goroutine 才会被阻塞,只要有一个 case 满足条件,本次 select 就不会出现阻塞的情况。

需要注意的是, 如果在 select 中执行 send 操作,则可能会永远被 send 阻塞。所以,在使用 send 的时候,应该也使用 defalut 语句块,保证 send 不会被阻塞 。如果没有 default,或者能确保 select 不阻塞的语句块,则迟早会被 send 阻塞。在后文有一个 select 中 send 永久阻塞的分析:双层 channel 的一个示例

一般来说,select 会放在一个无限循环语句中,一直轮询 channel 的可读事件。

下面是一个示例,pump1()和 pump2()都用于产生数据 (一个产生偶数,一个产生奇数),并将数据分别放进 ch1 和 ch2 两个通道,suck() 则从 ch1 和 ch2 中读取数据。然后在无限循环中使用 select 轮询这两个通道是否可读,最后 main goroutine 在 1 秒后强制中断所有 goroutine。

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go pump1(ch1)
	go pump2(ch2)
	go suck(ch1, ch2)
	time.Sleep(1e9)
}
func pump1(ch chan int) {
	for i := 0; i <= 30; i++ {
		if i%2 == 0 {
			ch <- i
		}
	}
}
func pump2(ch chan int) {
	for i := 0; i <= 30; i++ {
		if i%2 == 1 {
			ch <- i
		}
	}
}
func suck(ch1 chan int, ch2 chan int) {
	for {
		select {
		case v := <-ch1:
			fmt.Printf("Recv on ch1: %d\n", v)
		case v := <-ch2:
			fmt.Printf("Recv on ch2: %d\n", v)
		}
	}
}

上一篇 Go 学习笔记(二十三)WaitGroup 用法说明
Go 学习笔记(目录)
下一篇 Go 学习笔记(二十五)双层 channel 用法示例