Go 并发原语
Go 提供同步原语(Synchronization primitives,又称并发原语),常用于共享资源(互斥锁),编排任务(协程、等待组),传递消息(信道)。
Mutex 互斥锁
Mutex
实现了 Locker
接口,提供互斥锁功能。以加锁、解锁操作用于限制 goroutine 访问临界区:
1 | func(m *Mutex) Lock() |
如常见的 count++
操作(其汇编代码如下),非原子操作在并发场景下必然有资源竞争问题。
1 | MOVQ "".count(SB), AX ; 读取 count 当前值 |
因此引入 Mutex
才能确保多个 goroutine 并发执行 count++
的安全性。
1 | func main() { |
使用基于 sanitizers 开发的 race detector 工具,可在程序运行时监控对共享变量的非同步访问,打印警告信息。比如将上述代码中的 Mutex
去掉后运行:
1 | go run -race main.go |
这种做法只能在运行时检测,但在生产环境中使用会有性能损耗。
另外是使用 go race detector,在编译时插入指令,在运行时通过指令检测并发读写从而发现 data race 问题。
1 | go tool compile -race -S main.go |
Mutex
可以作为自定义 struct 的字段来控制其它字段的并发访问(对于多个字段,则把 Mutex 放在要控制的字段上,使用空格把字段分隔开来);再把并发操作封装成方法,对外不需要暴露锁等逻辑:
1 | func main() { |
实现细节
Mutex
利用 CAS 操作设置 state
标记(阻塞等待的 waiter 数量、饥饿标记、唤醒标记、持有状态标记)。
申请者释放: Mutex
本身没有包含持有锁的 goroutine 的信息,Unlock 方法可以被任意 goroutine 调用释放锁(即使不是由它持有)。正因如此无法实现 可重入。
可能存在 goroutine 以为自己持有锁、但实际上已经被其它 goroutine 释放的情况,导致 data race 问题。因此建议谁申请谁释放,且不要跨方法加锁解锁。
非公平竞争:已知取锁失败的 goroutine 会进入休眠,在锁释放后被唤醒、再次参与竞争。其中 CPU 中正在执行的 goroutine 有更多机会获取到锁,可减少上下文切换(甚至一个 goroutine 可以连续获得锁)。具体的处理逻辑如下:
goroutine | 当前锁被持有 | 当前锁未被持有 |
---|---|---|
新加入的 goroutine | waiter++ 休眠 |
获取锁 |
被唤醒的 goroutine | 清除 mutexWoken 标记 重新休眠,加入等待队列 |
清除 mutexWoken 标记 获取锁 |
自旋锁:如果新加入的或被唤醒的 goroutine 首次获取不到锁,就会先执行一定次数的自旋(runtime 实现的 spin 检查锁是否被释放),再执行原来的逻辑。当临界区代码较短,等待一小短时间来避免抢占、休眠和重新调度,进一步提高性能。
避免饥饿:以上两条提高性能的措施,可能导致一些等待中的 goroutine 一直得不到锁。因此后续的版本中做了以下调整:
饥饿模式:非公平等待时间限制在 1ms,等待超过该阈值,则竞争时会被优待。
修复“把唤醒 goroutine 放在等待队列尾部”导致不公平等待的 bug。
将 fast path(正好得到锁)和 slow path(尝试自旋或参与竞争)拆成独立的方法以便内联,提高性能。
调度器可以有更高优先级去执行 Mutex 唤醒后持有锁 waiter。
…
源码解析可参考:02 | Mutex:庖丁解牛看实现。
使用须知
Lock/Unlock 必须成对出现。一直不调用 Unlock
会导致死锁,对未加锁的 Mutex
调用 Unlock
会导致 panic
。
Mutex
不可复用。sync 的同步原语都不能复制使用(比如作为参数传入),因为可能不是初始状态(state
标记)。
1 | type Counter struct { |
可在编译时用 vet 工具(基于 copylock 静态分析)被检查出来:
1 | go vet main.go |
另外 Mutex
是不可重入的,Go 标准库中没有提供可重入锁。要自行实现可参考:03|Mutex:4种易错场景大盘点。
扩展:实现 TryLock
为 Mutex
添加 TryLock
的方法,实现尝试获取锁:即能取到就直接取用,否则就直接返回、不阻塞(后续可放弃操作,而不是一直阻塞)。
1 | const ( |
测试:
1 | var mu Mutex |
扩展:获取 Waiter 数量等指标
如同上面的方法利用 unsafe.Pointer
获取锁地址,再从前 4 字节中得到 state
字段的值:
1 | type Mutex struct { |
其中 state
字段共有四个标记:
1 | [mutexWaiters][mutexStarving][mutexWoken][mutexLocked] |
在 Mutex
的源码中可见,mutexWaiterShift
值为 3:
1 | const ( |
因此将 state
右移 3 位即可得到当前 waiter 的数量:
1 | waiterCount := state >> mutexWaiterShift |
如果当前锁已被其他 goroutine 持有,则还需要加上 1:
1 | waiterCount = waiterCount + (waiterCount & mutexLocked) // 再加上锁持有者的数量,0 或 1 |
同理可取得 锁是否被持有、是否有等待者被唤醒、是否处于饥饿状态 的信息:
1 | type Mutex struct { |
测试:
1 | var mu Mutex |
扩展:实现线程安全队列
1 | type NonBlockingQueue struct { |
RWMutex 读写锁
读写分离:在读多写少的场景下,区分读写操作、只对写操作加锁,以提高读性能(Readers–writers problem)。
Go 标准库提供 RWMutex
,在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。
Lock
和Unlock
:写时调用,如锁已被 reader 或者 writer 持有,Lock
会一直阻塞直到能获取到锁。RLock
和RUnlock
:读时调用。如锁已被 writer 持有,RLock
方法会一直阻塞直到能获取到锁,否则就直接返回。RLocker
:为读操作返回一个Locker
接口对象。其Lock
方法会调用RWMutex
的RLock
方法,Unlock
方法会调用RWMutex
的RUnlock
方法。
以封装一个线程安全的计数器为例,在读时(Count
)加读锁、写时(Incr
)加写锁:
1 | type Counter struct { |
RWMutex
是基于 Mutex
实现的,其对写操作优先,即如果已有 writer 在等待请求锁(阻塞的 Lock
调用),则会阻止新的 reader 获取锁,优先保障 writer(注意不是抢占)。
1 | type RWMutex struct { |
实现细节
RLock
和 RUnlock
:
在加锁时先对
readerCount
+1。如仍 <0 则表示有 writer 等待锁,请求的 reader 会阻塞等待锁的释放。在解锁时先对
readerCount
-1。如仍 <0 则表示有 writer 等待锁,检查是否所有 reader 都释放了读锁,是则唤醒请求写锁的 writer;否则 writer 等待所有 reader 释放完(writer 的优先只是针对新参加竞争的 reader 而言)。
1 | func (rw *RWMutex) RLock() { |
Lock
:
writer 之间也有竞争,因此 writer 要取得获得内部的互斥锁。
随后反转
readerCount
字段(readerCount - rwmutexMaxReaders
,因此readerCount
还表示当前有 writer 占有或等待)。如果发现此时锁正被 reader 占用,则把值保存在
readerWait
,writer 进入阻塞;每当 reader 调用RUnlock
释放读锁readerWait
-1,直到所有 reader 都释放,则唤醒该 writer。
1 | func (rw *RWMutex) Lock() { |
Unlock
:
占用锁的 writer 在释放时再反转
readerCount
字段,唤醒新加入竞争、阻塞的 reader。最后再释放内部的互斥锁,允许其它 writer 参与竞争。
1 | func (rw *RWMutex) Unlock() { |
使用须知
与 Mutex
类似,RWMutex
也是不可复制、重入、未加锁先解锁,但其出现死锁的情况更复杂。
比如当有活跃 reader 时,writer 会等待,如果在 reader 的读操作中调用 writer 的写操作,reader 和 writer 就会形成互相依赖的死锁状态。
又比如 writer 请求锁时已经有活跃的 reader,它会等待活跃的 reader 完成才获取到锁,但是之后活跃的 reader 再依赖新的 reader,新的 reader 就会等待 writer 释放锁之后才能执行,此时就会形成循环依赖。
1 | // 形成循环等待: |
WaitGroup 等待组
WaitGroup
用于解决并发等待问题,类似 Java 的 CountDownLatch
和 CycliBarrier
。比如任务 A 中间需要等待并行的任务 B1、B2、B3 都完成才能继续执行,使用阻塞等待可以避免对 B 的轮询,一是实时性得到保证,二是避免空耗 CPU。
其包含三个方法:
Add
:设置计数值。Done
:计数值减 1(即Add(-1)
)。Wait
:阻塞当前 goroutine,直到计数值变为 0。
1 | type Counter struct { |
实现细节
WaitGroup
内部:
noCopy
:辅助 vet 工具检查该WaitGroup
是否复制使用。state1
:包含计数、阻塞在检查点的 waiter 数和信号量。
对于自定义结构,也可以通过嵌入
noCopy
实现 vet 检查。
1 | type WaitGroup struct { |
1 | 64 位对齐 |
Add
和 Done
:操作的是 state
的计数部分。可为计数值增加一个 delta 值,内部通过原子操作把该值加到计数值上。
1 | func (wg *WaitGroup) Add(delta int) { |
Wait
:不断检查 state 的值。如果计数值为 0,即所有任务完成,直接返回。否则调用者变成等待者、加入 waiter 队列并阻塞。
1 | func (wg *WaitGroup) Wait() { |
使用须知
由于
Add
可以传入负数(或多次调用Done
),当内部计数值 < 0,则会导致panic
。虽然
Add
可以在运行过程中增加计数值,但也建议预设,即确保Wait
方法必须在所有Add
方法调用之后才调用。如果在使用过程中需要调用
Add
增加计数值,应避免和Wait
操作的代码放在不同的 goroutine 中执行,即要杜绝对同一个WaitGroup
值的两种操作的并发执行。另外如果需要重用
WaitGroup
(比如类似 JavaCycliBarrier
),必须确保Wait
结束后才重置WaitGroup
的值,否则会导致panic
。
1 |
|
Cond 条件
一组 goroutine 等待某个条件(bool 变量),条件满足时其中一个或者所有 goroutine 被唤醒执行。其包括以下方法:
-
Signal
:调用者唤醒一个等待此Cond
的 goroutine。如果Cond
等待队列有一或多个等待的 goroutine,则从等待队列中移除第一个并把它唤醒(即其他语言的notify
)。 -
Broadcast
:调用者唤醒所有等待此Cond
的 goroutine(即其它语言的notifyAll
)。 -
Wait
:把调用者放入Cond
等待队列中并阻塞,直到被Signal
或Broadcast
从等待队列中移除并唤醒(注意唤醒后要检查条件)。
1 | type Cond |
1 | func main() { |
实现细节
1 | type Cond struct { |
Wait
把调用者加入等待队列并释放锁,被唤醒之后还会请求锁。在阻塞休眠期间,调用者不持有锁,可使其他 goroutine 有机会检查或者更新等待变量。等调用者被唤醒之后再去争抢这把锁。
使用须知
调用 Wait
时必须加锁,否则可能导致 Unlock
一个未加锁的 Locker
。
调用 Wait
必须检查等待条件是否满足,只有满足才执行。
Go 提供的 Channel
和 WaitGroup
是更广泛使用的通知机制,在 wait-notify 场景下比 Cond
更实用。而 Cond
独有的特点在于:
和一个
Locker
关联,可对相关的依赖条件更改提供保护。同时支持
Signal
和Broadcast
,而Channel
只能同时支持一种。Broadcast
可被重复调用。等待条件变成不满足后,又可以调用Broadcast
再次唤醒等待的 goroutine。而Channel
被close
后不能再使用。
扩展:实现阻塞队列
1 | type BlockingQueue struct { |
Once 单例
线程安全的单例一般用于初始化资源,可通过 package 级别的变量或 init
函数实现。
在创建资源(比如数据库连接等)要确保线程安全,使用懒汉式加锁的方式会有性能问题:
1 | var connMu sync.Mutex |
利用 Once
提供的 func (o *Once) Do(f func())
可确保在多次调用中,只有首次调用时传入的 f
函数才会执行。
1 | func main() { |
实现细节
自己实现 Once
:
1 | type Once struct { |
使用须知
由于 f
方法是无参无返回值的,当执行时 panic
或初始化资源失败,Once
还是会认为初次执行已成功,再次调用 Do
方法也不会再次执行 f
。
可以自己实现 Once
,既返回当前调用 Do
方法是否成功,还可以在初始化失败后调用 Do
方法再次尝试初始化直到成功。
1 | type Once struct { |
如果在标准库的 Once
上扩展,可以:
1 | type Once struct { |
另外 Go 没有提供 Immutable 类型,任何全局变量都可以修改。以全局变量形式实现的单例容易带来安全问题,因此需要控制好其作用域:比如不暴露变量本身(包内),而只对外提供 GetXxx
方法。
Pool 池
Go 采用 三色并发标记算法 实现自动标记对象和回收,但在垃圾收集阶段还是会有 STW 时间。为了减少对象创建回收开销可使用对象池。
Pool
用于保存一组可独立访问的临时(即当失去引用时会被回收)对象,其本身是线程安全的,但与其它并发原语一样不能复用。其提供以下方法:
New
:创建对象,当调用Get
但池中没有空闲元素时,即调用New
;如果没有设置New
,则调用Get
时返回 nil。Get
:从池中取走一个对象,可能是 nil(没有设置New
或没有空闲对象返回)。Put
:归还一个对象给池以供复用,如果传入 nil 会被忽略。
常见的使用场景是连接池:比如标准库提供的
Client
、数据库连接池DB
、第三方库 fatih/pool(不局限于 TCP 连接)等(但由于Pool
无通知地回收对象,连接池不建议基于Pool
实现)。
实现细节
Pool
内部字段 local
和 victim
用于存储空闲元素,每次垃圾回收时,Pool
会把 victim
的对象移除,把 local
的数据移给 victim
。因此 local
被清空,而 victim
存放临时对象:可在 GC 时被移除,或通过 Get
重用。
其中 GC 时 Poll
的处理逻辑如下:
1 |
|
其中 local
包含 poolLocalInternal
字段并提供 CPU 缓存对齐,从而避免 false sharing。其中 poolLocalInternal
:
private
:表示只能由一个 P 存取的缓存元素。一个 P 同时只能执行一个 goroutine,所以不会有并发的问题。shared
:可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail,相当于只有一个本地的 P 作为生产者,多个 P 作为消费者(使用 local-free 的 queue 列表实现)。
Get
:按以下优先级取对象:本地 private
-> 本地 shared
-> getSlow
(其它的 shared
)-> New
。
1 | func (p *Pool) Get() interface{} { |
Put
:设置本地 private
,如已经有值就把此元素 push 到本地队列中。
1 | func (p *Pool) Put(x interface{}) { |
使用须知
内存泄漏:比如下面的 buffer 池,在取出 bytes.Buffer 使用时可以无限制地往里面插入大量 byte。此时即使 Reset 再放回到池中 byte slice 容量也不会改变,所占空间依然很大;由于 Pool 回收机制,大的 Buffer 可能不被回收。因此使用 Pool
回收 buffer 时,要检查回收对象大小,如果 buffer 太大就不要回收,否则太浪费。
1 | var buffers = sync.Pool{ |
内存浪费:即当池中 buffer 比较大,但实际上只用到较小的部分。可以根据对象大小把池分为几层,根据需要在指定的池中存取。比如:
1 | var ( |
在一些第三方库中也有这种 Pool
的实现:
Vitess 的 Bucketpool
Worker Pool
尽管 goroutine 很轻量级(栈 2048bytes,最大可扩展到 1GB),但大量的 goroutine 在调度和垃圾回收时的开销也会很大。因此可以创建 Worker Pool 减少 goroutine 的使用。比如用固定数量的 goroutine 处理大量连接请求:fasthttp。
大部分 Worker Pool 是通过 Channel 来缓存任务(能方便地实现并发保护),有的是多个 Worker 共享同一个任务 Channel,有的是每个 Worker 都有独立的 Channel。
推荐的第三方库:
gammazero/workerpool:可无限制地提交任务,有更便利的 Submit 和 SubmitWait 方法提交任务,还可以提供当前 Worker 数和任务数以及关闭 Pool 的功能。
ivpusic/grpool:创建 Pool 时需要提供 Worker 的数量和等待执行的任务的最大数量,可直接往 Channel 放入任务。
dpaks/goworkers:提供了更便利的 Submit 方法提交任务以及 Worker 数、任务数等查询方法、关闭 Pool 的方法。执行结果需要在 ResultChan 和 ErrChan 中去获取,没有提供阻塞方法,但可以在初始化时设置 Worker 数量和任务数。
panjf2000/ants、Jeffail/tunny、benmanns/goworker、go-playground/pool、Sherifabdlnaby/gpool、alitto/pond 等。
Context 上下文
上下文即在 API 或方法调用之间,传递的除了业务参数外的信息(比如 HTTP 请求的附带信息),Go 标准库提供的 Context
甚至还提供了超时和取消机制。
在 Go 服务中,传入的请求通过创建 goroutine 去处理,在此 goroutine 中通常需要派生出额外的 goroutine 来访问其他后端,比如同时访问数据库和 RPC 服务。
这些 goroutine 通常需要访问特定于请求的值(如用户身份相关信息),而且当请求被取消或超时,处理该请求的所有 goroutine 都应该快速退出(fail fast),使系统回收其正在使用的资源。
Context
包括以下方法:
Deadline
:返回该Context
被取消的截止日期。如果没有设置则 ok 的值是 false。每次调用都是返回相同结果。Done
:返回Channel
对象,在Context
被取消时此Channel
会被 close,可能会返回 nil。cancel、timeout、deadline 都可能导致Done
被 close,每次调用都是返回相同结果。Err
:当Done
被 close 时可以通过Err
获取错误信息,否则返回 nil。Value
:返回此Context
中和指定 key 关联的 value。
CancelFunc 调用 WithXxx 后返回的函数句柄,用于主动让下游结束。而 Done 则是被上游通知结束。
生成顶层 Context
的方法(底层一样):
Background
:返回非 nil 的、空的Context
。没有值、不会被 cancel、不会超时、没有截止日期。一般用于主函数、初始化、测试以及创建根Context
。TODO
:返回非 nil 的、空的Context
。没有值、不会被 cancel、不会超时、没有截止日期。一般用于不清楚是否该用 Context,或者目前还不知道要传递什么上下文信息时。
使用时遵循规则:
- 一般函数使用
Context
时会把它放在第一个参数的位置。 - 不要把 nil 用做
Context
类型的参数值(可以使用context.Background()
)。 -
Context
只用来临时做函数之间的上下文透传,不能持久化(数据库、本地文件、全局变量、缓存)。 - 常使用
struct{}
定义 key 的类型。对于 exported key 的静态类型,常是接口或指针。目的是尽量减少内存分配。
特殊 Context
WithXxx
函数的功能是为父节点生成带有 Done
方法的子节点,并返回子节点的 CancelFunc
函数句柄。
WithValue
基于 parent Context
生成新的 Context
,其保存了一个 key-value 键值对,用来传递上下文。它覆盖了 Value
方法,优先从自己的存储中检查 key,不存在则从 parent 中继续检查。
1 | [context] <---+ c.Context.Value(key) |
1 | type valueCtx struct { |
支持链式查找。查找不存在还会向 parent Context
查找,如果 parent 还是 valueCtx 则遵循相同原则。
1 | ctx = context.TODO() |
key 不应该是 string 或其它内建类型,否则在包之间使用 Context 时容易产生冲突(如果能保证 key 不会冲突,则可以使用自定义类型或内建类型)。
WithCancel
方法返回 parent 的副本,副本中的 Done Channel
是新建对象,类型是 cancelCtx。常在需要主动取消长时间的任务时使用:创建 Context
,把它传给长时间执行任务的 goroutine。需要中止任务时就可以 cancel 这个 Context
,长时间执行任务的 goroutine 就可以通过检查它,知道 Context
已经被取消。
WithCancel
返回的第二个值是一个 cancel 函数。只要任务正常完成就需要调用 cancel,这个 Context
才能释放资源(通知 children 处理 cancel,从它的 parent 中把自己移除,释放相关的 goroutine)。
其内部实现:
1 | func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { |
cancelCtx 被取消时,Err
就是这个 Canceled 错误:
1 | var Canceled = errors.New("context canceled") |
WithTimeout
和 WithDeadline
一样,其参数是超时时间。超时时间加上当前时间即截止时间,WithTimeout
的实现:
1 | func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { |
WithDeadline
WithDeadline
返回一个 parent 的副本,并设置不晚于参数 d 的截止时间,类型为 timerCtx 或 cancelCtx。如果其截止时间晚于 parent 的截止时间,就以 parent 的截止时间为准,并返回一个类型为 cancelCtx 的 Context(parent 的截止时间到了就会取消这个 cancelCtx)。
如果当前时间已经超过了截止时间就直接返回一个已经被 cancel 的 timerCtx;否则启动一个定时器,到截止时间取消这个 timerCtx。
因此触发 timerCtx 的 Done
被 close:
截止时间到。
cancel 函数被调用。
parent 的
Done
被 close。
1 |
|
使用方法:
1 | func slowOperationWithTimeout(ctx context.Context) (Result, error) { |
管理 goroutine 生命周期
把 Context 传递给 goroutine,由 goroutine 检查 Context 的 Done
是否关闭:
1 | func main() { |
使用须知
参数传递
context.WithValue
每次调用都会创建新的对象,以链表关联。这种结构是线程安全,支持并发访问,但由于访问效率较低,并不适合大量地调用(可以传递 map 或 struct)。
传递的数据是面向请求,适用于链路追踪、多租户染色发布等。不应该作为函数的可选参数来使用(比如 Context
中存放 sql.Tx
对象传递到 data 层使用等),因为元数据是隐式、面向请求的旁路信息挂载,而函数参数是显式、用于处理业务流程的。
同一个 Context
对象可以传递给在不同 goroutine 中运行的函数,对于多个 goroutine 同时使用是安全的。其中 value 应该是 immutable 的,即 context.WithValue(ctx, oldvalue)
。如果携带了集合类型的数据,要注意到读写操作可能造成 data race,可以用 COW(Copy on Write)的思路解决。
级联取消
当一个 Context
被取消时,从它派生的所有 Context
也将被取消。WithCancel(ctx)
参数 ctx 认为是 parent ctx,在内部会进行传播关系链的关联。调用 Done()
会返回一个 chan,当取消某个 parent context,实际上会递归层层 cancel 关闭自己 child context 的 done chan,从而让整个调用链中所有监听 cancel 的 goroutine 退出。
1 | gen := func(ctx context.Context) <-chan int { |
超时调用
利用 parent/child 机制,只需要启动一个定时器,每当超时直接将当前的 Context
cancel,就可以使监听在当前和下层的 context.Done()
的 goroutine 退出。
1 | const shortDuration = 1 * time.Millisecond |
为 Context
设计带超时的服务调用,并不意味着会通知远程服务就会自动取消这次调用,一般只是避免客户端长时间等待,服务端依然还在处理请求。所以有时 Context
并不会减少对服务端的请求负担。
如果在 Context
被 cancel 时能关闭和服务端的连接、中断和数据库的通讯、停止对本地文件的读写(依赖于超时的底层处理机制),才能减少服务调用的压力。
作用域
将 Context
集成到 API 中时,需要注意其作用域是请求级别的。比如贯穿请求处理过程,但于数据模型存在则没有意义,因此不应该将其存储在结构体中或持久化。但有例外是该结构纯粹用作通过信道传递的消息。
1 | type message struct { |
可用以下方法在 API 中集成 Context
对象:
首参数传递
Context
对象,参考 net 包func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error)
。可通过 context 对象取消函数调用。在第一个
Request
对象中携带可选的Context
对象。例如 net/http 包的func (r *Request) WithContext(ctx context.Context) *Request
,通过携带给定的Context
对象返回一个新的Request
对象。
Atomic 原子操作
原子操作即由 CPU 提供基础支持、确保指令必然完全执行/不执行的操作。即使在多处理器、多核、有 CPU cache(多 goroutine 访问),该操作也能保证原子性。
Go 标准库 sync/atomic
为 int32、int64、uint32、uint64、uintptr、Pointer 等类型提供了原子操作方法:AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX 等,其操作对象是 变量的地址。
Add
:加法运算的第一个参数 delta 可以是负数,相当于减法。
1 | func AddInt32(addr *int32, delta int32) (new int32) |
对于无符号整型,可以利用补码规则:
1 | AddUint32(&x, ^uint32(c-1)) |
CAS
:比较替换,只有 addr 的值为 old,才把它修改为 new 并返回 true,否则返回 false(同理还有 Swap
,区别是直接替换、不比较)。
1 | func CompareAndSwapInt32(addr *int32, new int32) (swapped bool) |
Load
和 Store
:取出 addr 地址中的值,以及把一个值存入到指定的 addr 地址中。
1 | func LoadInt32(addr *int32) (val int32) |
Value
:支持原子地存取对象类型(非 nil,不能 CAS
和 Swap
),常用于配置变更等场景中。
参考以下实现配置变更的例子:
定义
Value
类型的变量config
, 用于存储配置信息。启动一个 goroutine,让它随机 sleep 一段时间,之后变更配置,并通过
Cond
通知其它 reader 加载新配置。启动一个 goroutine 等待配置变更的信号,一旦有变更就会加载最新的配置。
1 | type Config struct { |
在操作系统中,write 的地址基本上是对齐的(aligned)。 比如 32 位操作系统、CPU 和编译器,write 的地址总是 4 的倍数,64 位系统则总是 8 的倍数。这种情况下通过一个指令就可以实现地址的写操作,不会导致其他线程看到部分写。如果地址不是对齐,处理器需要分成两个指令处理,如果只执行了一个指令便被其它线程看到,被称为 撕裂写(torn write) 。因此赋值操作必须是原子操作,才能保证数据完整。
对于多核处理器,某个核对地址的值的更改在更新到主内存中前,是在多级缓存中存放的,此时其它核看到的数据可能不一样。由于 cache、指令重排,可见性等问题的存在,实现原子性依赖于内存屏障(memory fence):写内存屏障告知处理器,必须等到其管道中未完成的写被刷新到内存中再进行操作。此操作会让其它处理器的缓存失效,以便使之从主内存中获取最新值。
atomic 包方法会提供内存屏障功能,所以可以保证赋值的完整性,还能保证可见性:某个核更新了某地址的值,其它处理器总是能读取到它的最新值。需要注意的是,处理器之间保证数据的一致性也是会降低性能的。
功能扩展
一些第三方库对标准库的原子操作 API 提供了进一步封装。比如 uber-go/atomic 定义和封装了几种与常见类型对应的原子操作类型:Bool、Duration、Error、Float64、Int32、Int64、String、Uint32、Uint64 等。以及原子操作方法:CAS、Store、Swap、Toggle 等。其使用也更友好:
1 | var running atomic.Bool |
扩展:实现无锁队列
原子操作常用来实现 Lock-Free 数据结构,相比起同步的数据结构往往有更高的性能。
1 | type LKQueue struct { |
使用须知:
不要把内部使用的原子值暴露给外界。
如果不得不让包外或模块外的代码使用原子值,可声明一个包级私有的原子变量,再通过公开的函数让外界间接地使用到它,但不要把原子值传递到外界(包括指针)。
如果通过某个函数可以向内部的原子值存储值,应该在这个函数中先判断被存储值类型的合法性,避免造成
panic
。可以把原子值封装到结构体类型,既可以通过该类型的方法更安全地存储值,又可以在该类型中包含可存储值的合法类型信息。
Semaphore 信号量
信号量表示为使用一个(范围 [0, n] 的)变量实现并发控制的能力。当 goroutine 完成对此信号量等待(wait)时该计数值就减 1,对此信号量释放(release)时,该计数值就加 1。只有当计数器大于 0,等待的 goroutine 才有可能成功返回。
更复杂的信号量可以使用抽象数据类型代替变量,用来代表复杂的资源类型。
P/V 操作
信号量支持以下操作:
初始化:设定初始的资源数量。
P:信号量计数值 -1,如果新值为负,则调用者被阻塞并加入到等待队列中。否则调用者继续执行,并获得一个资源。
V:信号量计数值 +1,如果先前计数值为负,表示当前有等待的 P 操作调用者,则会从等待队列中取出一个等待的调用者唤醒,使之继续执行。
信号量可分为 计数信号量(counting semaphre)和 二进位信号量(binary semaphore)。计数信号量的计数值可以是任意整数,在特殊的情况下只为 0 或 1,则为二进位信号量,可提供互斥功能。
因此有时互斥锁也会使用二进位信号量实现(常用于保护一组资源,比如数据库连接池、客户端连接等),此时 P/V 操作相当于互斥锁的 Lock/Unlock。
在 Windows 中,互斥锁只能由持有锁的线程释放。而二进位信号量则没有这个限制。但对于 Go 而言,互斥锁也可以由非持锁 goroutine 释放,所以在行为上没有严格区别。
标准库实现
Go 标准库提供的 Mutex
内部使用信号量控制 goroutine 的等待和唤醒,并实现 P/V 操作的函数:
1 | type Mutex struct { |
但只作内部使用,没有封装对外暴露。要使用信号量可以用扩展包 semaphore 提供的 Weighted
。
1 | type Weighted struct { |
提供以下方法:
Acquire
:相当于 P 操作,可一次获取多个资源,如果没有足够多的资源调用者就会被阻塞。第一个参数是Context
,用于增加超时或 cancel 机制。如果正常获取资源就返回 nil;否则返回ctx.Err()
且信号量不改变。其内部实现需要监控资源是否可用,以及检测 Context 的 Done 是否已关闭。Release
:相当于 V 操作,可将 n 个资源释放、返还给信号量。如果调用该方法时,传递比请求到的数量更大的数值就会panic
。TryAcquire
:尝试不阻塞地获取 n 个资源,要么成功获取 n 个资源返回 true,要么获取失败返回 false。
其中 Acquire
和 Release
的实现:
1 | func (s *Weighted) Acquire(ctx context.Context, n int64) error { |
利用信号量实现 Worker Pool:
主 goroutine 是负责任务分发的 dispacher。先请求信号量,获取成功则启动一个 goroutine 处理计算,该 goroutine 会释放信号量(信号量的获取在主 goroutine,释放则在 worker goroutine);如获取失败就等到有信号量可用时再获取。
在实际应用中如果想等所有 Worker 执行完,可获取最大计数值的信号量(表示所有申请的都已释放)。
1 | var ( |
使用信号量需要注意,当公平性和安全性损害会导致程序 panic
。常见错误:
请求资源但没有释放。
释放从未请求的资源。
长时间持有资源(即使不需要)。
不持有资源却直接使用。
使用不同的信号量控制不同的资源时,也可能导致死锁。
因此必须确保按需请求、按量释放。
使用 Channel 实现
使用带 buffer 的 Channel 可以实现信号量(其中 P/V 操作以 Lock/Unlock 表示 ,且无法提供一次性申请多个的功能):
1 | type semaphore struct { |
由于带 buffer 的 Channel 本身就能表示多个资源,更建议直接使用 Channel,除非有必要一次性申请多个。
除此之外还有第三方库 marusama/semaphore,推荐在资源数量不固定、动态变化的场景使用。
SingleFlight 请求合并
多个 goroutine 同时调用同一个函数时,SingleFlight
可限制只有一个 goroutine 调用,当该 goroutine 返回,再把结果返回给其它 goroutine,可减少并发调用数。
区别于 sync.Once
保证永远只执行一次,SingleFlight
保证多个请求同时调用时只执行一个。前者常用在单次初始化场景,而后者常用于合并并发请求的场景(比如缓存)。尤其当面对类似秒杀等大量并发读时,可有效减缓后端服务压力。
实现原理
SingleFlight
使用互斥锁 Mutex
(提供并发读写保护)和 Map
(保存同一个 key 的正在处理请求)实现。其数据结构是 Group
,并提供以下方法:
Do
:执行函数并返回其执行结果。需提供用于确保在同时只有一个请求执行的 key,相同 key 的其它请求会等待该执行请求返回结果。指定fn
无参函数,返回执行结果或 error。Do
方法返回fn
执行的结果,以另一个返回值shared
指示是否返回给多个请求。DoChan
:类似Do
,区别是返回一个 chan。等fn
函数执行完,可从该 chan 接收结果。Forget
:告知Group
忘记指定 key,之后该 key 请求会执行fn
,而不是等待前一个未完成的fn
函数返回结果。
SingleFlight
内部使用辅助结构 call
,代表正在执行、或已完成的 fn
函数请求。
1 | // 表示一个正在处理或已经处理完的请求。 |
Do
方法:
1 | func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { |
应用场景
在 Go 标准库中也用到 SingleFlight
:
在 net/lookup.go 中,如果同时有查询同一个 host 的请求,
lookupGroup
会把请求合并。Go 在查询仓库版本信息时,将并发的请求合并成 1 个。
SingleFlight
最适合用于解决缓存击穿:并发的请求可以共享同一个结果,避免大量请求不经缓存直接落到数据库上。而且由于是缓存查询,不用考虑幂等性问题。比如缓存框架 groupcache:
1 | func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) |
其它项目比如 Cockroachdb、CoreDNS 都有 SingleFlight
的应用。但这一般只用于并发读场景,或是设置单一值的并发写场景。而对于并发的增减写操作则不合适(存在幂等性问题)。
CyclicBarrier 循环栅栏
CyclicBarrier
由第三方库 marusama/cyclicbarrier 提供,类似 WaitGroup
,常用于等待多个 goroutine 并发执行,而前者更适用于 固定数量的 goroutine 等待同一执行点 的场景,在放行 goroutine 后还能更方便地重复利用(在重用时后者还要处理重置计数值时的并发问题);后者更适用于 一个 goroutine 等待一组 goroutine 到达同一执行点 的场景。
两者对应关系:
CyclicBarrier | WaitGroup |
---|---|
New(n) | var wg WaitGroup(n) wg.Add(n) |
Await | wg.Done wg.Wait wg.Add(b) |
实现原理
CyclicBarrier
的初始化方法:
New
:只需要一个参数指定参与者数量。NewWithAction
:额外提供一个函数,用于在每次到达执行点时执行一次。时间点最后一个参与者到达后、其它参与者未被放行前。可利用它实现放行前的共享状态更新等操作。
CyclicBarrier
定义如下:
1 | type CyclicBarrier interface { |
基本应用
在使用时循环栅栏的参与者只需调用 Await
等待,当所有参与者到达后再执行下一步。当执行下一步时,循环栅栏的状态又恢复到初始状态,可迎接下一轮同样数量的参与者。
假设需求如下:
工厂提供多条生产线,每条负责生产氧原子(N 条)或氢原子(2N 条),各由一个 goroutine 负责。
通过一个栅栏,只有一个氧原子和两个氢原子准备好,才能生成出一个水分子,否则所有生产线都处于等待状态。
水分子是逐个按照顺序产生的(原子种类和数量有要求)。
需要引入:
信号量 semaH:控制氢原子。空槽数资源数设置为 2。
信号量 semaO:控制氧原子。空槽数资源数设置为 1。
循环栅栏:等待两个氢原子和一个氧原子填补空槽,直到任务完成。
1 | type H2O struct { |
氢原子与氧原子的流水线:
1 | func (h2o *H2O) hydrogen(releaseHydrogen func()) { |
测试:
1 | func TestWaterFactory(t *testing.T) { |
ErrGroup 子任务编排
ErrGroup
在 Go 的扩展库中提供,常用于将一个通用的父任务拆成几个小任务并发执行的场景。相比起 WaitGroup
其功能更丰富:
和
Context
集成。error 向上传播,可把子任务的错误传递给
Wait
的调用者。
提供三个方法:
WithContext
:传入Context
以创建Group
对象。其返回一个Group
实例以及一个使用context.WithCancel(ctx)
生成的新Context
。一旦有一个子任务返回错误或是Wait
调用返回,新Context
就会被cancel
。如果传递参数是可cancel
的Context
,则其被cancel
时不会终止正在执行的子任务。Go
:传入一个无参、带 error 返回值的函数,创建 goroutine 执行子任务。Wait
:阻塞直到所有子任务完成后才返回。如多个子任务返回错误,则只会返回第一个出现的错误,所有子任务都执行成功则返回 nil。
默认只返回第一个错误,如要实现收集所有子任务执行结果,可使用全局变量:
1 | var g errgroup.Group |
实现 Pipeline
官方文档提供的 例子:一个子任务遍历目录下的文件,把遍历出的文件交给 20 个 goroutine、并行计算文件的 md5。
1 | func main() { |
ErrGroup
也存在一些问题:如果无限制地直接调用 Go
方法,就会创建出非常多 goroutine,会带来调度和 GC 的压力、占用更多内存。当前 Go 运行时创建的 g 对象只会增长和重用,不会回收。在高并发场景下要尽可能减少 goroutine 的使用(比如使用 Work Pool 或类似 containerd/stargz-snapshotter 的方案,信号量的资源数就是可并行的 goroutine 的数量)。
扩展库
很多在 Go 官方 ErrGroup
基础上进行扩展、或自行实现分组功能的第三方库,提供了更多更丰富的功能:
bilibili/errgroup:可以使用固定数量的 goroutine 处理子任务。并提供了 cancel(失败的子任务可以 cancel 所有正在执行任务)和 recover(把
panic
的堆栈信息放到 error 中,避免子任务panic
导致的程序崩溃)功能。但一旦设置了并发数,超过并发数的子任务需要等到调用者调用Wait
之后才执行(而不是 goroutine 空闲下来就自动执行)。而且在高并发下如果任务数大于 goroutine 数,且任务被集中加入到 Group 中,该库会把子任务加入到一个非线程安全的数组。neilotoole/errgroup:对标准库
WaitGroup
的扩展。其Wait
方法可返回一或多个 error。子任务在调用Done
之前可把自己的 error 信息设置给ErrGroup
。在Wait
返回时,把这些 error 信息返回给调用者。go-pkgz/syncs:提供了
SizedGroup
和ErrSizedGroup
,支持直接控制子任务的并发数(而不是 goroutine 数),并提供相应的 failfast 的错误处理能力。vardius/gollback:用于处理一组子任务的执行,且解决了
ErrGroup
收集子任务返回结果的痛点。其提供三个方法:All
:等待所有的异步函数都执行完才返回执行结果和错误信息,且返回结果的顺序与传入函数的顺序保持一致。Race
:与All
方法类似,但在使用时只要一个异步函数执行成功就马上返回,不会返回所有的子任务信息。只有全部失败才返回最后一个 error 信息。Retry
:执行一个子任务,执行失败就会尝试一定次数(为 0 则一致尝试直到成功),如一直不成功就会返回错误信息 ,否则它会立即返回。
AaronJan/Hunch:和 gollback 类似,但提供的方法更多(
All
、Take
、Last
、Retry
、Waterfall
等)。mdlayher/schedgroup:提供与时间相关、处理一组 goroutine 的并发原语 schedgroup,可指定任务在某时间后执行(提供
Delay
、Schedule
、Wait
等方法)。
待续。
参考
Practical Go: Real world advice for writing maintainable Go programs (cheney.net)
The Go Memory Model - The Go Programming Language (golang.org)
Ice cream makers and data races – The acme of foolishness (cheney.net)
Go: Discovery of the Trace Package | by Vincent Blanchon | A Journey With Go | Medium
Go: Buffered and Unbuffered Channels | by Vincent Blanchon | A Journey With Go | Medium
Go: Ordering in Select Statements | by Vincent Blanchon | A Journey With Go | Medium
Go Concurrency Patterns: Timing out, moving on - The Go Blog (golang.org)
Go Concurrency Patterns: Pipelines and cancellation - The Go Blog (golang.org)
Running MongoDB Queries Concurrently With Go (ardanlabs.com)
Go advanced concurrency patterns: part 3 (channels) - Blog Title
Pool Go Routines To Process Task Oriented Work (ardanlabs.com)
The Go Programming Language Specification - The Go Programming Language (golang.org)
Go: Context and Cancellation by Propagation | by Vincent Blanchon | A Journey With Go | Medium
How to correctly use context.Context in Go 1.7 | by Jack Lindamood | Medium