本文整理了锁的分类与实现,抽取了 golang 1.12 的源码分析,

什么是锁?

当我们在谈论锁的时候,我们在谈什么?

Wiki

In computer science, a lock or mutex (from mutual exclusion) is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many threads of execution. A lock is designed to enforce a mutual exclusion concurrency control policy.

当我们的程序处于并发执行的过程中,由于并发执行的不确定性。对于多进程(线程)共享的资源访问时我们就需要对其进行同步处理,使并发结果符合预期。而锁就是一种同步机制,其设计的目的就是强制执行互斥并发控制策略.

操作系统的锁

  1. 硬件

    • 单处理器: 修改共享变量时禁止中断出现, 修改结束后允许中断。(但是屏蔽中断后,时钟中断也会被屏蔽。相当于这个时候我们把CPU交给了这个进程,他不会因为CPU时钟走完而切换,如果其不再打开中断,后果将是非常可怕的。总之,把屏蔽中断的权力交给用户级进程是很不明智的。)
    • 多处理器: 屏蔽中断只会对执行了中断指令的那个CPU有效,但是其他CPU还是有可能会改变共享内存中的数据的。许多现代计算机系统都提供了特殊硬件指令以允许能 原子地(不可中断地) 检查和修改字的内容或交换两个字的内容(比如 CAS,compare and swap)很多锁的软件方案都是通过调用原子操作(例如 Java:AtomicInteger 原子整数)来实现的。

    CAS

  2. 软件

    • 信号量: 信号量是E. W. Dijkstra在1965年提出的一种方法,它使用一个整型变量来累计唤醒次数。一个信号量的取值可以为0(表示没有保存下来的唤醒次数)或者为正值(表示有一个或者多个唤醒操作)。信号量除了初始化外只能通过两个标准原子操作:wait()和signal()来访问。这些操作原来被称为P(荷兰语Proberen,测试)和V(荷兰语Verhogen,增加).wait()和signal()的定义可表示如下:
    1
    2
    3
    4
    5
    6
    7
    8
    
    wait(S) {
        while(S<=0)
            ;   //no-op
        S--
    }
    signal(S) {
        S++
    }
    • 互斥量: 互斥量可以认为是取值只有0和1的信号量。我们经常使用就是这个。使用其来同步的代码如下
    1
    2
    3
    4
    5
    6
    
    do {
        waiting(mutex);
            //critical section
        signal(mutex);
            //remainder section
    }while(TRUE)

锁的种类

根据表现形式,常见的锁有互斥锁、自旋锁、读写锁。

  • 互斥锁。只有取得互斥锁的进程才能进入临界区,无论读写。

  • 自旋锁。自旋锁是指在进程试图取得锁失败的时候选择忙等待而不是阻塞自己。选择忙等待的优点在于如果该进程在其自身的CPU时间片内拿到锁(说明锁占用时间都比较短),则相比阻塞少了上下文切换。

    画外音: 注意这里还有一个隐藏条件:多处理器。因为单个处理器的情况下,由于当前自旋进程占用着CPU,持有锁的进程只有等待自旋进程耗尽CPU时间才有机会执行,这样CPU就空转了。

  • 读写锁。读写锁要根据进程进入临界区的具体行为(读,写)来决定锁的占用情况。这样锁的状态就有三种了:读模式加锁、写模式加锁、无锁。

    1. 无锁。读/写进程都可以进入。

    2. 读模式锁。读进程可以进入。写进程不可以进入。

    3. 写模式锁。读/写进程都不可以进入。

Golang sync.Mutex 实现

sync.Mutex是Go标准库中常用的一个排外锁。当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock 方法的调用上,直到锁被释放。

mutex

wait 实现

上述中提到的锁,准确而言是信号量,其实现方式有两种: wait 忙等待、阻塞自己

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 忙等待
wait(s) {
    while(s <= 0); // do nothing

    s--;
}

// 阻塞
wait(semaphore *s) {
    if (s.value <= 0) {
        // add this process to  s.list;
        // block()
    }

    s--;
}

忙等待和阻塞方式各有优劣:

  • 忙等待会使 CPU 空转,好处是如果在当前时间片内锁被其他进程释放,当前进程直接就能直接拿到锁而不需要 CPU 再进行进程调度。适用于锁占用时间较短的情况,且不适合于单处理器。
  • 阻塞不会导致 CPU 空转,但是进程切换也需要代价,比如上下文切换,CPU Cache Miss(缓存失效)

互斥锁

简单示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func count() {
	var mutex sync.Mutex
	count := 0

	for r := 0; r < 50; r++ {
		go func() {
			mutex.Lock()
			count += 1
			mutex.Unlock()
		}()
	}

	time.Sleep(time.Second)
	fmt.Println("the count is : ", count) // res:  50
}

上例使用 Mutex 实现对 count 进行同步。下面根据源码分析 golang Mutex 如何实现。

版本 /sync/mutex.go 1.12.7

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// A Mutex is a mutual exclusion lock. 
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
// Mutex 是一个互斥锁, 0 代表解除互斥,第一次使用后不允许复制。
type Mutex struct {
	state int32  // 状态 
	sema  uint32 // 非负数, 信号量
}

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota

    // Mutex can be in 2 modes of operations: normal and starvation. 互斥有两种模式:  正常和饥饿
}

其使用信号量的方式来实现的。sema就是信号量,一个非负数;state表示Mutex的状态。

  • mutexLocked = 1 表示锁是否可用(0可用,1被别的goroutine占用),
  • mutexWoken = 2 表示mutex是否被唤醒,
  • mutexStarving = 4 互斥有两种模式: 正常和饥饿 Go 1.9中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在1毫秒
  • mutexWaiterShift = 1 表示统计阻塞在该mutex上的goroutine数目需要移位的数值。将3个常量映射到state上就是

  • normal: 在正常模式下,等待者按FIFO顺序排队,但是被唤醒的等待者不拥有互斥对象,而是与新到达的goroutine争夺所有权, 新到的goroutine有一个优势——它们已经运行在CPU上,而且可能有很多goroutine,所以一个被叫醒的服务员很有可能会输掉。在这种情况下,它被排在等待队列的前面。如果一个服务端在超过1ms的时间内没有获取互斥量,它将互斥对象切换到饥饿模式。

  • starving: 在饥饿模式下,互斥锁的所有权直接从解锁 goroutine 传递给队列前面的服务员。新到达的goroutine 不会尝试获取互斥体,即使它看起来是解锁的,也不会尝试旋转。相反,它们将自己排在等待队列的末尾。

Lock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {        // 如果已经被某个 goroutine 使用,将阻塞直到( mutex.sema = 0 )mutex 可用
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {    // CAS 比较是否为 0 如果为 0 则置 1 表示 lock 成功
		if race.Enabled {  // 竞态检测,并标记 [详细了解 race](https://www.cnblogs.com/yjf512/p/5144211.html)
			race.Acquire(unsafe.Pointer(m))  
		}
		return
	}

	var waitStartTime int64
	starving := false
	awoke := false
    // 自旋数
	iter := 0
    // 保存当前协程状态
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
        // 第一个条件是 state 已被锁状态,且不是饥饿状态。如果时饥饿状态,自旋是没有用的,锁的拥有权直接交给了等待队列的第一个。
        // 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 自旋 设状态为 mutexWoken
            // remove detail code
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 { 
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 { // 任然为 mutexLocked|mutexStarving 状态,标志位偏移,加入阻塞队列
			new |= mutexStarving
		}
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 等待时间超过 1 ns,进入饥饿状态
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddIntt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

Unlock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 当没有锁定的对象进行解锁将会是一个运行时错误,允许一个 goroutine 锁定一个互斥锁安排另一个 goroutine 解锁
func (m *Mutex) Unlock() {

	// Fast path: drop lock bit.
    // 如果state不是处于锁的状态, 那么就是Unlock根本没有加锁的mutex, panic
	new := atomic.AddInt32(&m.state, -mutexLocked) // state--;
	if (new+mutexLocked)&mutexLocked == 0 { // -> (new + 1) & 1 == 0 ?-> new == 0 ?
		throw("sync: unlock of unlocked mutex")
	}

    // 释放了锁,还得需要通知其它等待者
    // 锁如果处于饥饿状态,直接交给等待队列的第一个, 唤醒它,让它去获取锁
    // 锁如果处于正常状态,

    // 正常状态
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 将等待的goroutine数减一,并设置woken标识,并通过信号量唤醒某个阻塞的 goroutine 去获取锁.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false)
				return
			}
			old = m.state
		}
	} else {
        // 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
        // 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
        // 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
        // 新来的goroutine不会把锁抢过去.
		runtime_Semrelease(&m.sema, true)
	}
}

总结

对于 Lock 操作:

  • Mutex 使用 CAS 对信号量的进行修改
  • 使用 自旋 1 ns 避免频繁切换协程状态
  • 1.9 加入 starving 状态,保证了竞态的公平性。

对于 Unlock 操作:

  • Mutex 的上锁没有绑定 gorouting 对象,使得会发生其他协程可以 Unlock 当前的锁状态
  • 使用信号量唤醒阻塞的 goroutine 去获得锁

如看完笔者粗略的注释看完后不是特别理解,建议可以查看 Mutex 源码注释,以及参考链接。

参考