当您使用复杂的分布式系统时,您很可能会遇到并发处理的需求。在 Mode.net,我们每天都在处理实时、快速且弹性的软件。如果没有高度并发的系统,构建一个以毫秒级动态路由数据包的全球专用网络是不可能的。这种动态路由是基于网络状态的,虽然这里有很多参数需要考虑,但我们的重点是链路 指标。在我们的上下文中,链路指标可以是与网络链路的状态或当前属性相关的任何内容(例如:链路延迟)。
链路指标的并发探测
H.A.L.O. (逐跳自适应链路状态最优路由),我们的动态路由算法部分依赖于链路指标来计算其路由表。这些指标由一个独立的组件收集,该组件位于每个 PoP(存在点)。PoP 是网络中代表单个路由实体的机器,通过链路连接并分布在多个位置,构成我们的网络。该组件使用网络数据包探测相邻机器,这些邻居将弹回初始探测。链路延迟值可以从接收到的探测中导出。因为每个 PoP 都有多个邻居,所以这种任务的本质是内在的并发性:我们需要实时测量每个相邻链路的延迟。我们无法承担顺序处理;为了计算这个指标,必须尽快处理每个探测。

序列号和重置:重新排序的情况
我们的探测组件交换数据包,并依靠序列号进行数据包处理。这旨在避免处理数据包重复或乱序数据包。我们的第一个实现依赖于特殊的序列号 0 来重置序列号。这样的数字仅在组件初始化期间使用。主要问题是我们正在考虑一个递增的序列号值,该值始终从 0 开始。在组件重启后,可能会发生数据包重新排序,并且数据包很容易用重置前正在使用的值替换序列号。这意味着后续的数据包将被忽略,直到它达到重置之前正在使用的序列号。
UDP 握手和有限状态机
这里的问题是在组件重启后正确地协商序列号。有很多方法可以处理这个问题,在讨论了我们的选择后,我们选择实现一个具有明确状态定义的 3 次握手协议。此握手在初始化期间建立链路上的会话。这保证了节点在同一会话上通信,并为其使用适当的序列号。
为了正确实现这一点,我们必须定义一个具有明确状态和转换的有限状态机。这使我们能够正确管理握手形成的所有极端情况。

会话 ID 由握手发起者生成。完整的交换序列如下
- 发送方发送一个 SYN (ID) 数据包。
- 接收方存储接收到的 ID 并发送一个 SYN-ACK (ID)。
- 发送方接收到 SYN-ACK (ID) 并发送一个 ACK (ID)。 它也开始发送序列号从 0 开始的数据包。
- 接收方检查最后接收到的 ID ,如果 ID 匹配,则接受 ACK (ID) 。它也开始接受序列号为 0 的数据包。
处理状态超时
基本上,在每个状态下,您最多需要处理三种类型的事件:链路事件、数据包事件和超时事件。这些事件是并发出现的,所以这里您必须正确处理并发。
- 链路事件要么是链路启动,要么是链路关闭更新。这可以启动链路会话或中断现有会话。
- 数据包事件是控制数据包 (SYN/SYN-ACK/ACK) 或只是探测响应。
- 超时事件是在为当前会话状态调度的超时到期后触发的事件。
这里的主要挑战是如何处理并发超时到期和其他事件。而这正是人们很容易陷入死锁和竞争条件陷阱的地方。
第一种方法
用于这个项目的语言是 Golang。它确实提供了本机同步机制,例如本机通道和锁,并且能够为并发处理生成轻量级线程。

Gophers 一起黑客
作者:Ashley McNamara, CC BY-NC-SA 4.0
您可以首先设计一个结构来表示我们的 会话 和 超时处理程序。
type Session struct {
State SessionState
Id SessionId
RemoteIp string
}
type TimeoutHandler struct {
callback func(Session)
session Session
duration int
timer *timer.Timer
}
会话 标识连接会话,包括会话 ID、相邻链路 IP 和当前会话状态。
TimeoutHandler 包含回调函数、它应该运行的会话、持续时间以及指向计划计时器的指针。
有一个全局映射,它将存储每个相邻链路会话的计划超时处理程序。
SessionTimeout map[Session]*TimeoutHandler
注册和取消超时是通过以下方法实现的
// schedules the timeout callback function.
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {
timeout.callback(timeout.session)
})
}
func (timeout* TimeoutHandler) Cancel() {
if timeout.timer == nil {
return
}
timeout.timer.Stop()
}
对于超时创建和存储,您可以使用如下方法
func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {
if sessionTimeout[session] == nil {
sessionTimeout[session] := new(TimeoutHandler)
}
timeout = sessionTimeout[session]
timeout.session = session
timeout.callback = callback
timeout.duration = duration
return timeout
}
一旦超时处理程序被创建和注册,它会在 duration 秒过去后运行回调。但是,某些事件将需要您重新调度超时处理程序(就像在 SYN 状态下发生的那样——每 3 秒)。
为此,您可以让回调重新调度新的超时
func synCallback(session Session) {
sendSynPacket(session)
// reschedules the same callback.
newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)
newTimeout.Register()
sessionTimeout[state] = newTimeout
}
此回调在新的超时处理程序中重新调度自身,并更新全局 sessionTimeout 映射。
数据竞争和引用
您的解决方案已准备就绪。一个简单的测试是检查超时回调是否在计时器过期后执行。为此,注册一个超时,睡眠其持续时间,然后检查回调操作是否完成。在测试执行后,最好取消计划的超时(因为它会重新调度),这样它就不会在测试之间产生副作用。
令人惊讶的是,这个简单的测试发现了解决方案中的一个错误。使用 cancel 方法取消超时根本不起作用。以下事件顺序会导致数据竞争条件
- 您有一个计划的超时处理程序。
- 线程 1
a) 您收到一个控制数据包,现在您想取消注册的超时并继续下一个会话状态。(例如,在您发送 SYN 后收到 SYN-ACK )。
b) 您调用 timeout.Cancel(),它调用 timer.Stop()。(请注意,Golang 计时器停止不会阻止已过期的计时器运行。) - 线程 2
a) 就在取消调用之前,计时器已过期,回调即将执行。
b) 回调被执行,它调度一个新的超时并更新全局映射。 - 线程 1
a) 转换为新的会话状态并注册一个新的超时,更新全局映射。
两个线程都在并发更新超时映射。最终结果是您未能取消注册的超时,然后您也丢失了对线程 2 完成的重新调度超时的引用。这会导致处理程序持续执行和重新调度一段时间,从而产生不需要的行为。
当锁不足够时
使用锁也不能完全解决问题。如果您在处理任何事件之前和执行回调之前添加锁,它仍然不能阻止过期的回调运行
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
})
}
现在的区别在于全局映射中的更新是同步的,但这并不能阻止回调在您调用 timeout.Cancel() 后运行——如果计划的计时器已过期但尚未获取锁,则会发生这种情况。您应该再次丢失对已注册超时之一的引用。
使用取消通道
与其依赖 golang 的 timer.Stop()(它不会阻止过期的计时器执行),不如使用取消通道。
这是一种稍微不同的方法。现在您不会通过回调进行递归重新调度;相反,您注册一个无限循环,等待取消信号或超时事件。
新的 Register() 生成一个新的 go 线程,该线程在超时后运行您的回调,并在上一个超时执行后调度一个新的超时。取消通道返回给调用者,以控制循环何时应停止。
func (timeout *TimeoutHandler) Register() chan struct{} {
cancelChan := make(chan struct{})
go func () {
select {
case _ = <- cancelChan:
return
case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):
func () {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
} ()
}
} ()
return cancelChan
}
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
timeout.cancelChan <- struct{}{}
}
这种方法为您注册的每个超时提供一个取消通道。取消调用向通道发送一个空结构并触发取消。但是,这并不能解决之前的问题;超时可能在您通过通道调用取消之前,以及在超时线程获取锁之前过期。
这里的解决方案是在您获取锁后在超时范围内检查取消通道。
case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):
func () {
stateLock.Lock()
defer stateLock.Unlock()
select {
case _ = <- handler.cancelChan:
return
default:
timeout.callback(timeout.session)
}
} ()
}
最后,这保证了回调仅在您获取锁并且未触发取消后才执行。
提防死锁
这个解决方案似乎有效;但是,这里有一个隐藏的陷阱:死锁。
请再次阅读上面的代码,并尝试自己找到它。考虑并发调用任何描述的方法。
这里的最后一个问题是取消通道本身。我们将其设为无缓冲通道,这意味着发送是一个阻塞调用。一旦您在超时处理程序中调用 cancel,您只有在该处理程序被取消后才能继续。这里的问题是当您对同一个取消通道进行多次调用时,取消请求只被消耗一次。如果并发事件要取消同一个超时处理程序,例如链路关闭或控制数据包事件,则很容易发生这种情况。这会导致死锁情况,可能会导致应用程序停止运行。

有人在听吗?
作者:Trevor Forrey。经许可使用。
这里的解决方案是至少使通道缓冲为 1,这样发送就不会总是阻塞,并且在并发调用的情况下也显式地使发送成为非阻塞的。这保证了取消只发送一次,并且不会阻塞后续的取消调用。
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
select {
case timeout.cancelChan <- struct{}{}:
default:
// can’t send on the channel, someone has already requested the cancellation.
}
}
结论
您在实践中了解到,在使用并发代码时,常见的错误是如何出现的。由于它们的非确定性性质,即使经过广泛的测试,这些问题也很容易被检测不到。以下是我们在初始实现中遇到的三个主要问题。
在没有同步的情况下更新共享数据
这似乎是显而易见的,但如果您的并发更新发生在不同的位置,则实际上很难发现。结果是数据竞争,其中对同一数据的多次更新可能会导致更新丢失,因为一个更新覆盖了另一个更新。在我们的案例中,我们正在更新同一共享映射上的计划超时引用。(有趣的是,如果 Go 检测到在同一个 Map 对象上的并发读/写,它会抛出一个致命错误——您可以尝试运行 Go 的数据竞争检测器)。这最终会导致丢失超时引用,并使其无法取消给定的超时。始终记住在需要时使用锁。

不要忘记同步 gophers 的工作
缺少条件检查
在您不能仅依赖锁的排他性的情况下,需要进行条件检查。我们的情况有点不同,但核心思想与 条件变量 相同。想象一下一个经典的情况,您有一个生产者和多个消费者使用共享队列。生产者可以向队列添加一个项目并唤醒所有消费者。唤醒调用意味着队列中有一些数据可用,并且由于队列是共享的,因此必须通过锁同步访问。每个消费者都有机会获取锁;但是,您仍然需要检查队列中是否有项目。需要进行条件检查,因为在您获取锁时您不知道队列状态。
在我们的示例中,超时处理程序从计时器过期收到“唤醒”调用,但它仍然需要检查是否已向其发送取消信号,然后才能继续执行回调。

如果您唤醒多个 gophers,可能需要条件检查
死锁
当一个线程被卡住,无限期地等待唤醒信号时,就会发生这种情况,但这个信号永远不会到达。这些可能会通过停止您的整个程序执行来完全杀死您的应用程序。
在我们的案例中,这是由于对非缓冲和阻塞通道的多次发送调用而发生的。这意味着发送调用只有在同一通道上完成接收后才会返回。我们的超时线程循环及时地接收取消通道上的信号;但是,在收到第一个信号后,它会中断循环,并且永远不会再次从该通道读取。其余的调用者永远被卡住了。为了避免这种情况,您需要仔细考虑您的代码,小心处理阻塞调用,并保证不会发生线程饥饿。我们示例中的修复方法是使取消调用成为非阻塞的——我们的需求不需要阻塞调用。
评论已关闭。