Kyle's Notebook

Go 并发原语

Word count: 17.6kReading time: 77 min
2021/10/15

Go 并发原语

Go 提供同步原语(Synchronization primitives,又称并发原语),常用于共享资源(互斥锁),编排任务(协程、等待组),传递消息(信道)。

Mutex 互斥锁

Mutex 实现了 Locker 接口,提供互斥锁功能。以加锁、解锁操作用于限制 goroutine 访问临界区:

1
2
func(m *Mutex) Lock()
func(m *Mutex) Unlock()

如常见的 count++ 操作(其汇编代码如下),非原子操作在并发场景下必然有资源竞争问题。

1
2
3
MOVQ    "".count(SB), AX    ; 读取 count 当前值
LEAQ 1(AX), CX ; 对值 +1
MOVQ CX, "".count(SB) ; 将结果重新写入

因此引入 Mutex 才能确保多个 goroutine 并发执行 count++ 的安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
var mu sync.Mutex
var count = 0

// 用于确认所有的 goroutine 都完成
var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
mu.Lock()
count++
mu.Unlock()
}
}()
}
wg.Wait()
fmt.Println(count)
}

使用基于 sanitizers 开发的 race detector 工具,可在程序运行时监控对共享变量的非同步访问,打印警告信息。比如将上述代码中的 Mutex 去掉后运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go run -race main.go

# # command-line-arguments
# .\main.go:10:9: mu declared but not used
#
# E:\Project\test\main>go run -race main.go
# ==================
# WARNING: DATA RACE
# Read at 0x00c00012e058 by goroutine 8:
# main.main.func1()
# E:/Project/test/main/main.go:25 +0x84
#
# Previous write at 0x00c00012e058 by goroutine 7:
# main.main.func1()
# E:/Project/test/main/main.go:25 +0x9d
#
# Goroutine 8 (running) created at:
# main.main()
# E:/Project/test/main/main.go:20 +0xeb
#
# Goroutine 7 (running) created at:
# main.main()
# E:/Project/test/main/main.go:20 +0xeb
# ==================
# 422886
# Found 1 data race(s)
# exit status 66

# 其提示在源代码中的 25 行,有多个 goroutine 对内存地址 xxx 有并发读写行为。

这种做法只能在运行时检测,但在生产环境中使用会有性能损耗。

另外是使用 go race detector,在编译时插入指令,在运行时通过指令检测并发读写从而发现 data race 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go tool compile -race -S main.go
# 0x002a 00042 (main.go:13) CALL runtime.racefuncenter(SB)
# ......
# 0x0061 00097 (main.go:14) JMP 173
# 0x0063 00099 (main.go:15) MOVQ AX, "".j+8(SP)
# 0x0068 00104 (main.go:16) PCDATA $0, $1
# 0x0068 00104 (main.go:16) MOVQ "".&count+128(SP), AX
# 0x0070 00112 (main.go:16) PCDATA $0, $0
# 0x0070 00112 (main.go:16) MOVQ AX, (SP)
# 0x0074 00116 (main.go:16) CALL runtime.raceread(SB)
# 0x0079 00121 (main.go:16) PCDATA $0, $1
# 0x0079 00121 (main.go:16) MOVQ "".&count+128(SP), AX
# 0x0081 00129 (main.go:16) MOVQ (AX), CX
# 0x0084 00132 (main.go:16) MOVQ CX, ""..autotmp_8+16(SP)
# 0x0089 00137 (main.go:16) PCDATA $0, $0
# 0x0089 00137 (main.go:16) MOVQ AX, (SP)
# 0x008d 00141 (main.go:16) CALL runtime.racewrite(SB)
# 0x0092 00146 (main.go:16) MOVQ ""..autotmp_8+16(SP), AX
# ......
# 0x00b6 00182 (main.go:18) CALL runtime.deferreturn(SB)
# 0x00bb 00187 (main.go:18) CALL runtime.racefuncexit(SB)
# 0x00c0 00192 (main.go:18) MOVQ 104(SP), BP
# 0x00c5 00197 (main.go:18) ADDQ $112, SP

Mutex 可以作为自定义 struct 的字段来控制其它字段的并发访问(对于多个字段,则把 Mutex 放在要控制的字段上,使用空格把字段分隔开来);再把并发操作封装成方法,对外不需要暴露锁等逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func main() {
var counter Counter

var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
counter.Incr()
}
}()
}
wg.Wait()
fmt.Println(counter.Count())
}

type Counter struct {
CounterType int
Name string
mu sync.Mutex
count uint64
}

func (c *Counter) Incr() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}

func (c *Counter) Count() uint64 {
c.mu.Lock()
// Go 1.14+ 对 defer 做了优化(采用更有效的内联方式,取代之前生成 defer 对象到 defer chain 中的做法),利用 defer 释放锁更合理:Lock 和 Unlock 总是成对出现。
defer c.mu.Unlock()
return c.count
}

实现细节

Mutex 利用 CAS 操作设置 state 标记(阻塞等待的 waiter 数量、饥饿标记、唤醒标记、持有状态标记)。

申请者释放Mutex 本身没有包含持有锁的 goroutine 的信息,Unlock 方法可以被任意 goroutine 调用释放锁(即使不是由它持有)。正因如此无法实现 可重入

可能存在 goroutine 以为自己持有锁、但实际上已经被其它 goroutine 释放的情况,导致 data race 问题。因此建议谁申请谁释放,且不要跨方法加锁解锁。

非公平竞争:已知取锁失败的 goroutine 会进入休眠,在锁释放后被唤醒、再次参与竞争。其中 CPU 中正在执行的 goroutine 有更多机会获取到锁,可减少上下文切换(甚至一个 goroutine 可以连续获得锁)。具体的处理逻辑如下:

goroutine 当前锁被持有 当前锁未被持有
新加入的 goroutine waiter++
休眠
获取锁
被唤醒的 goroutine 清除 mutexWoken 标记
重新休眠,加入等待队列
清除 mutexWoken 标记
获取锁

自旋锁:如果新加入的或被唤醒的 goroutine 首次获取不到锁,就会先执行一定次数的自旋(runtime 实现的 spin 检查锁是否被释放),再执行原来的逻辑。当临界区代码较短,等待一小短时间来避免抢占、休眠和重新调度,进一步提高性能。

避免饥饿:以上两条提高性能的措施,可能导致一些等待中的 goroutine 一直得不到锁。因此后续的版本中做了以下调整:

  • 饥饿模式:非公平等待时间限制在 1ms,等待超过该阈值,则竞争时会被优待。

  • 修复“把唤醒 goroutine 放在等待队列尾部”导致不公平等待的 bug。

  • 将 fast path(正好得到锁)和 slow path(尝试自旋或参与竞争)拆成独立的方法以便内联,提高性能。

  • 调度器可以有更高优先级去执行 Mutex 唤醒后持有锁 waiter。

源码解析可参考:02 | Mutex:庖丁解牛看实现

使用须知

Lock/Unlock 必须成对出现。一直不调用 Unlock 会导致死锁,对未加锁的 Mutex 调用 Unlock 会导致 panic

Mutex 不可复用。sync 的同步原语都不能复制使用(比如作为参数传入),因为可能不是初始状态(state 标记)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Counter struct {
sync.Mutex
Count int
}

func foo(c Counter) {
// 重复加锁。
c.Lock()
defer c.Unlock()
fmt.Println("in foo")
}

func main() {
var c Counter
c.Lock()
defer c.Unlock()
c.Count++
// 调用时将锁定状态的 Counter 传入。
foo(c)
}

可在编译时用 vet 工具(基于 copylock 静态分析)被检查出来:

1
2
3
4
5
6
go vet main.go
# 分析函数调用、range 遍历、复制、声明、函数返回值等位置,是否出现锁 copy。

# # command-line-arguments
# .\main.go:19:9: call of foo copies lock value: command-line-arguments.Counter
# .\main.go:23:12: foo passes lock by value: command-line-arguments.Counter

另外 Mutex 是不可重入的,Go 标准库中没有提供可重入锁。要自行实现可参考:03|Mutex:4种易错场景大盘点

扩展:实现 TryLock

Mutex 添加 TryLock 的方法,实现尝试获取锁:即能取到就直接取用,否则就直接返回、不阻塞(后续可放弃操作,而不是一直阻塞)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const (
mutexLocked = 1 << iota // 加锁标识位置(从 1 开始)
mutexWoken // 唤醒标识位置
mutexStarving // 锁饥饿标识位置
mutexWaiterShift = iota // 标识 waiter 的起始 bit 位置
)

type Mutex struct {
sync.Mutex
}

// TryLock 尝试获取锁
func (m *Mutex) TryLock() bool {

// 取锁的地址,从中可获取 state 的值。
addr := (*int32)(unsafe.Pointer(&m.Mutex))

// fast path:如果能成功抢到锁,返回 true。
if atomic.CompareAndSwapInt32(addr, 0, mutexLocked) {
return true
}

// 如果处于唤醒、加锁或者饥饿状态,不参与竞争返回 false。
old := atomic.LoadInt32(addr)
if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
return false
}

// 尝试在竞争的状态下请求锁
new := old | mutexLocked
return atomic.CompareAndSwapInt32(addr, old, new)
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var mu Mutex

// 启动 goroutine,在一段时间持有锁。
go func() {
mu.Lock()
time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
mu.Unlock()
}()
time.Sleep(time.Second)

// 尝试获取到锁
if mu.TryLock() {
fmt.Println("got the lock")
// do something
mu.Unlock()
return
}

// 没有获取到
fmt.Println("can't get the lock")

扩展:获取 Waiter 数量等指标

如同上面的方法利用 unsafe.Pointer 获取锁地址,再从前 4 字节中得到 state 字段的值:

1
2
3
4
5
6
7
type Mutex struct {
state int32
sema uint32
}

var mu Mutex
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))

其中 state 字段共有四个标记:

1
2
[mutexWaiters][mutexStarving][mutexWoken][mutexLocked]
阻塞 waiter 数 饥饿标记 唤醒标记 锁持有标记

Mutex 的源码中可见,mutexWaiterShift 值为 3:

1
2
3
4
5
6
const (
mutexLocked = 1 << iota // 1 加锁标识位置
mutexWoken // 2 唤醒标识位置
mutexStarving // 4 锁饥饿标识位置
mutexWaiterShift = iota // 3 标识 waiter 的起始 bit 位置
)

因此将 state 右移 3 位即可得到当前 waiter 的数量:

1
waiterCount := state >> mutexWaiterShift

如果当前锁已被其他 goroutine 持有,则还需要加上 1:

1
waiterCount = waiterCount + (waiterCount & mutexLocked)    // 再加上锁持有者的数量,0 或 1

同理可取得 锁是否被持有是否有等待者被唤醒是否处于饥饿状态 的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Mutex struct {
sync.Mutex
}

func (m *Mutex) Count() int {
v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
v = v >> mutexWaiterShift
v = v + (v & mutexLocked)
return int(v)
}

func (m *Mutex) IsLocked() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexLocked == mutexLocked
}

func (m *Mutex) IsWoken() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexWoken == mutexWoken
}

func (m *Mutex) IsStarving() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexStarving == mutexStarving
}

测试:

1
2
3
4
5
6
7
8
9
10
var mu Mutex
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
time.Sleep(time.Second)
mu.Unlock()
}()
}
time.Sleep(time.Second)
fmt.Printf("waitings: %d, isLocked: %t, woken: %t, starving: %t\n", mu.Count(), mu.IsLocked(), mu.IsWoken(), mu.IsStarving())

扩展:实现线程安全队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type NonBlockingQueue struct {
data []interface{}
mu sync.Mutex
}

func NewBlockingQueue(n int) (q *NonBlockingQueue) {
return &NonBlockingQueue{data: make([]interface{}, 0, n)}
}

func (q *NonBlockingQueue) Add(v interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
q.data = append(q.data, v)
}

func (q *NonBlockingQueue) Poll() interface{} {
q.mu.Lock()
if len(q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0]
q.data = q.data[1:]
q.mu.Unlock()
return v
}

RWMutex 读写锁

读写分离:在读多写少的场景下,区分读写操作、只对写操作加锁,以提高读性能(Readers–writers problem)。

Go 标准库提供 RWMutex,在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。

  • LockUnlock:写时调用,如锁已被 reader 或者 writer 持有,Lock 会一直阻塞直到能获取到锁。

  • RLockRUnlock:读时调用。如锁已被 writer 持有,RLock 方法会一直阻塞直到能获取到锁,否则就直接返回。

  • RLocker:为读操作返回一个 Locker 接口对象。其 Lock 方法会调用 RWMutexRLock 方法,Unlock 方法会调用 RWMutexRUnlock 方法。

以封装一个线程安全的计数器为例,在读时(Count)加读锁、写时(Incr)加写锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Counter struct {
mu sync.RWMutex
count uint64
}

func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

func (c *Counter) Count() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}

func main() {
var counter Counter
for i := 0; i < 10; i++ {
go func() {
for {
counter.Count() // 读频率 1次/ms
time.Sleep(time.Millisecond)
}
}()
}
for {
counter.Incr() // 写频率 1次/s
time.Sleep(time.Second)
}
}

RWMutex 是基于 Mutex 实现的,其对写操作优先,即如果已有 writer 在等待请求锁(阻塞的 Lock 调用),则会阻止新的 reader 获取锁,优先保障 writer(注意不是抢占)。

1
2
3
4
5
6
7
8
9
type RWMutex struct {
w Mutex // 互斥锁
writerSem uint32 // writer 信号量
readerSem uint32 // reader 信号量
readerCount int32 // reader 数量(<0 表示有 writer 等待锁或占用)
readerWait int32 // writer 等待完成的 reader 数量
}

const rwmutexMaxReaders = 1 << 30

实现细节

RLockRUnlock

  • 在加锁时先对 readerCount +1。如仍 <0 则表示有 writer 等待锁,请求的 reader 会阻塞等待锁的释放。

  • 在解锁时先对 readerCount -1。如仍 <0 则表示有 writer 等待锁,检查是否所有 reader 都释放了读锁,是则唤醒请求写锁的 writer;否则 writer 等待所有 reader 释放完(writer 的优先只是针对新参加竞争的 reader 而言)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (rw *RWMutex) RLock() {
// 加锁时 reader 数 +1,如果值仍 <0,表示有 writer 等待请求锁。
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 由于 writer 优先级高,后来的 reader 都会阻塞休眠。
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
// 解锁时 reader 量 -1,如果值仍 <0,表示当前有 writer 竞争锁。
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 检查是否 reader 都释放读锁,是则唤醒请求写锁的 writer。
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 此为最后一个 reader,writer 可以获得锁。
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

Lock

  • writer 之间也有竞争,因此 writer 要取得获得内部的互斥锁。

  • 随后反转 readerCount 字段(readerCount - rwmutexMaxReaders,因此 readerCount 还表示当前有 writer 占有或等待)。

  • 如果发现此时锁正被 reader 占用,则把值保存在 readerWait,writer 进入阻塞;每当 reader 调用 RUnlock 释放读锁 readerWait -1,直到所有 reader 都释放,则唤醒该 writer。

1
2
3
4
5
6
7
8
9
10
11
12
func (rw *RWMutex) Lock() {
// 取得获得内部的互斥锁,解决其他 writer 竞争问题。
rw.w.Lock()
// 反转 readerCount,告知 reader 此时有 writer 竞争锁。
// 当前活跃的 reader 数量:即持有读锁未释放。
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

// 如果有 reader 持有锁就需要等待:需要把当前 readerCount 保存到 readerWait 字段保存,并使 writer 阻塞。
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}

Unlock

  • 占用锁的 writer 在释放时再反转 readerCount 字段,唤醒新加入竞争、阻塞的 reader。

  • 最后再释放内部的互斥锁,允许其它 writer 参与竞争。

1
2
3
4
5
6
7
8
9
func (rw *RWMutex) Unlock() {
// 告知 reader 此时没有活跃的 writer,并唤醒。
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放内部的互斥锁
rw.w.Unlock()
}

使用须知

Mutex 类似,RWMutex 也是不可复制、重入、未加锁先解锁,但其出现死锁的情况更复杂。

比如当有活跃 reader 时,writer 会等待,如果在 reader 的读操作中调用 writer 的写操作,reader 和 writer 就会形成互相依赖的死锁状态。

又比如 writer 请求锁时已经有活跃的 reader,它会等待活跃的 reader 完成才获取到锁,但是之后活跃的 reader 再依赖新的 reader,新的 reader 就会等待 writer 释放锁之后才能执行,此时就会形成循环依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 形成循环等待:
// writer 依赖活跃的 reader,活跃的 reader 依赖新来的 reader(等待退递归时释放),新来的 reader 依赖 writer。
func main() {
var mu sync.RWMutex

// 请求写锁(先等待 200ms,此时 factorial 在不断递归调用读锁)
go func() {
time.Sleep(200 * time.Millisecond)
mu.Lock()
fmt.Println("Lock")
time.Sleep(100 * time.Millisecond)
mu.Unlock()
fmt.Println("Unlock")
}()
go func() {
factorial(&mu, 10)
}()
select {}
}

func factorial(m *sync.RWMutex, n int) int {
if n < 1 {
return 0
}
fmt.Println("RLock")
// 请求读锁
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
// 递归执行,不断产生对读锁的调用。
return factorial(m, n-1) * n
}

WaitGroup 等待组

WaitGroup 用于解决并发等待问题,类似 Java 的 CountDownLatchCycliBarrier。比如任务 A 中间需要等待并行的任务 B1、B2、B3 都完成才能继续执行,使用阻塞等待可以避免对 B 的轮询,一是实时性得到保证,二是避免空耗 CPU。

其包含三个方法:

  • Add:设置计数值。

  • Done:计数值减 1(即 Add(-1))。

  • Wait:阻塞当前 goroutine,直到计数值变为 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Counter struct {
mu sync.Mutex
count uint64
}

func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}

func main() {
var counter Counter
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
time.Sleep(time.Second)
counter.Incr()
}()
}
// checkpoint:等待 goroutine 都完成。
wg.Wait()
fmt.Println(counter.Count())
}

实现细节

WaitGroup 内部:

  • noCopy :辅助 vet 工具检查该 WaitGroup 是否复制使用。

  • state1:包含计数、阻塞在检查点的 waiter 数和信号量。

对于自定义结构,也可以通过嵌入 noCopy 实现 vet 检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}

// 得到 state 的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 地址是 64bit 对齐,数组前两个元素用作 state,后一个元素做信号量。
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 地址是 32bit 对齐,数组后两个元素用作 state,可用来做 64bit 原子操作,第一个元素 32bit 用作信号量。
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
1
2
3
4
5
6
7
8
9
64 位对齐
state1[0] state1[1] state1[2]
waiter 数 计数值 信号量

32 位对齐
64 位对齐

state1[0] state1[1] state1[2]
信号量 waiter 数 计数值

AddDone:操作的是 state 的计数部分。可为计数值增加一个 delta 值,内部通过原子操作把该值加到计数值上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高 32bit 是计数值,所以把 delta 左移 32,增加到计数上。
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
// 如果计数值大于 0,且没有 waiter,直接返回。
if v > 0 || w == 0 {
return
}

// state 为 waiter 的数量:将 waiter 数量设置为 0,与计数值组合 *statep 直接设置为0。此时唤醒所有 waiter。
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}

func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait:不断检查 state 的值。如果计数值为 0,即所有任务完成,直接返回。否则调用者变成等待者、加入 waiter 队列并阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()

// 期间 wait 可能被并发调用,因此循环执行。
for {
state := atomic.LoadUint64(statep)
// 取当前计数值和 waiter 数量
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// 计数值为 0, 调用方法的 goroutine 不必等待,继续执行。
return
}
// 否则 waiter 数量 +1。
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待,直到被唤醒返回。
runtime_Semacquire(semap)
return
}
}
}

使用须知

  • 由于 Add 可以传入负数(或多次调用 Done),当内部计数值 < 0,则会导致 panic

  • 虽然 Add 可以在运行过程中增加计数值,但也建议预设,即确保 Wait 方法必须在所有 Add 方法调用之后才调用。

  • 如果在使用过程中需要调用 Add 增加计数值,应避免和 Wait 操作的代码放在不同的 goroutine 中执行,即要杜绝对同一个 WaitGroup 值的两种操作的并发执行。

  • 另外如果需要重用 WaitGroup(比如类似 Java CycliBarrier),必须确保 Wait 结束后才重置 WaitGroup 的值,否则会导致 panic

1
2
3
4
5
6
7
8
9
10
11
12

func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done()
wg.Add(1)
}()
// 主 goroutine 等待,有可能和第 8 行并发执行。
wg.Wait()
}

Cond 条件

一组 goroutine 等待某个条件(bool 变量),条件满足时其中一个或者所有 goroutine 被唤醒执行。其包括以下方法:

  • Signal:调用者唤醒一个等待此 Cond 的 goroutine。如果 Cond 等待队列有一或多个等待的 goroutine,则从等待队列中移除第一个并把它唤醒(即其他语言的 notify)。
  • Broadcast:调用者唤醒所有等待此 Cond 的 goroutine(即其它语言的 notifyAll)。
  • Wait:把调用者放入 Cond 等待队列中并阻塞,直到被 SignalBroadcast 从等待队列中移除并唤醒(注意唤醒后要检查条件)。
1
2
3
4
5
type Cond
func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int
for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)
// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("goroutine #%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}
c.L.Lock()
for ready != 10 {
c.Wait()
log.Println("指挥者被唤醒一次")
}
c.L.Unlock()
log.Println("条件满足,开始执行...")
}

实现细节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Cond struct {
noCopy noCopy
// 当观察或者修改等待条件时要加锁
L Locker
// 等待队列
notify notifyList
// 运行时检查 Cond 是否被复制使用。
checker copyChecker
}

func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

func (c *Cond) Wait() {
c.checker.check()
// 添加到等待队列中
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 阻塞休眠直到被唤醒
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

Wait 把调用者加入等待队列并释放锁,被唤醒之后还会请求锁。在阻塞休眠期间,调用者不持有锁,可使其他 goroutine 有机会检查或者更新等待变量。等调用者被唤醒之后再去争抢这把锁。

使用须知

调用 Wait 时必须加锁,否则可能导致 Unlock 一个未加锁的 Locker

调用 Wait 必须检查等待条件是否满足,只有满足才执行。

Go 提供的 ChannelWaitGroup 是更广泛使用的通知机制,在 wait-notify 场景下比 Cond 更实用。而 Cond 独有的特点在于:

  • 和一个 Locker 关联,可对相关的依赖条件更改提供保护。

  • 同时支持 SignalBroadcast,而 Channel 只能同时支持一种。

  • Broadcast 可被重复调用。等待条件变成不满足后,又可以调用 Broadcast 再次唤醒等待的 goroutine。而 Channelclose 后不能再使用。

扩展:实现阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
type BlockingQueue struct {
cond *sync.Cond
data []interface{}
capacity int
logs []string
}

func NewQueue(capacity int) *BlockingQueue {
return &BlockingQueue{
cond: &sync.Cond{L: &sync.Mutex{}},
data: make([]interface{}, 0),
capacity: capacity, logs: make([]string, 0),
}
}

func (q *BlockingQueue) Add(d interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.data) == q.capacity {
q.cond.Wait()
}
q.data = append(q.data, d)
// 记录操作日志
q.logs = append(q.logs, fmt.Sprintf("En %v\n", d))
// 通知其他 waiter 进行 Poll 或 Add 操作
q.cond.Broadcast()

}

func (q *BlockingQueue) Poll() (d interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

for len(q.data) == 0 {
q.cond.Wait()
}
d = q.data[0]

q.data = q.data[1:]

// 记录操作日志
q.logs = append(q.logs, fmt.Sprintf("De %v\n", d))

// 通知其他 waiter 进行 Poll 或 Add 操作
q.cond.Broadcast()
return
}

func (q *BlockingQueue) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.data)
}

func (q *BlockingQueue) String() string {
var b strings.Builder
for _, log := range q.logs {
//fmt.Fprint(&b, log)
b.WriteString(log)
}
return b.String()
}

Once 单例

线程安全的单例一般用于初始化资源,可通过 package 级别的变量或 init 函数实现。

在创建资源(比如数据库连接等)要确保线程安全,使用懒汉式加锁的方式会有性能问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var connMu sync.Mutex
var conn net.Conn

func getConn() net.Conn {
connMu.Lock()
defer connMu.Unlock()
if conn != nil {
return conn
}
conn, _ = net.DialTimeout("tcp", "baidu.com:80", 10*time.Second)
return conn
}

func main() {
conn := getConn()
if conn == nil {
panic("conn is nil")
}
}

利用 Once 提供的 func (o *Once) Do(f func()) 可确保在多次调用中,只有首次调用时传入的 f 函数才会执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
var once sync.Once
f1 := func() {
fmt.Println("in f1")
}
once.Do(f1) // in f1

a := "xxx" // 闭包
f2 := func() {
fmt.Println(a + "in f2")
}
once.Do(f2) //
}

实现细节

自己实现 Once

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {
// 双重校验
if atomic.LoadUint32(&o.done) == 0 {
// 如果有并发的 goroutine,可进入 doSlow。
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
// 再校验一次,否则可能存在后续调用 Do 方法的 goroutine 看到 done 已被设置,但获取资源时候可能会得到空,因为 f 未执行完。
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

使用须知

由于 f 方法是无参无返回值的,当执行时 panic 或初始化资源失败,Once 还是会认为初次执行已成功,再次调用 Do 方法也不会再次执行 f

可以自己实现 Once,既返回当前调用 Do 方法是否成功,还可以在初始化失败后调用 Do 方法再次尝试初始化直到成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type Once struct {
m sync.Mutex
done uint32 // 1 表示初始化。
}
// 传入函数 f 有返回值 error,用于初始化失败时返回。
func (o *Once) Do(f func() error) error {
// fast path
if atomic.LoadUint32(&o.done) == 1 {
return nil
}
// 如果未初始化则执行 slowDo。
return o.slowDo(f)
}

func (o *Once) slowDo(f func() error) error {
o.m.Lock()
defer o.m.Unlock()
var err error
// 再校验一次,否则可能存在后续调用 Do 方法的 goroutine 看到 done 已被设置,但获取资源时候可能会得到空,因为 f 未执行完。
if o.done == 0 {
err = f()
if err == nil { // 初始化成功才将标记置为已初始化。
atomic.StoreUint32(&o.done, 1)
}
}
return err
}

// ===== 但以上只是保证 `Once` 可放心地被重试,需要自行检查资源是否已被初始化:
type AnimalStore struct {
once sync.Once
inited uint32
}

func (a *AnimalStore) Init()
a.once.Do(func() {
longOperationSetupDbOpenFilesQueuesEtc()
atomic.StoreUint32(&a.inited, 1)
})
}

func (a *AnimalStore) CountOfCats() (int, error) {
if atomic.LoadUint32(&a.inited) == 0 {
return 0, NotYetInitedError
}
// ...
}

如果在标准库的 Once 上扩展,可以:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Once struct {
sync.Once
}

func (o *Once) Done() bool {
return atomic.LoadUint32((*uint32)(unsafe.Pointer(&o.Once))) == 1
}

func main() {
var flag Once
fmt.Println(flag.Done())

flag.Do(func() {
time.Sleep(time.Second)
})

fmt.Println(flag.Done())
}

另外 Go 没有提供 Immutable 类型,任何全局变量都可以修改。以全局变量形式实现的单例容易带来安全问题,因此需要控制好其作用域:比如不暴露变量本身(包内),而只对外提供 GetXxx 方法。

Pool 池

Go 采用 三色并发标记算法 实现自动标记对象和回收,但在垃圾收集阶段还是会有 STW 时间。为了减少对象创建回收开销可使用对象池。

Pool 用于保存一组可独立访问的临时(即当失去引用时会被回收)对象,其本身是线程安全的,但与其它并发原语一样不能复用。其提供以下方法:

  • New:创建对象,当调用 Get 但池中没有空闲元素时,即调用 New;如果没有设置 New,则调用 Get 时返回 nil。

  • Get:从池中取走一个对象,可能是 nil(没有设置 New 或没有空闲对象返回)。

  • Put:归还一个对象给池以供复用,如果传入 nil 会被忽略。

常见的使用场景是连接池:比如标准库提供的 Client、数据库连接池 DB、第三方库 fatih/pool(不局限于 TCP 连接)等(但由于 Pool 无通知地回收对象,连接池不建议基于 Pool 实现)。

实现细节

Pool 内部字段 localvictim 用于存储空闲元素,每次垃圾回收时,Pool 会把 victim 的对象移除,把 local 的数据移给 victim。因此 local 被清空,而 victim 存放临时对象:可在 GC 时被移除,或通过 Get 重用。

其中 GC 时 Poll 的处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

func poolCleanup() {
// 丢弃当前 victim,此时 STW(所以不用加锁)。
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 将 local 复制给 victim, 并将原 local 置为 nil。
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}

其中 local 包含 poolLocalInternal 字段并提供 CPU 缓存对齐,从而避免 false sharing。其中 poolLocalInternal

  • private:表示只能由一个 P 存取的缓存元素。一个 P 同时只能执行一个 goroutine,所以不会有并发的问题。

  • shared:可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail,相当于只有一个本地的 P 作为生产者,多个 P 作为消费者(使用 local-free 的 queue 列表实现)。

Get:按以下优先级取对象:本地 private -> 本地 shared -> getSlow(其它的 shared)-> New

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (p *Pool) Get() interface{} {
// 把当前 goroutine 固定在当前的 P上
l, pid := p.pin()
x := l.private // 优先从 local 的 private 取。
l.private = nil
if x == nil {
// 从 local.shared 取(从 head 读取并移除)
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
// 如果没有获取到,尝试使用New函数生成一个新的
if x == nil && p.New != nil {
x = p.New()
}
return x
}

func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
// 从其它 proc 中尝试偷取一个元素
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

// 如果其它 proc 也没有可用元素,尝试从 vintim 中获取。
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil { // 先从 vintim 中的 local private 获取。
l.private = nil
return x
}
for i := 0; i < int(size); i++ { // 从 vintim 其它 proc 尝试偷取。
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果 victim 中都没有,则把 victim 标记为空,快速跳过后续查找。
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}

Put:设置本地 private,如已经有值就把此元素 push 到本地队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (p *Pool) Put(x interface{}) {
// 直接抛弃 nil。
if x == nil {
return
}
l, _ := p.pin()
// 如果本地 private 没有值,直接设置。
if l.private == nil {
l.private = x
x = nil
}
// 否则加入到本地队列中。
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
}

使用须知

内存泄漏:比如下面的 buffer 池,在取出 bytes.Buffer 使用时可以无限制地往里面插入大量 byte。此时即使 Reset 再放回到池中 byte slice 容量也不会改变,所占空间依然很大;由于 Pool 回收机制,大的 Buffer 可能不被回收。因此使用 Pool 回收 buffer 时,要检查回收对象大小,如果 buffer 太大就不要回收,否则太浪费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var buffers = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

func GetBuffer() *bytes.Buffer {
return buffers.Get().(*bytes.Buffer)
}

func PutBuffer(buf *bytes.Buffer) {
buf.Reset()
buffers.Put(buf)
}

内存浪费:即当池中 buffer 比较大,但实际上只用到较小的部分。可以根据对象大小把池分为几层,根据需要在指定的池中存取。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var (
bufioReaderPool sync.Pool
bufioWriter2kPool sync.Pool
bufioWriter4kPool sync.Pool
)

var copyBufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 32*1024)
return &b
}
}

func bufioWriterPool(size int) *sync.Pool {
switch size {
case 2 << 10:
return &bufioWriter2kPool
case 4 << 10:
return &bufioWriter4kPool
}
return nil
}

在一些第三方库中也有这种 Pool 的实现:

Worker Pool

尽管 goroutine 很轻量级(栈 2048bytes,最大可扩展到 1GB),但大量的 goroutine 在调度和垃圾回收时的开销也会很大。因此可以创建 Worker Pool 减少 goroutine 的使用。比如用固定数量的 goroutine 处理大量连接请求:fasthttp

大部分 Worker Pool 是通过 Channel 来缓存任务(能方便地实现并发保护),有的是多个 Worker 共享同一个任务 Channel,有的是每个 Worker 都有独立的 Channel。

推荐的第三方库:

  • gammazero/workerpool:可无限制地提交任务,有更便利的 Submit 和 SubmitWait 方法提交任务,还可以提供当前 Worker 数和任务数以及关闭 Pool 的功能。

  • ivpusic/grpool:创建 Pool 时需要提供 Worker 的数量和等待执行的任务的最大数量,可直接往 Channel 放入任务。

  • dpaks/goworkers:提供了更便利的 Submit 方法提交任务以及 Worker 数、任务数等查询方法、关闭 Pool 的方法。执行结果需要在 ResultChan 和 ErrChan 中去获取,没有提供阻塞方法,但可以在初始化时设置 Worker 数量和任务数。

  • panjf2000/antsJeffail/tunnybenmanns/goworkergo-playground/poolSherifabdlnaby/gpoolalitto/pond 等。

Context 上下文

上下文即在 API 或方法调用之间,传递的除了业务参数外的信息(比如 HTTP 请求的附带信息),Go 标准库提供的 Context 甚至还提供了超时和取消机制。

在 Go 服务中,传入的请求通过创建 goroutine 去处理,在此 goroutine 中通常需要派生出额外的 goroutine 来访问其他后端,比如同时访问数据库和 RPC 服务。

这些 goroutine 通常需要访问特定于请求的值(如用户身份相关信息),而且当请求被取消或超时,处理该请求的所有 goroutine 都应该快速退出(fail fast),使系统回收其正在使用的资源。

Context 包括以下方法:

  • Deadline:返回该 Context 被取消的截止日期。如果没有设置则 ok 的值是 false。每次调用都是返回相同结果。

  • Done:返回 Channel 对象,在 Context 被取消时此 Channel 会被 close,可能会返回 nil。cancel、timeout、deadline 都可能导致 Done 被 close,每次调用都是返回相同结果。

  • Err:当 Done 被 close 时可以通过 Err 获取错误信息,否则返回 nil。

  • Value:返回此 Context中和指定 key 关联的 value。

CancelFunc 调用 WithXxx 后返回的函数句柄,用于主动让下游结束。而 Done 则是被上游通知结束。

生成顶层 Context 的方法(底层一样):

  • Background:返回非 nil 的、空的 Context。没有值、不会被 cancel、不会超时、没有截止日期。一般用于主函数、初始化、测试以及创建根 Context

  • TODO:返回非 nil 的、空的 Context。没有值、不会被 cancel、不会超时、没有截止日期。一般用于不清楚是否该用 Context,或者目前还不知道要传递什么上下文信息时。

使用时遵循规则:

  • 一般函数使用 Context 时会把它放在第一个参数的位置。
  • 不要把 nil 用做 Context 类型的参数值(可以使用 context.Background())。
  • Context 只用来临时做函数之间的上下文透传,不能持久化(数据库、本地文件、全局变量、缓存)。
  • 常使用 struct{} 定义 key 的类型。对于 exported key 的静态类型,常是接口或指针。目的是尽量减少内存分配。

特殊 Context

WithXxx 函数的功能是为父节点生成带有 Done 方法的子节点,并返回子节点的 CancelFunc 函数句柄。

WithValue

基于 parent Context 生成新的 Context,其保存了一个 key-value 键值对,用来传递上下文。它覆盖了 Value 方法,优先从自己的存储中检查 key,不存在则从 parent 中继续检查。

1
2
3
4
5
6
[context] <---+ c.Context.Value(key)
|
[context] <---+
|
[G] -----> [context]
context.Value(key)
1
2
3
4
5
6
7
8
9
10
11
type valueCtx struct {
Context
key, val interface{}
}

func (c* valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

支持链式查找。查找不存在还会向 parent Context 查找,如果 parent 还是 valueCtx 则遵循相同原则。

1
2
3
4
5
6
7
ctx = context.TODO()
ctx = context.WithValue(ctx, "key1", "0001")
ctx = context.WithValue(ctx, "key2", "0001")
ctx = context.WithValue(ctx, "key3", "0001")
ctx = context.WithValue(ctx, "key4", "0004")

fmt.Println(ctx.Value("key1"))

key 不应该是 string 或其它内建类型,否则在包之间使用 Context 时容易产生冲突(如果能保证 key 不会冲突,则可以使用自定义类型或内建类型)。

WithCancel

方法返回 parent 的副本,副本中的 Done Channel 是新建对象,类型是 cancelCtx。常在需要主动取消长时间的任务时使用:创建 Context,把它传给长时间执行任务的 goroutine。需要中止任务时就可以 cancel 这个 Context,长时间执行任务的 goroutine 就可以通过检查它,知道 Context 已经被取消。

WithCancel 返回的第二个值是一个 cancel 函数。只要任务正常完成就需要调用 cancel,这个 Context 才能释放资源(通知 children 处理 cancel,从它的 parent 中把自己移除,释放相关的 goroutine)。

其内部实现:

1
2
3
4
5
6
7
8
9
10
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)// 把 c 向上传播
return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx 返回初始化的 cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

cancelCtx 被取消时,Err 就是这个 Canceled 错误:

1
var Canceled = errors.New("context canceled")

WithTimeout

WithDeadline 一样,其参数是超时时间。超时时间加上当前时间即截止时间,WithTimeout 的实现:

1
2
3
4
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
// 当前时间 + timeout 就是 deadline
return WithDeadline(parent, time.Now().Add(timeout))
}

WithDeadline

WithDeadline 返回一个 parent 的副本,并设置不晚于参数 d 的截止时间,类型为 timerCtx 或 cancelCtx。如果其截止时间晚于 parent 的截止时间,就以 parent 的截止时间为准,并返回一个类型为 cancelCtx 的 Context(parent 的截止时间到了就会取消这个 cancelCtx)。

如果当前时间已经超过了截止时间就直接返回一个已经被 cancel 的 timerCtx;否则启动一个定时器,到截止时间取消这个 timerCtx。

因此触发 timerCtx 的 Done 被 close:

  • 截止时间到。

  • cancel 函数被调用。

  • parent 的 Done 被 close。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 如果 parent 截止时间更早,返回一个 cancelCtx 即可
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c) // 同 cancelCtx 的处理逻辑
dur := time.Until(d)
if dur <= 0 { // 当前时间已超过了截止时间,直接 cancel
c.cancel(true, DeadlineExceeded)
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 设置定时器到截止时间后取消
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

使用方法:

1
2
3
4
5
func slowOperationWithTimeout(ctx context.Context) (Result, error) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel() // 一旦 slowOperation 完成就调用 cancel
return slowOperation(ctx)
}

管理 goroutine 生命周期

把 Context 传递给 goroutine,由 goroutine 检查 Context 的 Done 是否关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer func() {
fmt.Println("goroutine exit")
}()
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(time.Second)
}
}
}()
time.Sleep(time.Second)
cancel()
time.Sleep(2 * time.Second)
}

使用须知

参数传递

context.WithValue 每次调用都会创建新的对象,以链表关联。这种结构是线程安全,支持并发访问,但由于访问效率较低,并不适合大量地调用(可以传递 map 或 struct)。

传递的数据是面向请求,适用于链路追踪、多租户染色发布等。不应该作为函数的可选参数来使用(比如 Context 中存放 sql.Tx 对象传递到 data 层使用等),因为元数据是隐式、面向请求的旁路信息挂载,而函数参数是显式、用于处理业务流程的。

同一个 Context 对象可以传递给在不同 goroutine 中运行的函数,对于多个 goroutine 同时使用是安全的。其中 value 应该是 immutable 的,即 context.WithValue(ctx, oldvalue)。如果携带了集合类型的数据,要注意到读写操作可能造成 data race,可以用 COW(Copy on Write)的思路解决。

级联取消

当一个 Context 被取消时,从它派生的所有 Context 也将被取消。WithCancel(ctx) 参数 ctx 认为是 parent ctx,在内部会进行传播关系链的关联。调用 Done() 会返回一个 chan,当取消某个 parent context,实际上会递归层层 cancel 关闭自己 child context 的 done chan,从而让整个调用链中所有监听 cancel 的 goroutine 退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <- ctx.Done():
return
case dst <- n:
n++
}
}
}()
return dst
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
break
}
}

超时调用

利用 parent/child 机制,只需要启动一个定时器,每当超时直接将当前的 Context cancel,就可以使监听在当前和下层的 context.Done() 的 goroutine 退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const shortDuration = 1 * time.Millisecond

d := time.Now().Add(shortDuration)
ctx, cancel := context.WithDeadline(context.Background(), d)

defer cancel()

select {
// 超时未完成
case <- time.After(1 * time.Second):
fmt.Println("overslept")
// 完成
case <- ctx.Done():
fmt.Println(ctx.Err())
}

Context 设计带超时的服务调用,并不意味着会通知远程服务就会自动取消这次调用,一般只是避免客户端长时间等待,服务端依然还在处理请求。所以有时 Context 并不会减少对服务端的请求负担。

如果在 Context 被 cancel 时能关闭和服务端的连接、中断和数据库的通讯、停止对本地文件的读写(依赖于超时的底层处理机制),才能减少服务调用的压力。

作用域

Context 集成到 API 中时,需要注意其作用域是请求级别的。比如贯穿请求处理过程,但于数据模型存在则没有意义,因此不应该将其存储在结构体中或持久化。但有例外是该结构纯粹用作通过信道传递的消息。

1
2
3
4
5
type message struct {
responseChan chan<- int
parameter string
ctx context.Context
}

可用以下方法在 API 中集成 Context 对象:

  • 首参数传递 Context 对象,参考 net 包 func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error)。可通过 context 对象取消函数调用。

  • 在第一个 Request 对象中携带可选的 Context 对象。例如 net/http 包的 func (r *Request) WithContext(ctx context.Context) *Request,通过携带给定的 Context 对象返回一个新的 Request 对象。

Atomic 原子操作

原子操作即由 CPU 提供基础支持、确保指令必然完全执行/不执行的操作。即使在多处理器、多核、有 CPU cache(多 goroutine 访问),该操作也能保证原子性。

Go 标准库 sync/atomic 为 int32、int64、uint32、uint64、uintptr、Pointer 等类型提供了原子操作方法:AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX 等,其操作对象是 变量的地址

Add:加法运算的第一个参数 delta 可以是负数,相当于减法。

1
func AddInt32(addr *int32, delta int32) (new int32)

对于无符号整型,可以利用补码规则:

1
2
3
4
5
6
7
AddUint32(&x, ^uint32(c-1))

// -1
AddUint32(&x, ^uint32(0))

// 比如执行 -n:
AddUint32(&x, ^uint32(-n-1)))

CAS:比较替换,只有 addr 的值为 old,才把它修改为 new 并返回 true,否则返回 false(同理还有 Swap,区别是直接替换、不比较)。

1
func CompareAndSwapInt32(addr *int32, new int32) (swapped bool)

LoadStore:取出 addr 地址中的值,以及把一个值存入到指定的 addr 地址中。

1
2
func LoadInt32(addr *int32) (val int32)
func StoreInt32(addr *int32, val int32)

Value:支持原子地存取对象类型(非 nil,不能 CASSwap),常用于配置变更等场景中。

参考以下实现配置变更的例子:

  • 定义 Value 类型的变量 config, 用于存储配置信息。

  • 启动一个 goroutine,让它随机 sleep 一段时间,之后变更配置,并通过 Cond 通知其它 reader 加载新配置。

  • 启动一个 goroutine 等待配置变更的信号,一旦有变更就会加载最新的配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type Config struct {
NodeName string
Addr string
Count int32
}

func loadNewConfig() Config {
return Config{
NodeName: "北京",
Addr: "10.77.95.27",
Count: rand.Int31(),
}
}

func main() {
var config atomic.Value
config.Store(loadNewConfig())
var cond = sync.NewCond(&sync.Mutex{})

// 设置新 config
go func() {
for {
time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
config.Store(loadNewConfig())
cond.Broadcast() // 通知等待着配置已变更
}
}()
go func() {
for {
cond.L.Lock()
cond.Wait() // 等待变更信号
c := config.Load().(Config) // 读取新的配置
fmt.Printf("new config: %+v\n", c)
cond.L.Unlock()
}
}()
select {}
}

在操作系统中,write 的地址基本上是对齐的(aligned)。 比如 32 位操作系统、CPU 和编译器,write 的地址总是 4 的倍数,64 位系统则总是 8 的倍数。这种情况下通过一个指令就可以实现地址的写操作,不会导致其他线程看到部分写。如果地址不是对齐,处理器需要分成两个指令处理,如果只执行了一个指令便被其它线程看到,被称为 撕裂写(torn write) 。因此赋值操作必须是原子操作,才能保证数据完整。

对于多核处理器,某个核对地址的值的更改在更新到主内存中前,是在多级缓存中存放的,此时其它核看到的数据可能不一样。由于 cache、指令重排,可见性等问题的存在,实现原子性依赖于内存屏障(memory fence):写内存屏障告知处理器,必须等到其管道中未完成的写被刷新到内存中再进行操作。此操作会让其它处理器的缓存失效,以便使之从主内存中获取最新值。

atomic 包方法会提供内存屏障功能,所以可以保证赋值的完整性,还能保证可见性:某个核更新了某地址的值,其它处理器总是能读取到它的最新值。需要注意的是,处理器之间保证数据的一致性也是会降低性能的。

功能扩展

一些第三方库对标准库的原子操作 API 提供了进一步封装。比如 uber-go/atomic 定义和封装了几种与常见类型对应的原子操作类型:Bool、Duration、Error、Float64、Int32、Int64、String、Uint32、Uint64 等。以及原子操作方法:CAS、Store、Swap、Toggle 等。其使用也更友好:

1
2
3
4
var running atomic.Bool
running.Store(true)
running.Toggle()
fmt.Println(running.Load()) // false

扩展:实现无锁队列

原子操作常用来实现 Lock-Free 数据结构,相比起同步的数据结构往往有更高的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
type LKQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
}

// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
value interface{}
next unsafe.Pointer
}

func NewLKQueue() *LKQueue {
n := unsafe.Pointer(&node{})
return &LKQueue{head: n, tail: n}
}

func (q *LKQueue) Add(v interface{}) {
n := &node{value: v}
for {
tail := load(&q.tail)
next := load(&tail.next)
if tail == load(&q.tail) {
if next == nil { // 还没有新数据入队
if cas(&tail.next, next, n) { // 增加到队尾
cas(&q.tail, tail, n) // 入队成功,移动尾巴指针
return
}
} else { // 已有新数据加到队列后面,需要移动尾指针
cas(&q.tail, tail, next)
}
}
}
}

func (q *LKQueue) Poll() interface{} {
for {
head := load(&q.head)
tail := load(&q.tail)
next := load(&head.next)
if head == load(&q.head) {
if head == tail { // head 和 tail一样
if next == nil { // 空队列
return nil
}
// 只是尾指针还未调整,尝试调整它指向下一个。
cas(&q.tail, tail, next)
} else {
// 读取出队的数据,头指针移动到下一个。
v := next.value
if cas(&q.head, head, next) {
return v
}
}
}
}
}

func load(p *unsafe.Pointer) (n *node) {
// 将 unsafe.Pointer 原子加载转换成 node
return (*node)(atomic.LoadPointer(p))
}

func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
// 封装 CAS,避免直接将 *node 转换成 unsafe.Pointer
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
}

使用须知:

  • 不要把内部使用的原子值暴露给外界。

  • 如果不得不让包外或模块外的代码使用原子值,可声明一个包级私有的原子变量,再通过公开的函数让外界间接地使用到它,但不要把原子值传递到外界(包括指针)。

  • 如果通过某个函数可以向内部的原子值存储值,应该在这个函数中先判断被存储值类型的合法性,避免造成 panic

  • 可以把原子值封装到结构体类型,既可以通过该类型的方法更安全地存储值,又可以在该类型中包含可存储值的合法类型信息。

Semaphore 信号量

信号量表示为使用一个(范围 [0, n] 的)变量实现并发控制的能力。当 goroutine 完成对此信号量等待(wait)时该计数值就减 1,对此信号量释放(release)时,该计数值就加 1。只有当计数器大于 0,等待的 goroutine 才有可能成功返回。

更复杂的信号量可以使用抽象数据类型代替变量,用来代表复杂的资源类型。

P/V 操作

信号量支持以下操作:

  • 初始化:设定初始的资源数量。

  • P:信号量计数值 -1,如果新值为负,则调用者被阻塞并加入到等待队列中。否则调用者继续执行,并获得一个资源。

  • V:信号量计数值 +1,如果先前计数值为负,表示当前有等待的 P 操作调用者,则会从等待队列中取出一个等待的调用者唤醒,使之继续执行。

信号量可分为 计数信号量(counting semaphre)和 二进位信号量(binary semaphore)。计数信号量的计数值可以是任意整数,在特殊的情况下只为 0 或 1,则为二进位信号量,可提供互斥功能。

因此有时互斥锁也会使用二进位信号量实现(常用于保护一组资源,比如数据库连接池、客户端连接等),此时 P/V 操作相当于互斥锁的 Lock/Unlock。

在 Windows 中,互斥锁只能由持有锁的线程释放。而二进位信号量则没有这个限制。但对于 Go 而言,互斥锁也可以由非持锁 goroutine 释放,所以在行为上没有严格区别。

标准库实现

Go 标准库提供的 Mutex 内部使用信号量控制 goroutine 的等待和唤醒,并实现 P/V 操作的函数:

1
2
3
4
5
6
7
8
type Mutex struct {
state int32
sema uint32
}

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

但只作内部使用,没有封装对外暴露。要使用信号量可以用扩展包 semaphore 提供的 Weighted

1
2
3
4
5
6
type Weighted struct {
size int64 // 最大资源数
cur int64 // 当前已被使用的资源
mu sync.Mutex // 互斥锁,对字段的保护
waiters list.List // 等待队列
}

提供以下方法:

  • Acquire:相当于 P 操作,可一次获取多个资源,如果没有足够多的资源调用者就会被阻塞。第一个参数是 Context,用于增加超时或 cancel 机制。如果正常获取资源就返回 nil;否则返回 ctx.Err() 且信号量不改变。其内部实现需要监控资源是否可用,以及检测 Context 的 Done 是否已关闭。

  • Release:相当于 V 操作,可将 n 个资源释放、返还给信号量。如果调用该方法时,传递比请求到的数量更大的数值就会 panic

  • TryAcquire:尝试不阻塞地获取 n 个资源,要么成功获取 n 个资源返回 true,要么获取失败返回 false。

其中 AcquireRelease 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// fast path, 如果有足够的资源,都不考虑 ctx.Done 的状态,将 cur 加上 n 就返回。
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}

// 如果是不可能完成的任务,请求的资源数大于能提供的最大的资源数
if n > s.size {
s.mu.Unlock()
// 依赖 ctx 的状态返回,否则一直等待
<-ctx.Done()
return ctx.Err()
}

// 否则就需要把调用者加入到等待队列中。创建一个 ready chan,以便被通知唤醒。
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()

// 等待
select {
case <-ctx.Done(): // context的Done被关闭
err := ctx.Err()
s.mu.Lock()
select {
case <-ready: // 如果被唤醒了,忽略ctx的状态
err = nil
default: // 通知 waiter
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// 通知其它的 waiters,检查是否有足够的资源。
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: // 被唤醒
return nil
}
}

func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}

func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}

w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 避免饥饿,这里还是按照先入先出的方式处理(否则就会优先满足较小的需求,使得大的需求永远不能满足)。
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}

利用信号量实现 Worker Pool:

  • 主 goroutine 是负责任务分发的 dispacher。先请求信号量,获取成功则启动一个 goroutine 处理计算,该 goroutine 会释放信号量(信号量的获取在主 goroutine,释放则在 worker goroutine);如获取失败就等到有信号量可用时再获取。

  • 在实际应用中如果想等所有 Worker 执行完,可获取最大计数值的信号量(表示所有申请的都已释放)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var (
maxWorkers = runtime.GOMAXPROCS(0) // worker 数量
sema = semaphore.NewWeighted(int64(maxWorkers)) // 信号量
task = make([]int, maxWorkers*4) // 任务数
)

func main() {
ctx := context.Background()
for i := range task {
// 如果无可用 worker 则会阻塞,直到某个 worker 被释放。
_ := sema.Acquire(ctx, 1)

// 启动 worker goroutine。
go func(i int) {
defer sema.Release(1)
time.Sleep(100 * time.Millisecond) // 模拟耗时操作
task[i] = i + 1
}(i)
}

// 一次请求所有 worker,可确保前面的 worker 都执行完。
if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("获取所有 worker 失败: %v", err)
}
fmt.Println(task)
}

使用信号量需要注意,当公平性和安全性损害会导致程序 panic。常见错误:

  • 请求资源但没有释放。

  • 释放从未请求的资源。

  • 长时间持有资源(即使不需要)。

  • 不持有资源却直接使用。

  • 使用不同的信号量控制不同的资源时,也可能导致死锁。

因此必须确保按需请求、按量释放。

使用 Channel 实现

使用带 buffer 的 Channel 可以实现信号量(其中 P/V 操作以 Lock/Unlock 表示 ,且无法提供一次性申请多个的功能):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type semaphore struct {
sync.Locker
ch chan struct{}
}

func NewSemaphore(capacity int) sync.Locker {
if capacity <= 0 {
capacity = 1
}
return &semaphore{ch: make(chan struct{}, capacity)}
}

func (s *semaphore) Lock() {
s.ch <- struct{}{}
}

func (s *semaphore) Unlock() {
<-s.ch
}

由于带 buffer 的 Channel 本身就能表示多个资源,更建议直接使用 Channel,除非有必要一次性申请多个。

除此之外还有第三方库 marusama/semaphore,推荐在资源数量不固定、动态变化的场景使用。

SingleFlight 请求合并

多个 goroutine 同时调用同一个函数时,SingleFlight 可限制只有一个 goroutine 调用,当该 goroutine 返回,再把结果返回给其它 goroutine,可减少并发调用数。

区别于 sync.Once 保证永远只执行一次,SingleFlight 保证多个请求同时调用时只执行一个。前者常用在单次初始化场景,而后者常用于合并并发请求的场景(比如缓存)。尤其当面对类似秒杀等大量并发读时,可有效减缓后端服务压力。

实现原理

SingleFlight 使用互斥锁 Mutex(提供并发读写保护)和 Map(保存同一个 key 的正在处理请求)实现。其数据结构是 Group,并提供以下方法:

  • Do:执行函数并返回其执行结果。需提供用于确保在同时只有一个请求执行的 key,相同 key 的其它请求会等待该执行请求返回结果。指定 fn 无参函数,返回执行结果或 error。Do 方法返回 fn 执行的结果,以另一个返回值 shared 指示是否返回给多个请求。

  • DoChan:类似 Do,区别是返回一个 chan。等 fn 函数执行完,可从该 chan 接收结果。

  • Forget:告知 Group 忘记指定 key,之后该 key 请求会执行 fn,而不是等待前一个未完成的 fn 函数返回结果。

SingleFlight 内部使用辅助结构 call,代表正在执行、或已完成的 fn 函数请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 表示一个正在处理或已经处理完的请求。
type call struct {
wg sync.WaitGroup

// 表示处理完的值,在 waitgroup 完成前只会写一次,在 waitgroup 完成后读取。
val interface{}
err error

// 指示当 call 在处理时是否要忘掉这个 key。
forgotten bool
dups int
chans []chan<- Result
}

// 表示一个 singleflight 对象
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}

Do 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 如果已存在相同的 key:等待该 key 的第一个请求完成,使用第一个 key 的请求结果返回。
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call) // 第一个请求,创建一个 call
c.wg.Add(1)
g.m[key] = c // 加入到 key map 中
g.mu.Unlock()
g.doCall(c, key, fn) // 调用方法
return c.val, c.err, c.dups > 0
}

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
// 默认情况下 forgotten==false,delete 会被调用。即第一个请求完成后,后续同一个 key 的请求又重新开始 fn 的调用。
if !c.forgotten {
delete(g.m, key)
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}

应用场景

在 Go 标准库中也用到 SingleFlight

  • 在 net/lookup.go 中,如果同时有查询同一个 host 的请求,lookupGroup 会把请求合并。

  • Go 在查询仓库版本信息时,将并发的请求合并成 1 个。

SingleFlight 最适合用于解决缓存击穿:并发的请求可以共享同一个结果,避免大量请求不经缓存直接落到数据库上。而且由于是缓存查询,不用考虑幂等性问题。比如缓存框架 groupcache:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error)

type Group struct {
// loadGroup ensures that each key is only fetched once
// (either locally or remotely), regardless of the number of
// concurrent callers.
loadGroup flightGroup
// ......
}

func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// 从 cache、peer、local 尝试查询 cache
return value, nil
})
if err == nil {
value = viewi.(ByteView)
}
return
}

其它项目比如 Cockroachdb、CoreDNS 都有 SingleFlight 的应用。但这一般只用于并发读场景,或是设置单一值的并发写场景。而对于并发的增减写操作则不合适(存在幂等性问题)。

CyclicBarrier 循环栅栏

CyclicBarrier 由第三方库 marusama/cyclicbarrier 提供,类似 WaitGroup,常用于等待多个 goroutine 并发执行,而前者更适用于 固定数量的 goroutine 等待同一执行点 的场景,在放行 goroutine 后还能更方便地重复利用(在重用时后者还要处理重置计数值时的并发问题);后者更适用于 一个 goroutine 等待一组 goroutine 到达同一执行点 的场景。

两者对应关系:

CyclicBarrier WaitGroup
New(n) var wg WaitGroup(n)
wg.Add(n)
Await wg.Done
wg.Wait
wg.Add(b)

实现原理

CyclicBarrier 的初始化方法:

  • New:只需要一个参数指定参与者数量。

  • NewWithAction:额外提供一个函数,用于在每次到达执行点时执行一次。时间点最后一个参与者到达后、其它参与者未被放行前。可利用它实现放行前的共享状态更新等操作。

CyclicBarrier 定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type CyclicBarrier interface {
// 等待所有的参与者到达,如果被 ctx.Done() 中断会返回 ErrBrokenBarrier
Await(ctx context.Context) error

// 重置循环栅栏到初始化状态。如果当前有等待者,那它们会返回ErrBrokenBarrier
Reset()

// 返回当前等待者的数量
GetNumberWaiting() int

// 参与者的数量
GetParties() int

// 循环栅栏是否处于中断状态
IsBroken() bool
}

基本应用

在使用时循环栅栏的参与者只需调用 Await 等待,当所有参与者到达后再执行下一步。当执行下一步时,循环栅栏的状态又恢复到初始状态,可迎接下一轮同样数量的参与者。

假设需求如下:

  • 工厂提供多条生产线,每条负责生产氧原子(N 条)或氢原子(2N 条),各由一个 goroutine 负责。

  • 通过一个栅栏,只有一个氧原子和两个氢原子准备好,才能生成出一个水分子,否则所有生产线都处于等待状态。

  • 水分子是逐个按照顺序产生的(原子种类和数量有要求)。

需要引入:

  • 信号量 semaH:控制氢原子。空槽数资源数设置为 2。

  • 信号量 semaO:控制氧原子。空槽数资源数设置为 1。

  • 循环栅栏:等待两个氢原子和一个氧原子填补空槽,直到任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
type H2O struct {
semaH *semaphore.Weighted
semaO *semaphore.Weighted
b cyclicbarrier.CyclicBarrier
}
func New() *H2O {
return &H2O{
semaH: semaphore.NewWeighted(2),
semaO: semaphore.NewWeighted(1),
b: cyclicbarrier.New(3),
}
}

氢原子与氧原子的流水线:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
h2o.semaH.Acquire(context.Background(), 1)
releaseHydrogen() // 输出 H
h2o.b.Await(context.Background()) // 等待栅栏放行
h2o.semaH.Release(1) // 释放氢原子空槽
}

func (h2o *H2O) oxygen(releaseOxygen func()) {
h2o.semaO.Acquire(context.Background(), 1)
releaseOxygen() // 输出 O
h2o.b.Await(context.Background()) // 等待栅栏放行
h2o.semaO.Release(1) // 释放氢原子空槽
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func TestWaterFactory(t *testing.T) {
// 存放水分子结果。
var ch chan string
releaseHydrogen := func() {
ch <- "H"
}
releaseOxygen := func() {
ch <- "O"
}

// 每个 goroutine 并发产生一个原子。
var N = 100
ch = make(chan string, N*3)
h2o := New()

// 等待所有 goroutine 完成。
var wg sync.WaitGroup
wg.Add(N * 3)

// 氢原子 goroutine。
for i := 0; i < 2*N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.hydrogen(releaseHydrogen)
wg.Done()
}()
}
// 氧原子 goroutine。
for i := 0; i < N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.oxygen(releaseOxygen)
wg.Done()
}()
}

//等待所有的 goroutine 执行完
wg.Wait()
if len(ch) != N*3 {
t.Fatalf("expect %d atom but got %d", N*3, len(ch))
}

// 分组检查结果。
var s = make([]string, 3)
for i := 0; i < N; i++ {
s[0] = <-ch
s[1] = <-ch
s[2] = <-ch
sort.Strings(s)
water := s[0] + s[1] + s[2]
if water != "HHO" {
t.Fatalf("expect a water molecule but got %s", water)
}
}
}

ErrGroup 子任务编排

ErrGroup 在 Go 的扩展库中提供,常用于将一个通用的父任务拆成几个小任务并发执行的场景。相比起 WaitGroup 其功能更丰富:

  • Context 集成。

  • error 向上传播,可把子任务的错误传递给 Wait 的调用者。

提供三个方法:

  • WithContext:传入 Context 以创建 Group 对象。其返回一个 Group 实例以及一个使用 context.WithCancel(ctx) 生成的新 Context。一旦有一个子任务返回错误或是 Wait 调用返回,新 Context 就会被 cancel。如果传递参数是可 cancelContext,则其被 cancel 时不会终止正在执行的子任务。

  • Go:传入一个无参、带 error 返回值的函数,创建 goroutine 执行子任务。

  • Wait:阻塞直到所有子任务完成后才返回。如多个子任务返回错误,则只会返回第一个出现的错误,所有子任务都执行成功则返回 nil。

默认只返回第一个错误,如要实现收集所有子任务执行结果,可使用全局变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
var g errgroup.Group
var result = make([]error, 3)

// 第一个子任务执行成功。
g.Go(func() error {
time.Sleep(5 * time.Second)
fmt.Println("exec #1")
result[0] = nil // 保存成功或者失败的结果
return nil
})

// 第二个子任务执行失败。
g.Go(func() error {
time.Sleep(10 * time.Second)
fmt.Println("exec #2")
result[1] = errors.New("failed to exec #2")
return result[1]
})

// 第三个子任务执行成功。
g.Go(func() error {
time.Sleep(15 * time.Second)
fmt.Println("exec #3")
result[2] = nil
return nil
})

// err 为第一个错误。
if err := g.Wait(); err == nil {
fmt.Printf("Successfully exec all. result: %v\n", result)
} else {
fmt.Printf("failed: %v\n", result)
}

实现 Pipeline

官方文档提供的 例子:一个子任务遍历目录下的文件,把遍历出的文件交给 20 个 goroutine、并行计算文件的 md5。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func main() {
m, err := MD5All(context.Background(), ".")
if err != nil {
log.Fatal(err)
}

for k, sum := range m {
fmt.Printf("%s:\t%x\n", k, sum)
}
}

type result struct {
path string
sum [md5.Size]byte
}

// 遍历根目录下所有的文件和子目录,计算它们的 md5 值。
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
g, ctx := errgroup.WithContext(ctx)
paths := make(chan string)

// 遍历文件路径:写入 paths。
g.Go(func() error {
defer close(paths)
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
// ... 将文件路径放入 paths
return nil
})
})

// 并行从 paths 取出文件、计算 md5,将结果写入 c。
c := make(chan result)
const numDigesters = 20
for i := 0; i < numDigesters; i++ {
g.Go(func() error {
for path := range paths { // 遍历直到 paths chan 被关闭
// ... 计算 path 的 md5 值,放入 c 中。
}
return nil
})
}

// 等待遍历目录和计算 md5 的 goroutine 执行完成,关闭 c。
go func() {
g.Wait()
close(c)
}()

// 将 md5 结果从 chan 中读取到 map,直到 c 被关闭。
m := make(map[string][md5.Size]byte)
for r := range c {
m[r.path] = r.sum
}

// 再次调用 Wait,依然可以得到 group 的 error 信息
if err := g.Wait(); err != nil {
return nil, err
}
return m, nil
}

ErrGroup 也存在一些问题:如果无限制地直接调用 Go 方法,就会创建出非常多 goroutine,会带来调度和 GC 的压力、占用更多内存。当前 Go 运行时创建的 g 对象只会增长和重用,不会回收。在高并发场景下要尽可能减少 goroutine 的使用(比如使用 Work Pool 或类似 containerd/stargz-snapshotter 的方案,信号量的资源数就是可并行的 goroutine 的数量)。

扩展库

很多在 Go 官方 ErrGroup 基础上进行扩展、或自行实现分组功能的第三方库,提供了更多更丰富的功能:

  • bilibili/errgroup:可以使用固定数量的 goroutine 处理子任务。并提供了 cancel(失败的子任务可以 cancel 所有正在执行任务)和 recover(把 panic 的堆栈信息放到 error 中,避免子任务 panic 导致的程序崩溃)功能。但一旦设置了并发数,超过并发数的子任务需要等到调用者调用 Wait 之后才执行(而不是 goroutine 空闲下来就自动执行)。而且在高并发下如果任务数大于 goroutine 数,且任务被集中加入到 Group 中,该库会把子任务加入到一个非线程安全的数组。

  • neilotoole/errgroup:对标准库 WaitGroup 的扩展。其 Wait 方法可返回一或多个 error。子任务在调用 Done 之前可把自己的 error 信息设置给 ErrGroup。在 Wait 返回时,把这些 error 信息返回给调用者。

  • go-pkgz/syncs:提供了 SizedGroupErrSizedGroup,支持直接控制子任务的并发数(而不是 goroutine 数),并提供相应的 failfast 的错误处理能力。

  • vardius/gollback:用于处理一组子任务的执行,且解决了 ErrGroup 收集子任务返回结果的痛点。其提供三个方法:

    • All:等待所有的异步函数都执行完才返回执行结果和错误信息,且返回结果的顺序与传入函数的顺序保持一致。

    • Race:与 All 方法类似,但在使用时只要一个异步函数执行成功就马上返回,不会返回所有的子任务信息。只有全部失败才返回最后一个 error 信息。

    • Retry:执行一个子任务,执行失败就会尝试一定次数(为 0 则一致尝试直到成功),如一直不成功就会返回错误信息 ,否则它会立即返回。

  • AaronJan/Hunch:和 gollback 类似,但提供的方法更多(AllTakeLastRetryWaterfall 等)。

  • mdlayher/schedgroup:提供与时间相关、处理一组 goroutine 的并发原语 schedgroup,可指定任务在某时间后执行(提供 DelayScheduleWait 等方法)。

待续。

参考

CATALOG
  1. 1. Go 并发原语
    1. 1.1. Mutex 互斥锁
      1. 1.1.1. 实现细节
      2. 1.1.2. 使用须知
      3. 1.1.3. 扩展:实现 TryLock
      4. 1.1.4. 扩展:获取 Waiter 数量等指标
      5. 1.1.5. 扩展:实现线程安全队列
    2. 1.2. RWMutex 读写锁
      1. 1.2.1. 实现细节
      2. 1.2.2. 使用须知
    3. 1.3. WaitGroup 等待组
      1. 1.3.1. 实现细节
      2. 1.3.2. 使用须知
    4. 1.4. Cond 条件
      1. 1.4.1. 实现细节
      2. 1.4.2. 使用须知
      3. 1.4.3. 扩展:实现阻塞队列
    5. 1.5. Once 单例
      1. 1.5.1. 实现细节
      2. 1.5.2. 使用须知
    6. 1.6. Pool 池
      1. 1.6.1. 实现细节
      2. 1.6.2. 使用须知
      3. 1.6.3. Worker Pool
    7. 1.7. Context 上下文
      1. 1.7.1. 特殊 Context
        1. 1.7.1.1. WithValue
        2. 1.7.1.2. WithCancel
        3. 1.7.1.3. WithTimeout
        4. 1.7.1.4. WithDeadline
      2. 1.7.2. 管理 goroutine 生命周期
      3. 1.7.3. 使用须知
        1. 1.7.3.1. 参数传递
        2. 1.7.3.2. 级联取消
        3. 1.7.3.3. 超时调用
        4. 1.7.3.4. 作用域
    8. 1.8. Atomic 原子操作
      1. 1.8.1. 功能扩展
      2. 1.8.2. 扩展:实现无锁队列
    9. 1.9. Semaphore 信号量
      1. 1.9.1. P/V 操作
      2. 1.9.2. 标准库实现
      3. 1.9.3. 使用 Channel 实现
    10. 1.10. SingleFlight 请求合并
      1. 1.10.1. 实现原理
      2. 1.10.2. 应用场景
    11. 1.11. CyclicBarrier 循环栅栏
      1. 1.11.1. 实现原理
      2. 1.11.2. 基本应用
    12. 1.12. ErrGroup 子任务编排
      1. 1.12.1. 实现 Pipeline
      2. 1.12.2. 扩展库
    13. 1.13. 参考