Go 学习笔记(二十七)互斥锁 Mutex 和读写锁 RWMutex 用法详述

本文原创地址:博客园骏马金龙Go 基础系列:互斥锁 Mutex 和读写锁 RWMutex 用法详述

sync.Mutex

Go 中使用 sync.Mutex 类型实现 mutex(排他锁、互斥锁)。在源代码的 sync/mutex.go 文件中,有如下定义:

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema uint32
}

这没有任何非凡的地方。 和 mutex 相关的所有事情都是通过 sync.Mutex 类型的两个方法 sync.Lock()和 sync.Unlock() 函数来完成的,前者用于获取 sync.Mutex 锁,后者用于释放 sync.Mutex 锁 。sync.Mutex 一旦被锁住,其它的 Lock()操作就无法再获取它的锁,只有通过 Unlock() 释放锁之后才能通过 Lock() 继续获取锁。

也就是说, 已有的锁会导致其它申请 Lock()操作的 goroutine 被阻塞,且只有在 Unlock() 的时候才会解除阻塞

另外需要注意, sync.Mutex 不区分读写锁,只有 Lock()与 Lock() 之间才会导致阻塞的情况 ,如果在一个地方 Lock(),在另一个地方不 Lock() 而是直接修改或访问共享数据,这对于 sync.Mutex 类型来说是允许的,因为 mutex 不会和 goroutine 进行关联。如果想要区分读、写锁,可以使用 sync.RWMutex 类型,见后文。

在 Lock()和 Unlock()之间的代码段称为资源的临界区 (critical section),在这一区间内的代码是严格被 Lock() 保护的,是线程安全的,任何一个时间点都只能有一个 goroutine 执行这段区间的代码

以下是使用 sync.Mutex 的一个示例,稍后是非常详细的分析过程。

package main

import (
	"fmt"
	"sync"
	"time"
)

// 共享变量
var (
	m  sync.Mutex
	v1 int
)

// 修改共享变量
// 在Lock()和Unlock()之间的代码部分是临界区
func change(i int) {
	m.Lock()
	time.Sleep(time.Second)
	v1 = v1 + 1
	if v1%10 == 0 {
		v1 = v1 - 10*i
	}
	m.Unlock()
}

// 访问共享变量
// 在Lock()和Unlock()之间的代码部分是是临界区
func read() int {
	m.Lock()
	a := v1
	m.Unlock()
	return a
}

func main() {
	var numGR = 21
	var wg sync.WaitGroup

	fmt.Printf("%d", read())

	// 循环创建numGR个goroutine
	// 每个goroutine都执行change()、read()
	// 每个change()和read()都会持有锁
	for i := 0; i < numGR; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			change(i)
			fmt.Printf(" -> %d", read())
		}(i)
	}

	wg.Wait()
}

第一次执行结果:

0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> -100 -> -99
-> -98 -> -97 -> -96 -> -95 -> -94 -> -93 -> -92 -> -91 -> -260 -> -259

第二次执行结果:注意其中的 -74 和 -72 之间跨了一个数

0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> -80 -> -79 
-> -78 -> -77 -> -76 -> -75 -> -74 -> -72 -> -71 -> -230 -> -229 -> -229

上面的示例中,change()、read() 都会申请锁,并在准备执行完函数时释放锁,它们如何修改数据、访问数据本文不多做解释。需要详细解释的是 main() 中的 for 循环部分。

在 for 循环中,会不断激活新的 goroutine(共 21 个) 执行匿名函数,在每个匿名函数中都会执行 change()和 read(),意味着每个 goroutine 都会申请两次锁、释放两次锁,且 for 循环中没有任何 Sleep 延迟,这 21 个 goroutine 几乎是一瞬间同时激活的。

但由于 change()和 read() 中都申请锁,对于这 21 个 goroutine 将要分别执行的 42 个 critical section,Lock() 保证了在某一时间点只有其中一个 goroutine 能访问其中一个 critical section。当释放了一个 critical section, 其它的 Lock()将争夺互斥锁,也就是所谓的竞争现象 (race condition) 。因为竞争的存在, 这 42 个 critical section 被访问的顺序是随机的 ,完全无法保证哪个 critical section 先被访问。

对于前 9 个被调度到的 goroutine,无论是哪个 goroutine 取得这 9 个 change(i) 中的 critical section,都只是对共享变量 v1 做加 1 运算,但当第 10 个 goroutine 被调度时,由于 v1 加 1 之后得到 10,它满足 if 条件,会执行v1 = v1 - i*10,但这个 i 可能是任意 0 到 numGR 之间的值 (因为无法保证并发的 goroutine 的调度顺序),这使得 v1 的值从第 10 个 goroutine 开始出现随机性。但从第 10 到第 19 个 goroutine 被调度的过程中,也只是对共享变量 v1 做加 1 运算,这些值是可以根据第 10 个数推断出来的,到第 20 个 goroutine,又再次随机。依此类推。

此外, 每个 goroutine 中的 read()也都会参与锁竞争,所以并不能保证每次 change(i) 之后会随之执行到 read() ,可能 goroutine 1 的 change()执行完后,会跳转到 goroutine 3 的 change() 上,这样一来,goroutine 1 的 read()就无法读取到 goroutine 1 所修改的 v1 值,而是访问到其它 goroutine 中修改后的值。所以,前面的第二次执行结果中出现了一次数据跨越。只不过执行完 change() 后立即执行 read() 的几率比较大,所以多数时候输出的数据都是连续的。

总而言之, Mutex 保证了每个 critical section 安全,某一时间点只有一个 goroutine 访问到这部分,但也因此而出现了随机性

如果 Lock()后忘记了 Unlock(),将会永久阻塞而出现死锁。

适合 sync.Mutex 的数据类型

其实,对于内置类型的共享变量来说,使用 sync.Mutex 和 Lock()、Unlock() 来保护也是不合理的,因为它们自身不包含 Mutex 属性。真正合理的共享变量是那些包含 Mutex 属性的 struct 类型。例如:

type mytype struct {
	m   sync.Mutex
	var int
}

x := new(mytype)

这时只要想保护 var 变量,就先 x.m.Lock(),操作完 var 后,再 x.m.Unlock()。这样就能保证 x 中的 var 字段变量一定是被保护的。

sync.RWMutex

Go 中使用 sync.RWMutex 类型实现读写互斥锁 rwmutex。在源代码的 sync/rwmutex.go 文件中,有如下定义:

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // 写锁需要等待读锁释放的信号量
	readerSem   uint32 // 读锁需要等待写锁释放的信号量
	readerCount int32  // 读锁后面挂起了多少个写锁申请
	readerWait  int32  // 已释放了多少个读锁
}

上面的注释和源代码说明了几点:

  1. RWMutex 是基于 Mutex 的,在 Mutex 的基础之上增加了读、写的信号量,并使用了类似引用计数的读锁数量
  2. 读锁与读锁兼容,读锁与写锁互斥,写锁与写锁互斥,只有在锁释放后才可以继续申请互斥的锁
    • 可以同时申请多个读锁
    • 有读锁时申请写锁将阻塞,有写锁时申请读锁将阻塞
    • 只要有写锁,后续申请读锁和写锁都将阻塞

此类型有几个锁和解锁的方法:

func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()

其中:

  1. Lock()和 Unlock() 用于申请和释放写锁
  2. RLock()和 RUnlock() 用于申请和释放读锁
    • 一次 RUnlock() 操作只是对读锁数量减 1,即减少一次读锁的引用计数
  3. 如果不存在写锁,则 Unlock()引发 panic,如果不存在读锁,则 RUnlock() 引发 panic
  4. RLocker()用于返回一个实现了 Lock() 和 Unlock() 方法的 Locker 接口

此外,无论是 Mutex 还是 RWMutex 都不会和 goroutine 进行关联,这意味着它们的锁申请行为可以在一个 goroutine 中操作,释放锁行为可以在另一个 goroutine 中操作。

由于 RLock()和 Lock() 都能保证数据不被其它 goroutine 修改,所以在 RLock()与 RUnlock() 之间的,以及 Lock()与 Unlock() 之间的代码区都是 critical section。

以下是一个示例,此示例中同时使用了 Mutex 和 RWMutex,RWMutex 用于读、写,Mutex 只用于读。

package main

import (
	"fmt"
	"os"
	"sync"
	"time"
)

var Password = secret{password: "myPassword"}

type secret struct {
	RWM      sync.RWMutex
	M        sync.Mutex
	password string
}

// 通过rwmutex写
func Change(c *secret, pass string) {
	c.RWM.Lock()
	fmt.Println("Change with rwmutex lock")
	time.Sleep(3 * time.Second)
	c.password = pass
	c.RWM.Unlock()
}

// 通过rwmutex读
func rwMutexShow(c *secret) string {
	c.RWM.RLock()
	fmt.Println("show with rwmutex",time.Now().Second())
	time.Sleep(1 * time.Second)
	defer c.RWM.RUnlock()
	return c.password
}

// 通过mutex读,和rwMutexShow的唯一区别在于锁的方式不同
func mutexShow(c *secret) string {
	c.M.Lock()
	fmt.Println("show with mutex:",time.Now().Second())
	time.Sleep(1 * time.Second)
	defer c.M.Unlock()
	return c.password
}

func main() {
	// 定义一个稍后用于覆盖(重写)的函数
	var show = func(c *secret) string { return "" }

	// 通过变量赋值的方式,选择并重写showFunc函数
	if len(os.Args) != 2 {
		fmt.Println("Using sync.RWMutex!",time.Now().Second())
		show = rwMutexShow
	} else {
		fmt.Println("Using sync.Mutex!",time.Now().Second())
		show = mutexShow
	}
	
	var wg sync.WaitGroup

	// 激活5个goroutine,每个goroutine都查看
	// 根据选择的函数不同,showFunc()加锁的方式不同
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("Go Pass:", show(&Password),time.Now().Second())
		}()
	}
	
	// 激活一个申请写锁的goroutine
	go func() {
		wg.Add(1)
		defer wg.Done()
		Change(&Password, "123456")
	}()
	// 阻塞,直到所有wg.Done
	wg.Wait()
}

Change() 函数申请写锁,并睡眠 3 秒后修改数据,然后释放写锁。

rwMutexShow()函数申请读锁,并睡眠一秒后取得数据,并释放读锁。注意,rwMutexShow() 中的 print 和 return 是相隔一秒钟的。

mutexShow()函数申请 Mutex 锁,和 RWMutex 互不相干。和 rwMutexShow() 唯一不同之处在于申请的锁不同。

main()中,先根据命令行参数数量决定运行哪一个 show()。之所以能根据函数变量来赋值,是因为先定义了一个 show()函数,它的函数签名和 rwMutexShow()、mutexShow() 的签名相同,所以可以相互赋值。

for 循环中激活了 5 个 goroutine 并发运行,for 瞬间激活 5 个 goroutine 后,继续执行 main() 代码会激活另一个用于申请写锁的 goroutine。这 6 个 goroutine 的执行顺序是随机的。

如果 show 选中的函数是 rwMutexShow(),则 5 个 goroutine 要申请的 RLock() 锁和写锁是冲突的,但 5 个 RLock() 是兼容的。所以,只要某个时间点调度到了写锁的 goroutine,剩下的读锁 goroutine 都会从那时开始阻塞 3 秒。

除此之外,还有一个不严格准确,但在时间持续长短的理论上来说能保证的一个规律:当修改数据结束后,各个剩下的 goroutine 都申请读锁,因为申请后立即 print 输出,然后睡眠 1 秒,但 1 秒时间足够所有剩下的 goroutine 申请完读锁,使得show with rwmutex输出是连在一起,输出的Go Pass: 123456又是连在一起的。

某次结果如下:

Using sync.RWMutex! 58
show with rwmutex 58
Change with rwmutex lock
Go Pass: myPassword 59
show with rwmutex 2
show with rwmutex 2
show with rwmutex 2
show with rwmutex 2
Go Pass: 123456 3
Go Pass: 123456 3
Go Pass: 123456 3
Go Pass: 123456 3

如果 show 选中的函数是 mutexShow(),则读数据和写数据互不冲突,但读和读是冲突的 (因为 Mutex 的 Lock() 是互斥的)。

某次结果如下:

Using sync.Mutex! 30
Change with rwmutex lock
show with mutex: 30
Go Pass: myPassword 31
show with mutex: 31
Go Pass: myPassword 32
show with mutex: 32
Go Pass: 123456 33
show with mutex: 33
show with mutex: 34
Go Pass: 123456 34
Go Pass: 123456 35

用 Mutex 还是用 RWMutex

Mutex 和 RWMutex 都不关联 goroutine,但 RWMutex 显然更适用于读多写少的场景。仅针对读的性能来说,RWMutex 要高于 Mutex,因为 rwmutex 的多个读可以并存。
上一篇 Go 学习笔记(二十六)指定 goroutine 的执行顺序
Go 学习笔记(目录)
下一篇 Go 学习笔记(二十八)Go 实现工作池的两种方式