并发 Go 中的锁与通道

比较两种与 goroutine 共享信息的方式,一种是使用同步共享内存,另一种是使用通道。
264 位读者喜欢这个。
Locks on a bridge in Paris

Jason Baker。CC BY-SA 4.0。

Go 推广了一句格言:不要通过共享内存来通信;而应该通过通信来共享内存。 该语言确实具有传统的互斥锁(互斥构造)来协调对共享内存的访问,但它更倾向于使用通道在 goroutine 之间共享信息。

在本文中,对 goroutine、线程和竞争条件的简短介绍为探讨两个 Go 程序奠定了基础。在第一个程序中,goroutine 通过同步共享内存进行通信,第二个程序使用通道达到相同的目的。代码可以从我的网站下载,位于一个带有 README 文件的 .zip 文件中。

线程和竞争条件

线程是可执行指令的序列,同一进程中的线程共享地址空间:多线程进程中的每个线程都具有对完全相同的内存位置的读/写访问权限。如果两个或多个线程(其中至少一个执行写入操作)对同一内存位置进行非协调访问,则会发生基于内存的竞争条件

考虑一下整数变量 n 的描述,其值为 777,以及两个尝试更改其内容的线程

        n = n + 10  +-----+  n = n - 10
Thread1------------>| 777 |<------------Thread2
                    +-----+
                       n

在多处理器机器上,两个线程可以真正同时执行。然后,对变量 n 的影响是不确定的。至关重要的是要注意,每次尝试更新都包含两个机器级操作:对 n 当前值执行算术运算(加或减 10),以及随后的赋值操作,将 n 设置为新值(787 或 767)。

在两个线程中执行的成对操作可能会以各种不适当的方式交错。考虑以下场景,其中每个编号项都是机器级别的单个操作。为简单起见,假设每个操作占用一个系统时钟周期

  1. 线程 1 执行加法运算以计算 787,该值保存在临时位置(堆栈上或 CPU 寄存器中)。
  2. 线程 2 执行减法运算以计算 767,该值也保存在临时位置。
  3. 线程 2 执行赋值;n 的值现在为 767。
  4. 线程 1 执行赋值;n 的值现在为 787。

通过最后到达,线程 1 赢得了与线程 2 的竞争。显然,发生了不正确的交错。线程 1 执行加法运算,延迟两个时钟周期,然后执行赋值。相比之下,线程 2 在没有中断的情况下执行减法和随后的赋值操作。修复方法很明确:算术和赋值操作应该像它们是单个原子操作一样发生。诸如互斥锁之类的构造提供了所需的修复,而 Go 具有互斥锁。

Go 程序通常是多线程的,尽管线程发生在表面之下。表面上是 goroutine。goroutine 是一个绿色线程——一个在 Go 运行时控制下的线程。相比之下,原生线程直接在操作系统控制之下。但是 goroutine 多路复用到操作系统调度的原生线程上,这意味着基于内存的竞争条件在 Go 中是可能的。两个示例程序中的第一个说明了这一点。

MiserSpendthrift1

MiserSpendthrift1 程序模拟了对银行帐户的共享访问。除了 main 之外,还有两个其他的 goroutine

  • miser goroutine 重复地向余额中添加金额,每次一个货币单位。
  • spendthrift goroutine 重复地从余额中减去金额,每次也是一个货币单位。

每个 goroutine 执行其操作的次数取决于命令行参数,该参数应该足够大才能有趣(例如,100,000 到几百万)。帐户余额初始化为零,最终应该为零,因为存款和取款的金额相同,数量也相同。

示例 1. 使用互斥锁协调对共享内存的访问

package main

import (
   "os"
   "fmt"
   "runtime"
   "strconv"
   "sync"
)

var accountBalance = 0    // balance for shared bank account
var mutex = &sync.Mutex{} // mutual-exclusion lock

// critical-section code with explicit locking/unlocking
func updateBalance(amt int) {
   mutex.Lock()
   accountBalance += amt  // two operations: update and assignment
   mutex.Unlock()
}

func reportAndExit(msg string) {
   fmt.Println(msg)
   os.Exit(-1) // all 1s in binary
}

func main() {
   if len(os.Args) < 2 {
      reportAndExit("\nUsage: go ms1.go <number of updates per thread>")
   }
   iterations, err := strconv.Atoi(os.Args[1])
   if err != nil {
      reportAndExit("Bad command-line argument: " + os.Args[1]);
   }

   var wg sync.WaitGroup  // wait group to ensure goroutine coordination

   // miser increments the balance
   wg.Add(1)           // increment WaitGroup counter
   go func() {
      defer wg.Done()  // invoke Done on the WaitGroup when finished
      for i := 0; i < iterations ; i++ {
         updateBalance(1)
         runtime.Gosched()  // yield to another goroutine
      }
   }()

   // spendthrift decrements the balance
   wg.Add(1)           // increment WaitGroup counter
   go func() {
      defer wg.Done()
      for i := 0; i < iterations; i++ {
         updateBalance(-1)
         runtime.Gosched()  // be nice--yield
      }
   }()

   wg.Wait()  // await completion of miser and spendthrift
   fmt.Println("Final balance: ", accountBalance)  // confirm final balance is zero
}

MiserSpendthrift1 程序中的控制流(见上文)可以描述如下

  • 程序首先尝试读取和验证命令行参数,该参数指定 miser 和 spendthrift 各自应更新帐户余额的次数(例如,一百万次)。
  • main goroutine 使用以下调用启动另外两个 goroutine
    go func() { // either the miser or the spendthrift

    启动的两个 goroutine 中的第一个代表 miser,第二个代表 spendthrift

  • 程序使用 sync.WaitGroup 来确保在 miser 和 spendthrift goroutine 完成其工作并终止之前,main goroutine 不会打印最终余额。

MiserSpendthrift1 程序声明了两个全局变量,一个整数变量表示共享银行帐户,另一个互斥锁用于确保协调 goroutine 对帐户的访问

var accountBalance = 0    // balance for shared bank account
var mutex = &sync.Mutex{} // mutual-exclusion lock

互斥锁代码出现在 updateBalance 函数中,以保护临界区,临界区是一个必须以单线程方式执行的代码段,程序才能正常运行

func updateBalance(amt int) {
   mutex.Lock()
   accountBalance += amt  // critical section
   mutex.Unlock()
}

临界区是 Lock()Unlock() 调用之间的语句。尽管在 Go 源代码中是一行代码,但此语句涉及两个不同的操作:算术运算,然后是赋值。这两个操作必须一起执行,一次一个线程,互斥锁代码确保了这一点。有了锁定代码,accountBalance 在最后为零,因为加 1 和减 1 的次数相同。

如果删除互斥锁代码,则 accountBalance 的最终值是不可预测的。在删除锁定代码的两次示例运行中,最终余额在第一次运行中为 249,在第二次运行中为 -87,从而证实发生了基于内存的竞争条件。

互斥锁代码的行为值得仔细研究

  • 要执行临界区代码,goroutine 必须首先通过执行 mutex.Lock() 调用来获取锁。如果锁已被持有,则 goroutine 会阻塞,直到锁变为可用;否则,goroutine 将执行受互斥锁保护的临界区。
  • 互斥锁保证互斥,即一次只有一个 goroutine 可以执行锁定的代码段。互斥锁确保临界区的单线程执行:算术运算,然后是赋值操作。
  • Unlock() 的调用释放持有的锁,以便某个 goroutine(可能是刚刚释放锁的那个 goroutine)可以重新获取锁。

在 MiserSpendthrift1 程序中,三个 goroutine(miser、spendthrift 和 main)通过名为 accountBalance 的共享内存位置进行通信。互斥锁协调 miser 和 spendthrift 对此变量的访问,并且 main 仅在 miser 和 spendthrift 都终止后才尝试访问该变量。即使使用相对较大的命令行参数(例如,五百万到一千万),程序运行速度也相对较快,并且 accountBalance 的预期最终值为零。

sync/atomic 具有诸如 AddInt32 之类的内置同步功能的函数。例如,如果 accountBalance 类型从 int 更改为 int32,则可以按如下方式简化 updateBalance 函数

func updateBalance(amt int32) {          // argument must be int32 as well
   atomic.AddInt32(&accountBalance, amt) // no explicit locking required
}

MiserSpendthrift1 程序使用显式锁定来突出显示临界区代码,并强调线程同步对于防止竞争条件的必要性。在生产级示例中,临界区可能包含多行源代码。在任何情况下,临界区都应尽可能短,以使程序尽可能并发。

MiserSpendthrift2

MiserSpendthrift2 程序再次具有初始化为零的全局变量 accountBalance,并且再次有 miser 和 spendthrift goroutine 争夺更新余额。但是,此程序不使用互斥锁来防止竞争条件。相反,现在有一个 banker goroutine,它响应来自 miser 和 spendthrift 的请求来访问 accountBalance。这两个 goroutine 不再直接更新 accountBalance。以下是架构草图

                  requests         updates
miser/spendthrift---------->banker--------->balance

此架构在线程安全的 Go 通道的支持下,序列化来自 miser 和 spendthrift 的请求,防止了 accountBalance 上的竞争条件。

示例 2. 使用线程安全通道协调对共享内存的访问

package main

import (
   "os"
   "fmt"
   "runtime"
   "strconv"
   "sync"
)

type bankOp struct { // bank operation: deposit or withdraw
   howMuch int       // amount
   confirm chan int  // confirmation channel
}

var accountBalance = 0          // shared account
var bankRequests chan *bankOp   // channel to banker

func updateBalance(amt int) int {
   update := &bankOp{howMuch: amt, confirm: make(chan int)}
   bankRequests <- update
   newBalance := <-update.confirm
   return newBalance
}

// For now a no-op, but could save balance to a file with a timestamp.
func logBalance(current int) { }

func reportAndExit(msg string) {
   fmt.Println(msg)
   os.Exit(-1) // all 1s in binary
}

func main() {
   if len(os.Args) < 2 {
      reportAndExit("\nUsage: go ms1.go <number of updates per thread>")
   }
   iterations, err := strconv.Atoi(os.Args[1])
   if err != nil {
      reportAndExit("Bad command-line argument: " + os.Args[1]);
   }

   bankRequests = make(chan *bankOp, 8) // 8 is channel buffer size

   var wg sync.WaitGroup
   // The banker: handles all requests for deposits and withdrawals through a channel.
   go func() {
      for {
         /* The select construct is non-blocking:
            -- if there's something to read from a channel, do so
            -- otherwise, fall through to the next case, if any */
         select {
         case request := <-bankRequests:
            accountBalance += request.howMuch   // update account
            request.confirm <- accountBalance   // confirm with current balance
         }
      }
   }()

   // miser increments the balance
   wg.Add(1)           // increment WaitGroup counter
   go func() {
      defer wg.Done()  // invoke Done on the WaitGroup when finished
      for i := 0; i < iterations ; i++ {
         newBalance := updateBalance(1)
         logBalance(newBalance)
         runtime.Gosched()  // yield to another goroutine
      }
   }()

   // spendthrift decrements the balance
   wg.Add(1)           // increment WaitGroup counter
   go func() {
      defer wg.Done()
      for i := 0; i < iterations; i++ {
         newBalance := updateBalance(-1)
         logBalance(newBalance)
         runtime.Gosched()  // be nice--yield
      }
   }()

   wg.Wait()  // await completion of miser and spendthrift
   fmt.Println("Final balance: ", accountBalance) // confirm the balance is zero
}

MiserSpendthrift2 程序中的更改可以概括如下。有一个 BankOp 结构

type bankOp struct { // bank operation: deposit or withdraw
   howMuch int       // amount
   confirm chan int  // confirmation channel
}

miser 和 spendthrift goroutine 使用它来发出更新请求。howMuch 字段是更新金额,1(miser)或 -1(spendthrift)。confirm 字段是 banker goroutine 在响应 miser 或 spendthrift 请求时使用的通道;此通道将新余额传回请求者作为确认。为了提高效率,bankOp 结构的地址而不是其副本通过 bankRequests 通道发送,该通道声明如下

var bankRequests chan *bankOp // channel of pointers to a bankOp

通道默认是同步的——即线程安全的。

miser 和 spendthrift 再次调用 updateBalance 函数以更改帐户余额。此函数不再具有任何显式线程同步

func updateBalance(amt int) int {   // request structure
   update := &bankOp{howMuch: amt,
                     confirm: make(chan int)}
   bankRequests <- update           // send request
   newBalance := <-update.confirm   // await confirmation
   return newBalance                // perhaps to be logged
}

bankRequests 通道的缓冲区大小为 8,以最大限度地减少阻塞。该通道最多可以容纳八个未读请求,之后进一步尝试添加另一个 bankOp 指针将被阻止。与此同时,banker goroutine 应该正在处理到达的请求;当 banker 读取请求时,请求会自动从通道中删除。但是,confirm 通道没有缓冲。请求者会阻塞,直到确认消息(存储在本地 newBalanace 变量中的更新余额)从 banker 到达。

因此,updateBalance 函数中的局部变量和参数(updatenewBalanceamt)是线程安全的,因为每个 goroutine 都会获得它们自己的副本。通道也是线程安全的,因此 updateBalance 函数的主体不再需要显式锁定。程序员真是松了一口气!

banker goroutine 无限循环,等待来自 miser 和 spendthrift goroutine 的请求

for {
   select {
   case request := <-bankRequests:      // Is there a request?
      accountBalance += request.howMuch // If so, update balance and
      request.confirm <- accountBalance // confirm to requester
   }
   // other cases could be added (e.g., golf outings)
}

当 miser 和 spendthrift goroutine 仍在活动时,只有 banker goroutine 可以访问 accountBalance,这意味着在此内存位置上不会发生竞争条件。只有在 miser 和 spendthrift 完成其工作并终止后,main goroutine 才会打印 accountBalance 的最终值并退出。当 main 终止时,banker goroutine 也终止。

锁还是通道?

MiserSpendthrift2 程序通过倾向于通道而不是同步共享内存,从而遵循了 Go 的格言。可以肯定的是,锁定内存可能很棘手。互斥锁 API 是低级的,因此容易出错,例如锁定但忘记解锁——可能导致死锁。更微妙的错误包括仅锁定临界区的一部分(欠锁定)和锁定不属于临界区的代码(过度锁定)。诸如 atomic.AddInt32 之类的线程安全函数降低了这些风险,因为锁定和解锁会自动发生。然而,如何在复杂的程序中推理低级内存锁定仍然是一个挑战。

Go 的格言带来了它自身的挑战。如果使用足够大的命令行参数运行两个 miser/spendthrift 程序,则性能对比非常明显。互斥锁可能是低级的,但性能良好。Go 通道很有吸引力,因为它们提供内置的线程安全性,并鼓励对共享临界资源(例如两个示例程序中的 accountBalance)进行单线程访问。但是,与互斥锁相比,通道会产生性能损失。

在编程中,很少有一种工具适合所有任务。因此,Go 提供了线程安全选项,范围从低级锁定到高级通道。

User profile image.
我是一名计算机科学领域的学者(德保罗大学计算与数字媒体学院),在软件开发方面拥有广泛的经验,主要是在生产计划和调度(钢铁行业)以及产品配置(卡车和客车制造)方面。有关书籍和其他出版物的详细信息,请访问

2 条评论

当只有两个参与者(一个 miser 和一个 spendthrift)并且 updateBalance() 强制每个参与者在发送下一个 bankOp 之前等待确认时,MiserSpendthrift2 如何填充大小为 8 的通道缓冲区?

由于您给出的原因,缓冲区不应该被填满,但是并发程序中的健壮性是一件好事。简而言之,缓冲是一种预防措施。

Creative Commons License本作品根据 Creative Commons Attribution-Share Alike 4.0 International License 获得许可。
© . All rights reserved.