Go 编程的经验教训

通过学习如何解决这些常见的陷阱,防止未来并发处理的难题。
160 位读者喜欢这个。
gopher illustrations

Renee French。CC BY 3.0

当您使用复杂的分布式系统时,您很可能会遇到并发处理的需求。在 Mode.net,我们每天都在处理实时、快速且弹性的软件。如果没有高度并发的系统,构建一个以毫秒级动态路由数据包的全球专用网络是不可能的。这种动态路由是基于网络状态的,虽然这里有很多参数需要考虑,但我们的重点是链路 指标。在我们的上下文中,链路指标可以是与网络链路的状态或当前属性相关的任何内容(例如:链路延迟)。

链路指标的并发探测

H.A.L.O. (逐跳自适应链路状态最优路由),我们的动态路由算法部分依赖于链路指标来计算其路由表。这些指标由一个独立的组件收集,该组件位于每个 PoP(存在点)。PoP 是网络中代表单个路由实体的机器,通过链路连接并分布在多个位置,构成我们的网络。该组件使用网络数据包探测相邻机器,这些邻居将弹回初始探测。链路延迟值可以从接收到的探测中导出。因为每个 PoP 都有多个邻居,所以这种任务的本质是内在的并发性:我们需要实时测量每个相邻链路的延迟。我们无法承担顺序处理;为了计算这个指标,必须尽快处理每个探测。

latency computation graph

序列号和重置:重新排序的情况

我们的探测组件交换数据包,并依靠序列号进行数据包处理。这旨在避免处理数据包重复或乱序数据包。我们的第一个实现依赖于特殊的序列号 0 来重置序列号。这样的数字仅在组件初始化期间使用。主要问题是我们正在考虑一个递增的序列号值,该值始终从 0 开始。在组件重启后,可能会发生数据包重新排序,并且数据包很容易用重置前正在使用的值替换序列号。这意味着后续的数据包将被忽略,直到它达到重置之前正在使用的序列号。

UDP 握手和有限状态机

这里的问题是在组件重启后正确地协商序列号。有很多方法可以处理这个问题,在讨论了我们的选择后,我们选择实现一个具有明确状态定义的 3 次握手协议。此握手在初始化期间建立链路上的会话。这保证了节点在同一会话上通信,并为其使用适当的序列号。

为了正确实现这一点,我们必须定义一个具有明确状态和转换的有限状态机。这使我们能够正确管理握手形成的所有极端情况。

finite state machine diagram

会话 ID 由握手发起者生成。完整的交换序列如下

  1. 发送方发送一个 SYN (ID) 数据包。
  2. 接收方存储接收到的 ID 并发送一个 SYN-ACK (ID)
  3. 发送方接收到 SYN-ACK (ID) 并发送一个 ACK (ID) 它也开始发送序列号从 0 开始的数据包。
  4. 接收方检查最后接收到的 ID ,如果 ID 匹配,则接受 ACK (ID) 。它也开始接受序列号为 0 的数据包。

处理状态超时

基本上,在每个状态下,您最多需要处理三种类型的事件:链路事件、数据包事件和超时事件。这些事件是并发出现的,所以这里您必须正确处理并发。

  • 链路事件要么是链路启动,要么是链路关闭更新。这可以启动链路会话或中断现有会话。
  • 数据包事件是控制数据包 (SYN/SYN-ACK/ACK) 或只是探测响应。
  • 超时事件是在为当前会话状态调度的超时到期后触发的事件。

这里的主要挑战是如何处理并发超时到期和其他事件。而这正是人们很容易陷入死锁和竞争条件陷阱的地方。

第一种方法

用于这个项目的语言是 Golang。它确实提供了本机同步机制,例如本机通道和锁,并且能够为并发处理生成轻量级线程。

gophers hacking together

Gophers 一起黑客

作者:Ashley McNamara, CC BY-NC-SA 4.0

您可以首先设计一个结构来表示我们的 会话 超时处理程序

type Session struct {  
  State SessionState  
  Id SessionId  
  RemoteIp string  
}

type TimeoutHandler struct {  
  callback func(Session)  
  session Session  
  duration int  
  timer *timer.Timer  
}

会话 标识连接会话,包括会话 ID、相邻链路 IP 和当前会话状态。

TimeoutHandler 包含回调函数、它应该运行的会话、持续时间以及指向计划计时器的指针。

有一个全局映射,它将存储每个相邻链路会话的计划超时处理程序。

SessionTimeout map[Session]*TimeoutHandler

注册和取消超时是通过以下方法实现的

// schedules the timeout callback function.  
func (timeout* TimeoutHandler) Register() {   
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {   
    timeout.callback(timeout.session)   
  })   
}

func (timeout* TimeoutHandler) Cancel() {   
  if timeout.timer == nil {   
    return   
  }   
  timeout.timer.Stop()   
}

对于超时创建和存储,您可以使用如下方法

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {  
  if sessionTimeout[session] == nil {  
    sessionTimeout[session] := new(TimeoutHandler)  
  }  
    
  timeout = sessionTimeout[session]  
  timeout.session = session  
  timeout.callback = callback  
  timeout.duration = duration  
  return timeout  
}

一旦超时处理程序被创建和注册,它会在 duration 秒过去后运行回调。但是,某些事件将需要您重新调度超时处理程序(就像在 SYN 状态下发生的那样——每 3 秒)。

为此,您可以让回调重新调度新的超时

func synCallback(session Session) {  
  sendSynPacket(session)

  // reschedules the same callback.  
  newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)  
  newTimeout.Register()

  sessionTimeout[state] = newTimeout  
}

此回调在新的超时处理程序中重新调度自身,并更新全局 sessionTimeout 映射。

数据竞争和引用

您的解决方案已准备就绪。一个简单的测试是检查超时回调是否在计时器过期后执行。为此,注册一个超时,睡眠其持续时间,然后检查回调操作是否完成。在测试执行后,最好取消计划的超时(因为它会重新调度),这样它就不会在测试之间产生副作用。

令人惊讶的是,这个简单的测试发现了解决方案中的一个错误。使用 cancel 方法取消超时根本不起作用。以下事件顺序会导致数据竞争条件

  1. 您有一个计划的超时处理程序。
  2. 线程 1

    a) 您收到一个控制数据包,现在您想取消注册的超时并继续下一个会话状态。(例如,在您发送 SYN 后收到 SYN-ACK )。

    b) 您调用 timeout.Cancel(),它调用 timer.Stop()。(请注意,Golang 计时器停止不会阻止已过期的计时器运行。)
  3. 线程 2

    a) 就在取消调用之前,计时器已过期,回调即将执行。

    b) 回调被执行,它调度一个新的超时并更新全局映射。
  4. 线程 1

    a) 转换为新的会话状态并注册一个新的超时,更新全局映射。

两个线程都在并发更新超时映射。最终结果是您未能取消注册的超时,然后您也丢失了对线程 2 完成的重新调度超时的引用。这会导致处理程序持续执行和重新调度一段时间,从而产生不需要的行为。

当锁不足够时

使用锁也不能完全解决问题。如果您在处理任何事件之前和执行回调之前添加锁,它仍然不能阻止过期的回调运行

func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {  
    stateLock.Lock()  
    defer stateLock.Unlock()

    timeout.callback(timeout.session)  
  })  
}

现在的区别在于全局映射中的更新是同步的,但这并不能阻止回调在您调用 timeout.Cancel() 后运行——如果计划的计时器已过期但尚未获取锁,则会发生这种情况。您应该再次丢失对已注册超时之一的引用。

使用取消通道

与其依赖 golang 的 timer.Stop()(它不会阻止过期的计时器执行),不如使用取消通道。

这是一种稍微不同的方法。现在您不会通过回调进行递归重新调度;相反,您注册一个无限循环,等待取消信号或超时事件。

新的 Register() 生成一个新的 go 线程,该线程在超时后运行您的回调,并在上一个超时执行后调度一个新的超时。取消通道返回给调用者,以控制循环何时应停止。

func (timeout *TimeoutHandler) Register() chan struct{} {  
  cancelChan := make(chan struct{})  
    
  go func () {  
    select {  
    case _ = <- cancelChan:  
      return  
    case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
      func () {  
        stateLock.Lock()  
        defer stateLock.Unlock()

        timeout.callback(timeout.session)  
      } ()  
    }  
  } ()

  return cancelChan  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
  timeout.cancelChan <- struct{}{}  
}

这种方法为您注册的每个超时提供一个取消通道。取消调用向通道发送一个空结构并触发取消。但是,这并不能解决之前的问题;超时可能在您通过通道调用取消之前,以及在超时线程获取锁之前过期。

这里的解决方案是在您获取锁在超时范围内检查取消通道。

  case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
    func () {  
      stateLock.Lock()  
      defer stateLock.Unlock()  
      
      select {  
      case _ = <- handler.cancelChan:  
        return  
      default:  
        timeout.callback(timeout.session)  
      }  
    } ()  
  }

最后,这保证了回调仅在您获取锁并且未触发取消后才执行。

提防死锁

这个解决方案似乎有效;但是,这里有一个隐藏的陷阱:死锁

请再次阅读上面的代码,并尝试自己找到它。考虑并发调用任何描述的方法。

这里的最后一个问题是取消通道本身。我们将其设为无缓冲通道,这意味着发送是一个阻塞调用。一旦您在超时处理程序中调用 cancel,您只有在该处理程序被取消后才能继续。这里的问题是当您对同一个取消通道进行多次调用时,取消请求只被消耗一次。如果并发事件要取消同一个超时处理程序,例如链路关闭或控制数据包事件,则很容易发生这种情况。这会导致死锁情况,可能会导致应用程序停止运行。

gophers on a wire, talking

有人在听吗?

作者:Trevor Forrey。经许可使用。

这里的解决方案是至少使通道缓冲为 1,这样发送就不会总是阻塞,并且在并发调用的情况下也显式地使发送成为非阻塞的。这保证了取消只发送一次,并且不会阻塞后续的取消调用。

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
    
  select {  
  case timeout.cancelChan <- struct{}{}:  
  default:  
    // can’t send on the channel, someone has already requested the cancellation.  
  }  
}

结论

您在实践中了解到,在使用并发代码时,常见的错误是如何出现的。由于它们的非确定性性质,即使经过广泛的测试,这些问题也很容易被检测不到。以下是我们在初始实现中遇到的三个主要问题。

在没有同步的情况下更新共享数据

这似乎是显而易见的,但如果您的并发更新发生在不同的位置,则实际上很难发现。结果是数据竞争,其中对同一数据的多次更新可能会导致更新丢失,因为一个更新覆盖了另一个更新。在我们的案例中,我们正在更新同一共享映射上的计划超时引用。(有趣的是,如果 Go 检测到在同一个 Map 对象上的并发读/写,它会抛出一个致命错误——您可以尝试运行 Go 的数据竞争检测器)。这最终会导致丢失超时引用,并使其无法取消给定的超时。始终记住在需要时使用锁。

gopher assembly line

不要忘记同步 gophers 的工作

CC BY 3.0

缺少条件检查

在您不能仅依赖锁的排他性的情况下,需要进行条件检查。我们的情况有点不同,但核心思想与 条件变量 相同。想象一下一个经典的情况,您有一个生产者和多个消费者使用共享队列。生产者可以向队列添加一个项目并唤醒所有消费者。唤醒调用意味着队列中有一些数据可用,并且由于队列是共享的,因此必须通过锁同步访问。每个消费者都有机会获取锁;但是,您仍然需要检查队列中是否有项目。需要进行条件检查,因为在您获取锁时您不知道队列状态。

在我们的示例中,超时处理程序从计时器过期收到“唤醒”调用,但它仍然需要检查是否已向其发送取消信号,然后才能继续执行回调。

gopher boot camp

如果您唤醒多个 gophers,可能需要条件检查

CC BY 3.0

死锁

当一个线程被卡住,无限期地等待唤醒信号时,就会发生这种情况,但这个信号永远不会到达。这些可能会通过停止您的整个程序执行来完全杀死您的应用程序。

在我们的案例中,这是由于对非缓冲和阻塞通道的多次发送调用而发生的。这意味着发送调用只有在同一通道上完成接收后才会返回。我们的超时线程循环及时地接收取消通道上的信号;但是,在收到第一个信号后,它会中断循环,并且永远不会再次从该通道读取。其余的调用者永远被卡住了。为了避免这种情况,您需要仔细考虑您的代码,小心处理阻塞调用,并保证不会发生线程饥饿。我们示例中的修复方法是使取消调用成为非阻塞的——我们的需求不需要阻塞调用。

接下来阅读什么
User profile image.
Eduardo Ferreira 是 Mode.net 的高级软件工程师。他拥有里约热内卢联邦大学 (UFRJ) 的计算机科学学士学位,并在康奈尔大学留学一年。他的主要兴趣领域是软件工程、分布式系统、计算机网络和对等架构。

评论已关闭。

Creative Commons License本作品根据 Creative Commons Attribution-Share Alike 4.0 International License 获得许可。
© . All rights reserved.