Go 推广了一句格言:不要通过共享内存来通信;而应该通过通信来共享内存。 该语言确实具有传统的互斥锁(互斥构造)来协调对共享内存的访问,但它更倾向于使用通道在 goroutine 之间共享信息。
在本文中,对 goroutine、线程和竞争条件的简短介绍为探讨两个 Go 程序奠定了基础。在第一个程序中,goroutine 通过同步共享内存进行通信,第二个程序使用通道达到相同的目的。代码可以从我的网站下载,位于一个带有 README 文件的 .zip 文件中。
线程和竞争条件
线程是可执行指令的序列,同一进程中的线程共享地址空间:多线程进程中的每个线程都具有对完全相同的内存位置的读/写访问权限。如果两个或多个线程(其中至少一个执行写入操作)对同一内存位置进行非协调访问,则会发生基于内存的竞争条件。
考虑一下整数变量 n
的描述,其值为 777,以及两个尝试更改其内容的线程
n = n + 10 +-----+ n = n - 10
Thread1------------>| 777 |<------------Thread2
+-----+
n
在多处理器机器上,两个线程可以真正同时执行。然后,对变量 n
的影响是不确定的。至关重要的是要注意,每次尝试更新都包含两个机器级操作:对 n
当前值执行算术运算(加或减 10),以及随后的赋值操作,将 n
设置为新值(787 或 767)。
在两个线程中执行的成对操作可能会以各种不适当的方式交错。考虑以下场景,其中每个编号项都是机器级别的单个操作。为简单起见,假设每个操作占用一个系统时钟周期
- 线程 1 执行加法运算以计算 787,该值保存在临时位置(堆栈上或 CPU 寄存器中)。
- 线程 2 执行减法运算以计算 767,该值也保存在临时位置。
- 线程 2 执行赋值;
n
的值现在为 767。 - 线程 1 执行赋值;
n
的值现在为 787。
通过最后到达,线程 1 赢得了与线程 2 的竞争。显然,发生了不正确的交错。线程 1 执行加法运算,延迟两个时钟周期,然后执行赋值。相比之下,线程 2 在没有中断的情况下执行减法和随后的赋值操作。修复方法很明确:算术和赋值操作应该像它们是单个原子操作一样发生。诸如互斥锁之类的构造提供了所需的修复,而 Go 具有互斥锁。
Go 程序通常是多线程的,尽管线程发生在表面之下。表面上是 goroutine。goroutine 是一个绿色线程——一个在 Go 运行时控制下的线程。相比之下,原生线程直接在操作系统控制之下。但是 goroutine 多路复用到操作系统调度的原生线程上,这意味着基于内存的竞争条件在 Go 中是可能的。两个示例程序中的第一个说明了这一点。
MiserSpendthrift1
MiserSpendthrift1 程序模拟了对银行帐户的共享访问。除了 main
之外,还有两个其他的 goroutine
- miser goroutine 重复地向余额中添加金额,每次一个货币单位。
- spendthrift goroutine 重复地从余额中减去金额,每次也是一个货币单位。
每个 goroutine 执行其操作的次数取决于命令行参数,该参数应该足够大才能有趣(例如,100,000 到几百万)。帐户余额初始化为零,最终应该为零,因为存款和取款的金额相同,数量也相同。
示例 1. 使用互斥锁协调对共享内存的访问
package main
import (
"os"
"fmt"
"runtime"
"strconv"
"sync"
)
var accountBalance = 0 // balance for shared bank account
var mutex = &sync.Mutex{} // mutual-exclusion lock
// critical-section code with explicit locking/unlocking
func updateBalance(amt int) {
mutex.Lock()
accountBalance += amt // two operations: update and assignment
mutex.Unlock()
}
func reportAndExit(msg string) {
fmt.Println(msg)
os.Exit(-1) // all 1s in binary
}
func main() {
if len(os.Args) < 2 {
reportAndExit("\nUsage: go ms1.go <number of updates per thread>")
}
iterations, err := strconv.Atoi(os.Args[1])
if err != nil {
reportAndExit("Bad command-line argument: " + os.Args[1]);
}
var wg sync.WaitGroup // wait group to ensure goroutine coordination
// miser increments the balance
wg.Add(1) // increment WaitGroup counter
go func() {
defer wg.Done() // invoke Done on the WaitGroup when finished
for i := 0; i < iterations ; i++ {
updateBalance(1)
runtime.Gosched() // yield to another goroutine
}
}()
// spendthrift decrements the balance
wg.Add(1) // increment WaitGroup counter
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
updateBalance(-1)
runtime.Gosched() // be nice--yield
}
}()
wg.Wait() // await completion of miser and spendthrift
fmt.Println("Final balance: ", accountBalance) // confirm final balance is zero
}
MiserSpendthrift1 程序中的控制流(见上文)可以描述如下
- 程序首先尝试读取和验证命令行参数,该参数指定 miser 和 spendthrift 各自应更新帐户余额的次数(例如,一百万次)。
main
goroutine 使用以下调用启动另外两个 goroutine
go func() { // either the miser or the spendthrift
启动的两个 goroutine 中的第一个代表 miser,第二个代表 spendthrift。
- 程序使用
sync.WaitGroup
来确保在 miser 和 spendthrift goroutine 完成其工作并终止之前,main
goroutine 不会打印最终余额。
MiserSpendthrift1 程序声明了两个全局变量,一个整数变量表示共享银行帐户,另一个互斥锁用于确保协调 goroutine 对帐户的访问
var accountBalance = 0 // balance for shared bank account
var mutex = &sync.Mutex{} // mutual-exclusion lock
互斥锁代码出现在 updateBalance
函数中,以保护临界区,临界区是一个必须以单线程方式执行的代码段,程序才能正常运行
func updateBalance(amt int) {
mutex.Lock()
accountBalance += amt // critical section
mutex.Unlock()
}
临界区是 Lock()
和 Unlock()
调用之间的语句。尽管在 Go 源代码中是一行代码,但此语句涉及两个不同的操作:算术运算,然后是赋值。这两个操作必须一起执行,一次一个线程,互斥锁代码确保了这一点。有了锁定代码,accountBalance
在最后为零,因为加 1 和减 1 的次数相同。
如果删除互斥锁代码,则 accountBalance
的最终值是不可预测的。在删除锁定代码的两次示例运行中,最终余额在第一次运行中为 249,在第二次运行中为 -87,从而证实发生了基于内存的竞争条件。
互斥锁代码的行为值得仔细研究
- 要执行临界区代码,goroutine 必须首先通过执行
mutex.Lock()
调用来获取锁。如果锁已被持有,则 goroutine 会阻塞,直到锁变为可用;否则,goroutine 将执行受互斥锁保护的临界区。 - 互斥锁保证互斥,即一次只有一个 goroutine 可以执行锁定的代码段。互斥锁确保临界区的单线程执行:算术运算,然后是赋值操作。
- 对
Unlock()
的调用释放持有的锁,以便某个 goroutine(可能是刚刚释放锁的那个 goroutine)可以重新获取锁。
在 MiserSpendthrift1 程序中,三个 goroutine(miser、spendthrift 和 main
)通过名为 accountBalance
的共享内存位置进行通信。互斥锁协调 miser 和 spendthrift 对此变量的访问,并且 main
仅在 miser 和 spendthrift 都终止后才尝试访问该变量。即使使用相对较大的命令行参数(例如,五百万到一千万),程序运行速度也相对较快,并且 accountBalance
的预期最终值为零。
包 sync/atomic
具有诸如 AddInt32
之类的内置同步功能的函数。例如,如果 accountBalance
类型从 int
更改为 int32
,则可以按如下方式简化 updateBalance
函数
func updateBalance(amt int32) { // argument must be int32 as well
atomic.AddInt32(&accountBalance, amt) // no explicit locking required
}
MiserSpendthrift1 程序使用显式锁定来突出显示临界区代码,并强调线程同步对于防止竞争条件的必要性。在生产级示例中,临界区可能包含多行源代码。在任何情况下,临界区都应尽可能短,以使程序尽可能并发。
MiserSpendthrift2
MiserSpendthrift2 程序再次具有初始化为零的全局变量 accountBalance
,并且再次有 miser 和 spendthrift goroutine 争夺更新余额。但是,此程序不使用互斥锁来防止竞争条件。相反,现在有一个 banker goroutine,它响应来自 miser 和 spendthrift 的请求来访问 accountBalance
。这两个 goroutine 不再直接更新 accountBalance
。以下是架构草图
requests updates
miser/spendthrift---------->banker--------->balance
此架构在线程安全的 Go 通道的支持下,序列化来自 miser 和 spendthrift 的请求,防止了 accountBalance
上的竞争条件。
示例 2. 使用线程安全通道协调对共享内存的访问
package main
import (
"os"
"fmt"
"runtime"
"strconv"
"sync"
)
type bankOp struct { // bank operation: deposit or withdraw
howMuch int // amount
confirm chan int // confirmation channel
}
var accountBalance = 0 // shared account
var bankRequests chan *bankOp // channel to banker
func updateBalance(amt int) int {
update := &bankOp{howMuch: amt, confirm: make(chan int)}
bankRequests <- update
newBalance := <-update.confirm
return newBalance
}
// For now a no-op, but could save balance to a file with a timestamp.
func logBalance(current int) { }
func reportAndExit(msg string) {
fmt.Println(msg)
os.Exit(-1) // all 1s in binary
}
func main() {
if len(os.Args) < 2 {
reportAndExit("\nUsage: go ms1.go <number of updates per thread>")
}
iterations, err := strconv.Atoi(os.Args[1])
if err != nil {
reportAndExit("Bad command-line argument: " + os.Args[1]);
}
bankRequests = make(chan *bankOp, 8) // 8 is channel buffer size
var wg sync.WaitGroup
// The banker: handles all requests for deposits and withdrawals through a channel.
go func() {
for {
/* The select construct is non-blocking:
-- if there's something to read from a channel, do so
-- otherwise, fall through to the next case, if any */
select {
case request := <-bankRequests:
accountBalance += request.howMuch // update account
request.confirm <- accountBalance // confirm with current balance
}
}
}()
// miser increments the balance
wg.Add(1) // increment WaitGroup counter
go func() {
defer wg.Done() // invoke Done on the WaitGroup when finished
for i := 0; i < iterations ; i++ {
newBalance := updateBalance(1)
logBalance(newBalance)
runtime.Gosched() // yield to another goroutine
}
}()
// spendthrift decrements the balance
wg.Add(1) // increment WaitGroup counter
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
newBalance := updateBalance(-1)
logBalance(newBalance)
runtime.Gosched() // be nice--yield
}
}()
wg.Wait() // await completion of miser and spendthrift
fmt.Println("Final balance: ", accountBalance) // confirm the balance is zero
}
MiserSpendthrift2 程序中的更改可以概括如下。有一个 BankOp
结构
type bankOp struct { // bank operation: deposit or withdraw
howMuch int // amount
confirm chan int // confirmation channel
}
miser 和 spendthrift goroutine 使用它来发出更新请求。howMuch
字段是更新金额,1(miser)或 -1(spendthrift)。confirm
字段是 banker goroutine 在响应 miser 或 spendthrift 请求时使用的通道;此通道将新余额传回请求者作为确认。为了提高效率,bankOp
结构的地址而不是其副本通过 bankRequests
通道发送,该通道声明如下
var bankRequests chan *bankOp // channel of pointers to a bankOp
通道默认是同步的——即线程安全的。
miser 和 spendthrift 再次调用 updateBalance
函数以更改帐户余额。此函数不再具有任何显式线程同步
func updateBalance(amt int) int { // request structure
update := &bankOp{howMuch: amt,
confirm: make(chan int)}
bankRequests <- update // send request
newBalance := <-update.confirm // await confirmation
return newBalance // perhaps to be logged
}
bankRequests
通道的缓冲区大小为 8,以最大限度地减少阻塞。该通道最多可以容纳八个未读请求,之后进一步尝试添加另一个 bankOp
指针将被阻止。与此同时,banker goroutine 应该正在处理到达的请求;当 banker 读取请求时,请求会自动从通道中删除。但是,confirm
通道没有缓冲。请求者会阻塞,直到确认消息(存储在本地 newBalanace
变量中的更新余额)从 banker 到达。
因此,updateBalance
函数中的局部变量和参数(update
、newBalance
和 amt
)是线程安全的,因为每个 goroutine 都会获得它们自己的副本。通道也是线程安全的,因此 updateBalance
函数的主体不再需要显式锁定。程序员真是松了一口气!
banker goroutine 无限循环,等待来自 miser 和 spendthrift goroutine 的请求
for {
select {
case request := <-bankRequests: // Is there a request?
accountBalance += request.howMuch // If so, update balance and
request.confirm <- accountBalance // confirm to requester
}
// other cases could be added (e.g., golf outings)
}
当 miser 和 spendthrift goroutine 仍在活动时,只有 banker goroutine 可以访问 accountBalance
,这意味着在此内存位置上不会发生竞争条件。只有在 miser 和 spendthrift 完成其工作并终止后,main
goroutine 才会打印 accountBalance
的最终值并退出。当 main
终止时,banker goroutine 也终止。
锁还是通道?
MiserSpendthrift2 程序通过倾向于通道而不是同步共享内存,从而遵循了 Go 的格言。可以肯定的是,锁定内存可能很棘手。互斥锁 API 是低级的,因此容易出错,例如锁定但忘记解锁——可能导致死锁。更微妙的错误包括仅锁定临界区的一部分(欠锁定)和锁定不属于临界区的代码(过度锁定)。诸如 atomic.AddInt32
之类的线程安全函数降低了这些风险,因为锁定和解锁会自动发生。然而,如何在复杂的程序中推理低级内存锁定仍然是一个挑战。
Go 的格言带来了它自身的挑战。如果使用足够大的命令行参数运行两个 miser/spendthrift 程序,则性能对比非常明显。互斥锁可能是低级的,但性能良好。Go 通道很有吸引力,因为它们提供内置的线程安全性,并鼓励对共享临界资源(例如两个示例程序中的 accountBalance
)进行单线程访问。但是,与互斥锁相比,通道会产生性能损失。
在编程中,很少有一种工具适合所有任务。因此,Go 提供了线程安全选项,范围从低级锁定到高级通道。
2 条评论