乐于分享
好东西不私藏

一起读Go源码(第14期) sync:互斥锁与读写锁

一起读Go源码(第14期) sync:互斥锁与读写锁

概述

Go标注库sync包提供了互斥锁读写锁,用于解决并发安全问题,即多协程同时操作共享资源导致的数据竞争问题。简单来讲,互斥锁即同一时间只能有一个协程持有锁,对读和写都独占,不允许任何其他任何并行操作读写锁将读锁和写锁分开,写锁比读锁有更高的优先级;首先看一段老生常谈的代码,以及为什么每次运行count的值都不一样?

var wg sync.WaitGroupvar count = 0wg.Add(1000)for i := 0; i < 1000; i++ {    gofunc(id int) {        count++        defer wg.Done()    }(i)}wg.Wait()log.Println("count=>", count)

这时候有彦祖就要说了,这是数据竞争,多个goroutine同时修改count时(修改步骤包括读,+1,和写三个操作),有的goroutine在读,有的在+1,有的在写入,这些操作啥时候进行都取决于调度顺序,因此每次运行的结果都不一样,而且总是小于1000的;基于该例子下面开始介绍sync.Mutex的作用了。

sync.Mutex

现在我们使用sync.Mutex改进上面的代码,在count++操作这段代码时使用mu.Lock()加锁,保证函数退出时解锁defer mu.Unlock();此时我们看go func代码块,首先某协程会先尝试获取锁,如果锁已经被其他goroutine持有,那么当前goroutine阻塞等待,等到被释放;终于等到该协程获取锁,此时进入临界区,读取count、计算count+1、写count的值这三步操作,在有锁的情况下这些操作都不会被其他goroutine打断。

var wg sync.WaitGroupvar mu sync.Mutexvar count = 0wg.Add(1000)for i := 0; i < 1000; i++ {    gofunc(id int) {        mu.Lock() // 加锁        defer mu.Unlock()  // 退出时自动解锁        count++        defer wg.Done()    }(i)}wg.Wait()

此时无论程序运行几次都会得出1000的结果。那LockUnlock方法具体是怎么实现的呢?先来看看sync.Mutex的结构体,注释的意思是Mutex是一个互斥锁,Mutex的零值是一个未锁定的互斥锁,Mutex在首次使用后不能被复制;在Go内存模型中,对于任意n<m,第n对Mutex.Unlock的调用在同步顺序上先于第m次对Mutex.Lock的调用。对Mutex.TryLock的成功等价于对Lock的调用,失败的TryLock 调用不会建立任何 “同步顺序先于”的关系.

// A Mutex is a mutual exclusion lock.// The zero value for a Mutex is an unlocked mutex.//// A Mutex must not be copied after first use.//// In the terminology of [the Go memory model],// the n'th call to [Mutex.Unlock] “synchronizes before” the m'th call to [Mutex.Lock]// for any n < m.// A successful call to [Mutex.TryLock] is equivalent to a call to Lock.// A failed call to TryLock does not establish any “synchronizes before”// relation at all.//// [the Go memory model]: https://go.dev/ref/memtype Mutex struct {    state int32    sema  uint32}

其中Mutex中的2个变量:

  • • state是锁状态,用于记录锁是否被持有,以及等待者数量
  • • sema是信号量,用于阻塞和唤醒goroutine

那么下面来看看Lock和Unlock方法具体是怎么实现的。

Lock

注释的意思是Lock锁定m(*Mutex),如果锁已经被占用,则调用的goroutine会被阻塞,直到互斥锁可用

// Lock locks m.// If the lock is already in use, the calling goroutine// blocks until the mutex is available.func (m *Mutex) Lock() {    // Fast path: grab unlocked mutex.    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {        if race.Enabled {            race.Acquire(unsafe.Pointer(m))        }        return    }    // Slow path (outlined so that the fast path can be inlined)    m.lockSlow()}
  1. 1. 快速路径:首先通过atomic.CompareAndSwapInt32原子操作尝试将m.state从0改为mutexLocked;此时有两种情况,若当前goroutine获得了锁,就直接返回;若race.Enabled为真(Go的内置工具竞态检测器已打开,该工具用于在运行时自动发现数据竞争),race.Acquire(unsafe.Pointer(m))会告诉检测器当前goroutine已经获得了该锁,建立同步关系避免误报竞争。
  2. 2. 慢速路径:就是锁已经被其他goroutine持有,此时调用m.lockSlow()m.lockSlow就特别特别复杂了,总得来说就是通过自旋runtime_doSpin()、信号量阻塞runtime_SemacquireMutex(&m.sema, queueLifo, 1)、饥饿模式mutexStarving(防止高并发时goroutine长期无法获得锁)等方式让goroutine成功获取到锁。

Unlock

接下来看Unlock,注释的意思是Unlock解锁m,如果在调用Unlock时m未被锁定,则运行时错误。已锁定的Mutex不与特定的goroutine关联,因此允许一个goroutine锁定Mutex,然后安排另一个goroutine来解锁它

// Unlock unlocks m.// It is a run-time error if m is not locked on entry to Unlock.//// A locked [Mutex] is not associated with a particular goroutine.// It is allowed for one goroutine to lock a Mutex and then// arrange for another goroutine to unlock it.func (m *Mutex) Unlock() {    if race.Enabled {        _ = m.state        race.Release(unsafe.Pointer(m))    }    // Fast path: drop lock bit.    new := atomic.AddInt32(&m.state, -mutexLocked)    if new != 0 {        // Outlined slow path to allow inlining the fast path.        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.        m.unlockSlow(new)    }}
  1. 1. 检测数据竞争,此有开启竞态检测器(跟上面那个一样),则调用rece.Release释放锁,建立同步关系。
  2. 2. 快速路径:通过atomic.AddInt32(&m.state, -mutexLocked)原子操作,将state减去锁定标志位,清除锁的状态;若为0说明锁是空闲的,解锁完成。
  3. 3. 慢速路径:交给m.unlockSlow()处理,该方法会唤醒等待者重置唤醒标志,或是直接将锁交给下一个等待者。

在高并发场景下(如上面1000个goroutine尝试一把锁),Lock的快速路径基本上是失败的,锁总是被其他goroutine持有,因此绝大部分goroutine都会进入慢速路径模式,通过自旋、信号阻塞、饥饿模式等机制等待获得锁。同样Unlock也一样,多数情况会在慢速路径。

sync.RWMutex

在sync包中读写锁RWMutex是更细的锁操作,它允许多个goroutine同时持有读锁,写锁则是互斥锁(跟爱情类似,爱情具有排他性的唯一),这种机制在读得多写得少的场景下能显著提高并发性能。这时候有彦祖要问了,既然允许多个goroutine同时持有读锁,那直接让读操作不上锁不更快吗?实际上读锁并不是为了限制读操作之间的并发,而是为了解决读操作时数据的一致性问题。在读写锁的共同限制下:

  • • 读锁:保证在持有读锁期间,不会有任何写操作进入临界区。若后面有写操作协程,会被阻塞直到所有读锁释放
  • • 写锁:保证在持有写锁期间,不会有任何读锁或写锁进入临界区

一起来看看下面这个读写锁例子,定义了read和write方法,期间在mu.RLockmu.Lock获取到读锁或写锁前会打印一句话”尝试获取读/写锁”,使用WaitGroup,启动3个协程运行read方法后,启动1个协程运行write方法时公共变量count++,再启动2个协程运行read方法,观察打印的日志。

func read(id int, mu *sync.RWMutex, wg *sync.WaitGroup, counter *int) {    defer wg.Done()    log.Println("id:", id, "尝试获取读锁")    mu.RLock()    defer mu.RUnlock()    log.Println("获取读锁成功,count=>", *counter)}func write(id int, mu *sync.RWMutex, wg *sync.WaitGroup, counter *int) {    defer wg.Done()    log.Println("id:", id, "尝试获取写锁")    mu.Lock()    defer mu.Unlock()    log.Println("获取写锁成功")    *counter++    time.Sleep(3 * time.Second) // 模拟写入用时    log.Println("read写入count=>", *counter)}func main() {    var mu sync.RWMutex    var wg sync.WaitGroup    counter := 0    wg.Add(3)    go read(1, &mu, &wg, &counter)    go read(2, &mu, &wg, &counter)    go read(3, &mu, &wg, &counter)    time.Sleep(100 * time.Millisecond)    wg.Add(1)    go write(1, &mu, &wg, &counter)    time.Sleep(100 * time.Millisecond)    wg.Add(2)    go read(2, &mu, &wg, &counter)    go read(3, &mu, &wg, &counter)    wg.Wait()    log.Println("所有操作已完成")}/*2026/04/02 21:33:11 id: 3 尝试获取读锁2026/04/02 21:33:11 获取读锁成功,count=> 02026/04/02 21:33:11 id: 1 尝试获取读锁2026/04/02 21:33:11 获取读锁成功,count=> 02026/04/02 21:33:11 id: 2 尝试获取读锁2026/04/02 21:33:11 获取读锁成功,count=> 02026/04/02 21:33:11 id: 1 尝试获取写锁2026/04/02 21:33:11 获取写锁成功2026/04/02 21:33:11 id: 2 尝试获取读锁2026/04/02 21:33:11 id: 3 尝试获取读锁2026/04/02 21:33:14 read写入count=> 12026/04/02 21:33:14 获取读锁成功,count=> 12026/04/02 21:33:14 获取读锁成功,count=> 12026/04/02 21:33:14 所有操作已完成*/

前3个goroutine正常执行,读到写操作("获取写锁成功"),当前协程持有写锁(互斥锁),下面两行id2,3尝试获取读写锁但并没能获取,因为在持有写锁期间,不会有任何读锁或写锁进入临界区,后续的读操作会被阻塞;看到打印read写入count=> 1后释放写锁,其他goroutine获取读锁,对count读取,最后打印所有操作完成。那具体是怎么实现的呢?先来看看RWMutex的结构体:

注释的意思是:RWMutex一种读写互斥锁,该锁可以被任意数量的读者持有,或被单个写者持有RWMutex的零值是未锁定的互斥锁,RWMutex在第一次使用后不得被复制;如果锁已经被一个或多个读者持有,而此时有任何goroutine调用Lock,那么后续并发的RLock将会被阻塞,直到该写者释放锁,从而确保写者最终能获得锁

// There is a modified copy of this file in runtime/rwmutex.go.// If you make any changes here, see if you should make them there.// A RWMutex is a reader/writer mutual exclusion lock.// The lock can be held by an arbitrary number of readers or a single writer.// The zero value for a RWMutex is an unlocked mutex.//// A RWMutex must not be copied after first use.//// If any goroutine calls [RWMutex.Lock] while the lock is already held by// one or more readers, concurrent calls to [RWMutex.RLock] will block until// the writer has acquired (and released) the lock, to ensure that// the lock eventually becomes available to the writer.// Note that this prohibits recursive read-locking.//// In the terminology of [the Go memory model],// the n'th call to [RWMutex.Unlock] “synchronizes before” the m'th call to Lock// for any n < m, just as for [Mutex].// For any call to RLock, there exists an n such that// the n'th call to Unlock “synchronizes before” that call to RLock,// and the corresponding call to [RWMutex.RUnlock] “synchronizes before”// the n+1'th call to Lock.//// [the Go memory model]: https://go.dev/ref/memtype RWMutex struct {    w           Mutex        // held if there are pending writers    writerSem   uint32       // semaphore for writers to wait for completing readers    readerSem   uint32       // semaphore for readers to wait for completing writers    readerCount atomic.Int32 // number of pending readers    readerWait  atomic.Int32 // number of departing readers}

结构体的字段:

  • • w,互斥锁,用于保护写者之间的互斥
  • • writerSem,写者信号量
  • • readerSem,读者信号量
  • • readerCount,当前持有读锁数量
  • • readerWait,当写者正在等待时,记录当前仍持有读锁但将要释放的读者数量

读锁  RLock

已经把注释摘下来了,以下 happens-before 关系通过如下方式向竞争检测器表明:

  • • Unlock → LockreaderSem
  • • Unlock → RLockreaderSem
  • • RUnlock → LockwriterSem
func (rw *RWMutex) RLock() {    if race.Enabled {        _ = rw.w.state        race.Disable()    }    if rw.readerCount.Add(1) < 0 {        // A writer is pending, wait for it.        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)    }    if race.Enabled {        race.Enable()        race.Acquire(unsafe.Pointer(&rw.readerSem))    }}
  1. 1. 首先是竞争检测,if race.Enabled ...,如果开启了竞争就先关闭
  2. 2. 尝试获取读锁,rw.readerCount.Add(1),原子操作,将读者数量+1
  3. 3. 恢复竞争检测。race.Enable

读锁 RUnlock

注释:RUnlock撤销一次RLck的调用,它不会影响其他同事存在的读者;如果在调用RUnlockrw并未处于读锁定状态,则会导致运行时错误

// RUnlock undoes a single [RWMutex.RLock] call;// it does not affect other simultaneous readers.// It is a run-time error if rw is not locked for reading// on entry to RUnlock.func (rw *RWMutex) RUnlock() {    if race.Enabled {        _ = rw.w.state        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))        race.Disable()    }    if r := rw.readerCount.Add(-1); r < 0 {        // Outlined slow-path to allow the fast-path to be inlined        rw.rUnlockSlow(r)    }    if race.Enabled {        race.Enable()    }}
  1. 1. 老生常谈了先做竞争检测,先关掉。
  2. 2. 尝试释放读锁,rw.readerCount.Add(-1),调用rw.rUnlockSlow()慢速路径方法
  3. 3. 最后将竞争检测器再开启。

写锁 Lock

Lock锁定rw以进行写操作,如果锁已经被读锁定或写锁定,Lock会阻塞直到锁可用。

// Lock locks rw for writing.// If the lock is already locked for reading or writing,// Lock blocks until the lock is available.func (rw *RWMutex) Lock() {    if race.Enabled {        _ = rw.w.state        race.Disable()    }    // First, resolve competition with other writers.    rw.w.Lock()    // Announce to readers there is a pending writer.    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders    // Wait for active readers.    if r != 0 && rw.readerWait.Add(r) != 0 {        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)    }    if race.Enabled {        race.Enable()        race.Acquire(unsafe.Pointer(&rw.readerSem))        race.Acquire(unsafe.Pointer(&rw.writerSem))    }}

写锁 Unlock

Unlock解锁rw的写锁;如果在调用Unlock时rw并未处于写锁定状态,则会发生运行时错误。和Mutex类似,已锁定的RWMutex并不与特定的goroutine关联。

// Unlock unlocks rw for writing. It is a run-time error if rw is// not locked for writing on entry to Unlock.//// As with Mutexes, a locked [RWMutex] is not associated with a particular// goroutine. One goroutine may [RWMutex.RLock] ([RWMutex.Lock]) a RWMutex and then// arrange for another goroutine to [RWMutex.RUnlock] ([RWMutex.Unlock]) it.func (rw *RWMutex) Unlock() {    if race.Enabled {        _ = rw.w.state        race.Release(unsafe.Pointer(&rw.readerSem))        race.Disable()    }    // Announce to readers there is no active writer.    r := rw.readerCount.Add(rwmutexMaxReaders)    if r >= rwmutexMaxReaders {        race.Enable()        fatal("sync: Unlock of unlocked RWMutex")    }    // Unblock blocked readers, if any.    for i := 0; i < int(r); i++ {        runtime_Semrelease(&rw.readerSem, false, 0)    }    // Allow other writers to proceed.    rw.w.Unlock()    if race.Enabled {        race.Enable()    }}

总得来说,sync的读写互斥锁,允许多个读操作并发执行(可以保证读操作时的数据一致性),写操作时是互斥锁,因此适合读得多,写得少的并发场景

写在最后

本人是新手小白,如果这篇笔记中有任何错误或不准确之处,真诚地希望各位读者能够给予批评和指正,如有更好的实现方法请给我留言,谢谢!欢迎大家在评论区留言!觉得写得还不错的话欢迎大家关注一波!下一篇继续看看sync包的剩余方法。