Kyle's Notebook

Go Channel 应用

Word count: 4.5kReading time: 20 min
2021/07/28

Go Channel 最佳实践

Go 语言的并发模式基于 CSP(Communicating Sequential Process,通信顺序进程)模型,即使用独立的进程组件来描述系统,其只通过消息传递实现通信。相比起基于锁的共享内存,Go 更推荐使用 Channel(以下简写为 chan)实现数据共享。常见的应用场景:

  • 作为 buffer 或 queue,解决多 goroutine 下的“生产者-消费者”问题。

  • goroutine 之间的 数据传递(引用)。

  • goroutine 可将 信号传递 给一或多个其它 goroutine(closing、closed、data ready 等)。

  • 支持 goroutine 分组、按照一定顺序并发或者串行执行。

  • 实现互斥锁机制。

声明:

1
2
3
4
5
6
7
8
9
10
11
12
chan string          // 可收发 string
chan<- struct{} // 只能发送 struct{}
<-chan int // 只能接收 int
//
// 其中 Channel 也可以用于传递 Channel:
chan<- (chan int)
chan<- (<-chan int)
<-chan (<-chan int)
chan (<-chan int)

// unbuffered chan
c := make(chan interface{})

发送数据:

1
ch <- 2000

接收数据:

1
2
x := <- ch           // 赋值或丢弃
<- ch

select case:

1
2
3
4
5
6
7
8
9
10
var ch = make(chan int, 10)
for i := 0; i < 10; i++ {
select {
// 发送数据时走这个分支
case ch <- i:
// 接收数据时,把值赋给 v 并输出。
case v := <-ch:
fmt.Println(v)
}
}

for range:

1
2
3
4
5
6
for v := range ch {
fmt.Println(v)
}

for range ch {
}

其具备以下特性:

  • chan 为空时从中接收数据,或为满时向它写入数据都会阻塞。

  • unbuffered chan 只有读写都准备好才不会阻塞。

  • nil 是 chan 的零值,对其发送接收都会阻塞。

  • 接收数据时可返回两个值。其一是 chan 中的元素,其二是表示接收是否成功的 bool,为 false 则表示 chan 已经被 close 且没有缓存数据。

  • 接收数据时得到的 0,可能是 chan 中的元素,也可能是 chan 已 close 且没有缓存数据。

Goroutine

首先应当明确 goroutine 的应用场景:Go 内置的 goroutine 是调度完全由用户控制的协程。其优势在于轻量级、执行效率高,并发访问共享数据时结合 chan 使用可处理好竞态条件问题,相比起互斥锁,这也是官方更推荐的做法。

何时委托?

如果主 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,则通常情况下,由 goroutine 完成这项工作比委托它更简单。

比如在以下代码中,监听端口处理 HTTP 请求的工作显然由主 goroutine 完成更合适:

1
2
3
4
5
6
7
8
9
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request){
fmt.Fprintln(w, "Hello, World!")
})
// 假设监听端口处理请求的工作交由别的 goroutine 完成,则要注意在主 goroutine 设置阻塞等待(比如 select{}),否则会直接退出。
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}

这种做法消除了将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。

由调用者控制并发

goroutine 在何时启动、停止,应该由函数的调用者来控制。

在一些库中通常可见提供下面两种 API:

1
2
3
func ListDirectory(dir string) ([] string, error)

func ListDirectory(dir string) chan string

前者是同步调用,函数调用处理完成后将返回结果包括所有数据和执行状态,如果函数内部涉及漫长等待的操作,则调用方会一直阻塞;而且当返回数据规模较大,还需要分配大量内存。

后者是异步调用,函数调用时在内部启动 goroutine、建立与调用者沟通的 chan 后即返回,后续通过 chan 与调用者传递数据,在调用结束后关闭 chan。优点是可以一边发送、一边同时接收,并由调用者决定是否开启并发接收。但由于在函数返回后即失去对内部 goroutine 和 chan 的控制,仍存在以下问题:

  • 函数调用过程出现错误,无法通过 chan 告知调用者,调用者也无从获知数据集是否完整。

  • 调用者必须把 chan 中的数据全部读取完(哪怕只需要其中一部分),否则会导致内存泄漏。

更推荐使用回调函数实现:

1
func ListDirectory(dir string, fn func(string))

其特点是无需在被调用者内部启动 goroutine,而是在外部启动 goroutine 调用 ListDirectory,在处理过程中每有数据即通过 fn 来发送。对于调用者而言,如何处理数据会更可控。

完整的生命周期管理

除了上述 API 以外,内存泄漏的场景比较常见,在业务开发中使用 goroutine 要确保能对其生命周期有完全的把控。

比如以下代码创建 goroutine 读取外部 chan 中的数据并输出,如果外部的 chan 一直没有数据写入,这个 goroutine 就会永远阻塞,造成泄漏。

1
2
3
4
5
6
7
func leak() {
ch := make(chan int)
go func() {
val := <- ch
fmt.Println("We received a value:", val)
}
}

以下代码的 search 表示一个耗时较长的数据查询操作,在主函数中如果 ctx.Done 先执行则会返回错误,即上面的 goroutine 中往 ch 写入数据不会被取走,因此造成泄漏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func search(term string) (string error) {
time.Sleep(200 * time.Millisecond)
return "some value", nil
}
func main() {
go func() {
record, err := search(term)
ch <- result{record, err}
}

select {
case <- ctx.Done():
return errors.New("search canceled")
case result := <-ch:
if result.err != nil {
return result.err
}
fmt.Println("Received: ", result.record)
return nil
}
}

应用生命周期管理

利用 goroutine、chan 和 WaitGroup 等工具可以很方便地实现应用的生命周期管理。

以下代码以 Worker 工作模式实现:

  • Tracker 包含传递数据和发送停止信号的 chan。

  • Event 用于发送数据以及处理 Context 退出。

  • Run 由调用者决定是否开启 goroutine 调用,从 chan 消费数据并打印,停止时会发送信号。

  • Shutdown 被调用时,先关闭信道 chan,再接收来自 Run 方法的退出信号。

  • Context 实现超时控制(如果 Shutdown 执行耗时较久)。

  • 只有数据的发送者可以决定何时关闭(比如关闭 HTTP Server 后)。当没有人往 chan 写数据,才能调用 Shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func main() {
tr := NewTracker()
go tr.Run()
_ = tr.Event(context.Background(), "test1")
_ = tr.Event(context.Background(), "test2")
_ = tr.Event(context.Background(), "test3")
time.Sleep(3 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
tr.Shutdown(ctx)
_ = tr.Event(context.Background(), "test4") // painc
}

type Tracker struct {
ch chan string
stop chan struct{}
}

func NewTracker() *Tracker {
return &Tracker{ch: make(chan string, 10)}
}

func (t *Tracker) Event(ctx context.Context, data string) error {
select {
case t.ch <- data:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (t *Tracker) Run() {
for data := range t.ch {
time.Sleep(1 * time.Second)
fmt.Println(data)
}
t.stop <- struct{}{}
}

func (t *Tracker) Shutdown(ctx context.Context) {
close(t.ch)
select {
case <-t.stop:
case <-ctx.Done():
}
}

处理 Sselect Case

使用 select 处理多个 chan 的通信必须在编译期硬编码,而当 chan 个数太多、甚至不确定时,这种做法很不灵活。

可使用反射处理一组作为 case clause 的 chan:在执行的 case 中随机选择一个,将其索引 chosen 返回。另外 recvOK 还能表示无可用的 case,recv 则表示接收的元素(recv case)。

1
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

比如以下代码中的 createCases 接收不定个数的双向 chan 并生成一组 recv case 和 send case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func createCases(chs ...chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 创建 recv case
for _, ch := range chs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch),
})
}

// 创建 send case
for i, ch := range chs {
v := reflect.ValueOf(i)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectSend, Chan: reflect.ValueOf(ch), Send: v,
})
}
return cases
}

使用 select 处理不定个数的 chan:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var (
ch1 = make(chan int, 10)
ch2 = make(chan int, 10)
)

// 创建 select case
var cases = createCases(ch1, ch2)

// 每轮循环从 cases 中随机选择一个执行
for i := 0; i < 10; i++ {
chosen, recv, ok := reflect.Select(cases)
if recv.IsValid() { // recv case
fmt.Println("recv:", cases[chosen].Dir, recv, ok)
} else { // send case
fmt.Println("send:", cases[chosen].Dir, ok)
}
}

数据传递

Channel 可理解为一个线程安全的阻塞队列,多个 goroutine 可从存取数据,实现信息交流。

以 Worker Pool 为例。通过 chan 实现 worker 池的任务处理中心,可解耦前端 HTTP 请求处理和后端任务处理的逻辑:将用户请求放在一个 chan Job 中,相当于待处理任务队列。再使用一个 chan chan Job,作为存放可以处理任务的 worker 的缓存队列。通过 dispatcher 把待处理任务队列中的任务放到可用的缓存队列中,由 worker 一直处理它的缓存队列。

信号通知

利用 chan 实现 wait/notify 机制:chan 为空时阻塞,有数据到达时唤醒。

比如用于业务的优雅关闭:在退出之前执行连接关闭、文件 close、缓存落盘等动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func main() {
var (
closing = make(chan struct{})
closed = make(chan struct{})
)

// 业务操作
go func() {
for {
select {
case <-closing:
return
default:
// 业务操作
time.Sleep(100 * time.Millisecond)
}
}
}()

// 处理 CTRL+C 等中断信号
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
close(closing)

// 执行退出之前的清理动作
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
fmt.Println("清理超时,不等了")
}
fmt.Println("优雅退出")
}

func doCleanup(closed chan struct{}) {
time.Sleep((time.Minute))
close(closed)
}

清理工作可能耗时较长,但程序不能等待这么久才退出,因此需要设置最长的等待时间,超时可以直接退出。因此退出阶段分为两个:closing(正在执行退出前的清理工作)和 close。

互斥锁

chan 的内部使用了互斥锁保护其所有字段,因此 chan 的发送和接收之间存在 happens-before 关系,能保证元素放进去只有 receiver 才能读取到。

使用 chan 实现互斥锁,可以初始化一个容量为 1 的 chan,放入一个元素表示锁,取得元素表示取得锁。再利用 select 也可以轻易实现 TryLock、Timeout 等功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
type Mutex struct {
ch chan struct{}
}

func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}

func (m *Mutex) Lock() {
<-m.ch
}

func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}

func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}

func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
}
return false
}

func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}

func main() {
m := NewMutex()
ok := m.TryLock()
fmt.Printf("locked v %v\n", ok)
ok = m.TryLock()
fmt.Printf("locked %v\n", ok)
}

流水线模式

chan 实现的信息交流机制也可以用于任务编排,比如利用 chan 来实现 WaitGroup 功能。编排指安排 goroutine 按照指定的顺序执行,或多个 chan 按照指定的方式组合处理。

其中编排 goroutine 也被称为 流水线模式:比如创建令牌、在多个 chan 之间流转以达到按顺序轮流工作的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Token struct{}

func newWorker(id int, ch chan Token, nextCh chan Token) {
for {
// worker 的 goroutine 在无限循环中执行,只有当 ch 有数据才会被唤醒(表示取得令牌)。
token := <-ch
fmt.Println((id + 1))
time.Sleep(time.Second)

// 执行业务操作后,将令牌交给下一个 worker 的 chan。
nextCh <- token
}
}
func main() {
chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}

// 创建 worker,把令牌交给第 1 个 worker。
for i := 0; i < 4; i++ {
go newWorker(i, chs[i], chs[(i+1)%4])
}
chs[0] <- struct{}{}

// 主 goroutine 的空 select 语句一直阻塞。
select {}
}

Or Done

即允许有多个任务同时执行,只要其中任意一个执行完,就算是整体执行成功(比如发送请求到多个服务节点,只要任意一个节点返回结果就算成功)。

最简单的方法是为每个 chan 启动一个 goroutine,但可能因为创建太多而影响性能。

因此可以考虑使用递归或反射的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func or(channels ...<-chan interface{}) <-chan interface{} {
// 特殊情况,只有 0 个或 1 个。
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
// 实现 1:利用反射构建 SelectCase,最后随机选择一个可用的 case。
var cases []reflect.SelectCase
for _, c := range channels {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c),
})
}
reflect.Select(cases)
// 实现 2:对 channels 二分递归处理。
// switch len(channels) {
// case 2: // 2个也是一种特殊情况
// select {
// case <-channels[0]:
// case <-channels[1]:
// }
// default: //超过两个,二分法递归处理
// m := len(channels) / 2
// select {
// case <-or(channels[:m]...):
// case <-or(channels[m:]...):
// }
// }
}()
return orDone
}


func sig(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}


func main() {
start := time.Now()
<-or(
sig(10*time.Second), sig(20*time.Second),
sig(30*time.Second), sig(40*time.Second),
sig(50*time.Second), sig(01*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))
}

其中在 chan 数量较多时,深层次的递归开销较大,因此更建议使用反射实现。

Fan in

即由多个源 chan 输入、一个目的 chan 输出的情况,receiver 只需要监听目的 chan 就可以接收所有发送到源 chan 的数据。可以使用反射实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
// 构造 SelectCase slice。
var cases []reflect.SelectCase
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c),
})
}

// 循环中从 cases 中选择可用的 case。
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok { // 此channel已经close
cases = append(cases[:i], cases[i+1:]...)
continue
}
out <- v.Interface()
}
}()
return out
}

或递归实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo(fanInRec(chans[:m]...), fanInRec(chans[m:]...))
}
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
// 还有可读的 chan
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok { // 已关闭则设置为 nil,b 同理。
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok {
b = nil
continue
}
c <- v
}
}
}()
return c
}

Fan out

与 Fan in 相反,由一个源 chan 输入、多个目的 chan 输出,常用于观察者模式:一个对象状态发生变化,所有依赖于它的对象都会被通知并自动刷新。

从源 chan 取出数据后,依次发送给目标 chan。在发送时可以同步或异步发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() {
// 退出时关闭所有输出 chan
for i := 0; i < len(out); i++ {
close(out[i])
}
}()

// 从输入 chan 读取数据
for v := range ch {
v := v
for i := 0; i < len(out); i++ {
i := i
// 同步或异步发送数据到 out。
if async {
go func() {
out[i] <- v
}()
} else {
out[i] <- v
}
}
}
}()
}

Stream

chan 也能作为流式管道使用,即把 chan 看作流(Stream),提供跳过或只取其中几个元素等方法。

先把一个 slice 转换成流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
s := make(chan interface{}) // 创建 unbuffered chan
go func() {
defer close(s) // 退出时关闭 chan
// 遍历数组,将数组元素写入 s。
for _, v := range values {
select {
case <-done:
return
case s <- v:
}
}
}()
return s
}

实现 takeN:只取流的前 n 个元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{}) // 创建输出流
go func() {
defer close(takeStream)
for i := 0; i < num; i++ { // 只读取前 num 个元素
select {
case <-done:
return
case takeStream <- <-valueStream: //从输入流中读取元素
}
}
}()
return takeStream
}

除此之外还能实现:

  • takeN:只取流中前 n 个数据。

  • takeFn:筛选保留流中满足条件的数据。

  • takeWhile:只取满足条件的数据,不满足则不再取。

  • skipN:跳过流中前几个数据。

  • skipFn:跳过满足条件的数据。

  • skipWhile:跳过前面满足条件的数据,不满足条件则当前元素和以后的元素都输出到 chan。

Map-Reduce

map-reduce 分为两个步骤:先对队列中的数据执行映射(map),然后把列表中的每一个元素按照一定的处理方式规约(reduce)成结果,放入到结果队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
out := make(chan interface{})
if in == nil {
close(out)
return out
}

go func() {
defer close(out)
// 从输入 chan 读取数据并执行业务操作,将结果写入 out。
for v := range in {
out <- fn(v)
}
}()
return out
}


func reduceChan(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
if in == nil {
return nil
}
// 读取元素第一个元素,同时也是处理后的最终结果。
out := <-in
// 完成 reduce 后返回 out。
for v := range in {
out = fn(out, v)
}
return out
}

测试 map-reduce:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 生成一个数据流
func asStream(done <-chan struct{}) <-chan interface{} {
s := make(chan interface{})
go func() {
defer close(s)
for _, v := range []int{1, 2, 3, 4, 5} { // 从数组生成
select {
case <-done:
return
case s <- v:
}
}
}()
return s
}

func main() {
in := asStream(nil)

// 定义 map 函数和 reduce 函数。实现对源中的每个数据 *10 后累加。
mapFn := func(v interface{}) interface{} {
return v.(int) * 10
}
reduceFn := func(r, v interface{}) interface{} {
return r.(int) + v.(int)
}
sum := reduceChan(mapChan(in, mapFn), reduceFn)
fmt.Println(sum)
}
CATALOG
  1. 1. Go Channel 最佳实践
    1. 1.1. Goroutine
      1. 1.1.1. 何时委托?
      2. 1.1.2. 由调用者控制并发
      3. 1.1.3. 完整的生命周期管理
    2. 1.2. 应用生命周期管理
    3. 1.3. 处理 Sselect Case
    4. 1.4. 数据传递
    5. 1.5. 信号通知
    6. 1.6. 互斥锁
    7. 1.7. 流水线模式
    8. 1.8. Or Done
    9. 1.9. Fan in
    10. 1.10. Fan out
    11. 1.11. Stream
    12. 1.12. Map-Reduce