Go Channel 最佳实践 Go 语言的并发模式基于 CSP(Communicating Sequential Process,通信顺序进程)模型,即使用独立的进程组件来描述系统,其只通过消息传递实现通信。相比起基于锁的共享内存,Go 更推荐使用 Channel(以下简写为 chan)实现数据共享。常见的应用场景:
作为 buffer 或 queue,解决多 goroutine 下的“生产者-消费者 ”问题。
goroutine 之间的 数据传递 (引用)。
goroutine 可将 信号传递 给一或多个其它 goroutine(closing、closed、data ready 等)。
支持 goroutine 分组、按照一定顺序并发或者串行执行。
实现互斥锁机制。
…
声明:
1 2 3 4 5 6 7 8 9 10 11 12 chan string chan <- struct {} <-chan int chan <- (chan int )chan <- (<-chan int )<-chan (<-chan int ) chan (<-chan int )c := make (chan interface {})
发送数据:
接收数据:
select case:
1 2 3 4 5 6 7 8 9 10 var ch = make (chan int , 10 )for i := 0 ; i < 10 ; i++ { select { case ch <- i: case v := <-ch: fmt.Println(v) } }
for range:
1 2 3 4 5 6 for v := range ch { fmt.Println(v) } for range ch {}
其具备以下特性:
chan 为空时从中接收数据,或为满时向它写入数据都会阻塞。
unbuffered chan 只有读写都准备好才不会阻塞。
nil 是 chan 的零值,对其发送接收都会阻塞。
接收数据时可返回两个值。其一是 chan 中的元素,其二是表示接收是否成功的 bool,为 false 则表示 chan 已经被 close 且没有缓存数据。
接收数据时得到的 0,可能是 chan 中的元素,也可能是 chan 已 close 且没有缓存数据。
Goroutine 首先应当明确 goroutine 的应用场景:Go 内置的 goroutine 是调度完全由用户控制的协程。其优势在于轻量级、执行效率高,并发访问共享数据时结合 chan 使用可处理好竞态条件问题,相比起互斥锁,这也是官方更推荐的做法。
何时委托? 如果主 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,则通常情况下,由 goroutine 完成这项工作比委托它更简单。
比如在以下代码中,监听端口处理 HTTP 请求的工作显然由主 goroutine 完成更合适:
1 2 3 4 5 6 7 8 9 func main () { http.HandleFunc("/" , func (w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "Hello, World!" ) }) if err := http.ListenAndServe(":8080" , nil ); err != nil { log.Fatal(err) } }
这种做法消除了将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。
由调用者控制并发 goroutine 在何时启动、停止,应该由函数的调用者来控制。
在一些库中通常可见提供下面两种 API:
1 2 3 func ListDirectory (dir string ) ([] string , error) func ListDirectory (dir string ) chan string
前者是同步调用,函数调用处理完成后将返回结果包括所有数据和执行状态,如果函数内部涉及漫长等待的操作,则调用方会一直阻塞;而且当返回数据规模较大,还需要分配大量内存。
后者是异步调用,函数调用时在内部启动 goroutine、建立与调用者沟通的 chan 后即返回,后续通过 chan 与调用者传递数据,在调用结束后关闭 chan。优点是可以一边发送、一边同时接收,并由调用者决定是否开启并发接收。但由于在函数返回后即失去对内部 goroutine 和 chan 的控制,仍存在以下问题:
更推荐使用回调函数实现:
1 func ListDirectory (dir string , fn func (string ) )
其特点是无需在被调用者内部启动 goroutine,而是在外部启动 goroutine 调用 ListDirectory
,在处理过程中每有数据即通过 fn
来发送。对于调用者而言,如何处理数据会更可控。
完整的生命周期管理 除了上述 API 以外,内存泄漏的场景比较常见,在业务开发中使用 goroutine 要确保能对其生命周期有完全的把控。
比如以下代码创建 goroutine 读取外部 chan 中的数据并输出,如果外部的 chan 一直没有数据写入,这个 goroutine 就会永远阻塞,造成泄漏。
1 2 3 4 5 6 7 func leak () { ch := make (chan int ) go func () { val := <- ch fmt.Println("We received a value:" , val) } }
以下代码的 search
表示一个耗时较长的数据查询操作,在主函数中如果 ctx.Done
先执行则会返回错误,即上面的 goroutine 中往 ch 写入数据不会被取走,因此造成泄漏。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func search (term string ) (string error) { time.Sleep(200 * time.Millisecond) return "some value" , nil } func main () { go func () { record, err := search(term) ch <- result{record, err} } select { case <- ctx.Done(): return errors.New("search canceled" ) case result := <-ch: if result.err != nil { return result.err } fmt.Println("Received: " , result.record) return nil } }
应用生命周期管理 利用 goroutine、chan 和 WaitGroup
等工具可以很方便地实现应用的生命周期管理。
以下代码以 Worker 工作模式实现:
Tracker
包含传递数据和发送停止信号的 chan。
Event
用于发送数据以及处理 Context 退出。
Run
由调用者决定是否开启 goroutine 调用,从 chan 消费数据并打印,停止时会发送信号。
Shutdown
被调用时,先关闭信道 chan,再接收来自 Run
方法的退出信号。
Context
实现超时控制(如果 Shutdown
执行耗时较久)。
只有数据的发送者可以决定何时关闭(比如关闭 HTTP Server 后)。当没有人往 chan 写数据,才能调用 Shutdown
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func main () { tr := NewTracker() go tr.Run() _ = tr.Event(context.Background(), "test1" ) _ = tr.Event(context.Background(), "test2" ) _ = tr.Event(context.Background(), "test3" ) time.Sleep(3 * time.Second) ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5 *time.Second)) defer cancel() tr.Shutdown(ctx) _ = tr.Event(context.Background(), "test4" ) } type Tracker struct { ch chan string stop chan struct {} } func NewTracker () *Tracker { return &Tracker{ch: make (chan string , 10 )} } func (t *Tracker) Event (ctx context.Context, data string ) error { select { case t.ch <- data: return nil case <-ctx.Done(): return ctx.Err() } } func (t *Tracker) Run () { for data := range t.ch { time.Sleep(1 * time.Second) fmt.Println(data) } t.stop <- struct {}{} } func (t *Tracker) Shutdown (ctx context.Context) { close (t.ch) select { case <-t.stop: case <-ctx.Done(): } }
处理 Sselect Case 使用 select 处理多个 chan 的通信必须在编译期硬编码,而当 chan 个数太多、甚至不确定时,这种做法很不灵活。
可使用反射处理一组作为 case clause 的 chan:在执行的 case 中随机选择一个,将其索引 chosen
返回。另外 recvOK
还能表示无可用的 case,recv
则表示接收的元素(recv case)。
1 func Select (cases []SelectCase) (chosen int , recv Value, recvOK bool )
比如以下代码中的 createCases
接收不定个数的双向 chan 并生成一组 recv case 和 send case:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func createCases (chs ...chan int ) []reflect .SelectCase { var cases []reflect.SelectCase for _, ch := range chs { cases = append (cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch), }) } for i, ch := range chs { v := reflect.ValueOf(i) cases = append (cases, reflect.SelectCase{ Dir: reflect.SelectSend, Chan: reflect.ValueOf(ch), Send: v, }) } return cases }
使用 select 处理不定个数的 chan:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 var ( ch1 = make (chan int , 10 ) ch2 = make (chan int , 10 ) ) var cases = createCases(ch1, ch2)for i := 0 ; i < 10 ; i++ { chosen, recv, ok := reflect.Select(cases) if recv.IsValid() { fmt.Println("recv:" , cases[chosen].Dir, recv, ok) } else { fmt.Println("send:" , cases[chosen].Dir, ok) } }
数据传递 Channel 可理解为一个线程安全的阻塞队列,多个 goroutine 可从存取数据,实现信息交流。
以 Worker Pool 为例。通过 chan 实现 worker 池的任务处理中心,可解耦前端 HTTP 请求处理和后端任务处理的逻辑:将用户请求放在一个 chan Job 中,相当于待处理任务队列。再使用一个 chan chan Job,作为存放可以处理任务的 worker 的缓存队列。通过 dispatcher 把待处理任务队列中的任务放到可用的缓存队列中,由 worker 一直处理它的缓存队列。
信号通知 利用 chan 实现 wait/notify 机制:chan 为空时阻塞,有数据到达时唤醒。
比如用于业务的优雅关闭:在退出之前执行连接关闭、文件 close、缓存落盘等动作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func main () { var ( closing = make (chan struct {}) closed = make (chan struct {}) ) go func () { for { select { case <-closing: return default : time.Sleep(100 * time.Millisecond) } } }() termChan := make (chan os.Signal) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan close (closing) go doCleanup(closed) select { case <-closed: case <-time.After(time.Second): fmt.Println("清理超时,不等了" ) } fmt.Println("优雅退出" ) } func doCleanup (closed chan struct {}) { time.Sleep((time.Minute)) close (closed) }
清理工作可能耗时较长,但程序不能等待这么久才退出,因此需要设置最长的等待时间,超时可以直接退出。因此退出阶段分为两个:closing(正在执行退出前的清理工作)和 close。
互斥锁 chan 的内部使用了互斥锁保护其所有字段,因此 chan 的发送和接收之间存在 happens-before 关系,能保证元素放进去只有 receiver 才能读取到。
使用 chan 实现互斥锁,可以初始化一个容量为 1 的 chan,放入一个元素表示锁,取得元素表示取得锁。再利用 select 也可以轻易实现 TryLock、Timeout 等功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 type Mutex struct { ch chan struct {} } func NewMutex () *Mutex { mu := &Mutex{make (chan struct {}, 1 )} mu.ch <- struct {}{} return mu } func (m *Mutex) Lock () { <-m.ch } func (m *Mutex) Unlock () { select { case m.ch <- struct {}{}: default : panic ("unlock of unlocked mutex" ) } } func (m *Mutex) TryLock () bool { select { case <-m.ch: return true default : } return false } func (m *Mutex) LockTimeout (timeout time.Duration) bool { timer := time.NewTimer(timeout) select { case <-m.ch: timer.Stop() return true case <-timer.C: } return false } func (m *Mutex) IsLocked () bool { return len (m.ch) == 0 } func main () { m := NewMutex() ok := m.TryLock() fmt.Printf("locked v %v\n" , ok) ok = m.TryLock() fmt.Printf("locked %v\n" , ok) }
流水线模式 chan 实现的信息交流机制也可以用于任务编排,比如利用 chan 来实现 WaitGroup
功能。编排指安排 goroutine 按照指定的顺序执行,或多个 chan 按照指定的方式组合处理。
其中编排 goroutine 也被称为 流水线模式 :比如创建令牌、在多个 chan 之间流转以达到按顺序轮流工作的效果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 type Token struct {}func newWorker (id int , ch chan Token, nextCh chan Token) { for { token := <-ch fmt.Println((id + 1 )) time.Sleep(time.Second) nextCh <- token } } func main () { chs := []chan Token{make (chan Token), make (chan Token), make (chan Token), make (chan Token)} for i := 0 ; i < 4 ; i++ { go newWorker(i, chs[i], chs[(i+1 )%4 ]) } chs[0 ] <- struct {}{} select {} }
Or Done 即允许有多个任务同时执行,只要其中任意一个执行完,就算是整体执行成功(比如发送请求到多个服务节点,只要任意一个节点返回结果就算成功)。
最简单的方法是为每个 chan 启动一个 goroutine,但可能因为创建太多而影响性能。
因此可以考虑使用递归或反射的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 func or (channels ...<-chan interface {}) <-chan interface {} { switch len (channels) { case 0 : return nil case 1 : return channels[0 ] } orDone := make (chan interface {}) go func () { defer close (orDone) var cases []reflect.SelectCase for _, c := range channels { cases = append (cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), }) } reflect.Select(cases) }() return orDone } func sig (after time.Duration) <-chan interface {} { c := make (chan interface {}) go func () { defer close (c) time.Sleep(after) }() return c } func main () { start := time.Now() <-or( sig(10 *time.Second), sig(20 *time.Second), sig(30 *time.Second), sig(40 *time.Second), sig(50 *time.Second), sig(01 *time.Minute), ) fmt.Printf("done after %v" , time.Since(start)) }
其中在 chan 数量较多时,深层次的递归开销较大,因此更建议使用反射实现。
Fan in 即由多个源 chan 输入、一个目的 chan 输出的情况,receiver 只需要监听目的 chan 就可以接收所有发送到源 chan 的数据。可以使用反射实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func fanInReflect (chans ...<-chan interface {}) <-chan interface {} { out := make (chan interface {}) go func () { defer close (out) var cases []reflect.SelectCase for _, c := range chans { cases = append (cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), }) } for len (cases) > 0 { i, v, ok := reflect.Select(cases) if !ok { cases = append (cases[:i], cases[i+1 :]...) continue } out <- v.Interface() } }() return out }
或递归实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func fanInRec (chans ...<-chan interface {}) <-chan interface {} { switch len (chans) { case 0 : c := make (chan interface {}) close (c) return c case 1 : return chans[0 ] case 2 : return mergeTwo(chans[0 ], chans[1 ]) default : m := len (chans) / 2 return mergeTwo(fanInRec(chans[:m]...), fanInRec(chans[m:]...)) } } func mergeTwo (a, b <-chan interface {}) <-chan interface {} { c := make (chan interface {}) go func () { defer close (c) for a != nil || b != nil { select { case v, ok := <-a: if !ok { a = nil continue } c <- v case v, ok := <-b: if !ok { b = nil continue } c <- v } } }() return c }
Fan out 与 Fan in 相反,由一个源 chan 输入、多个目的 chan 输出,常用于观察者模式:一个对象状态发生变化,所有依赖于它的对象都会被通知并自动刷新。
从源 chan 取出数据后,依次发送给目标 chan。在发送时可以同步或异步发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func fanOut (ch <-chan interface {}, out []chan interface {}, async bool ) { go func () { defer func () { for i := 0 ; i < len (out); i++ { close (out[i]) } }() for v := range ch { v := v for i := 0 ; i < len (out); i++ { i := i if async { go func () { out[i] <- v }() } else { out[i] <- v } } } }() }
Stream chan 也能作为流式管道使用,即把 chan 看作流(Stream),提供跳过或只取其中几个元素等方法。
先把一个 slice 转换成流:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func asStream (done <-chan struct {}, values ...interface {}) <-chan interface {} { s := make (chan interface {}) go func () { defer close (s) for _, v := range values { select { case <-done: return case s <- v: } } }() return s }
实现 takeN:只取流的前 n 个元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func takeN (done <-chan struct {}, valueStream <-chan interface {}, num int ) <-chan interface {} { takeStream := make (chan interface {}) go func () { defer close (takeStream) for i := 0 ; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream }
除此之外还能实现:
Map-Reduce map-reduce 分为两个步骤:先对队列中的数据执行映射(map),然后把列表中的每一个元素按照一定的处理方式规约(reduce)成结果,放入到结果队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func mapChan (in <-chan interface {}, fn func (interface {}) interface {}) <-chan interface {} { out := make (chan interface {}) if in == nil { close (out) return out } go func () { defer close (out) for v := range in { out <- fn(v) } }() return out } func reduceChan (in <-chan interface {}, fn func (r, v interface {}) interface {}) interface {} { if in == nil { return nil } out := <-in for v := range in { out = fn(out, v) } return out }
测试 map-reduce:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func asStream (done <-chan struct {}) <-chan interface {} { s := make (chan interface {}) go func () { defer close (s) for _, v := range []int {1 , 2 , 3 , 4 , 5 } { select { case <-done: return case s <- v: } } }() return s } func main () { in := asStream(nil ) mapFn := func (v interface {}) interface {} { return v.(int ) * 10 } reduceFn := func (r, v interface {}) interface {} { return r.(int ) + v.(int ) } sum := reduceChan(mapChan(in, mapFn), reduceFn) fmt.Println(sum) }