如果你已经构建了批处理数据管道,但不知道检查其健康状况或测试潜在回归的最佳方法,那么这篇文章适合你。
监控和测试批处理数据管道需要与监控和测试 Web 服务不同的方法。在 Python 中构建一个健壮的数据管道流程是一回事,但找到工具并构建框架以提供数据系统健康的信心是完全不同的挑战。为了真正地迭代和开发代码库,开发人员必须能够在开发过程中自信地进行测试,并监控生产系统。本文提供了一些关于监控和配置数据管道警报的解决方案。
监控数据管道 vs. Web 服务
那么,监控数据管道与监控 Web 服务有何不同?监控任何系统的核心原则在数据管道和 Web 服务之间是可以直接转换的。如何监控是开始出现差异的地方,因为数据管道本质上具有不同的健康指标。下表概述了常见的健康指标,并比较了 Web 服务和批处理数据服务对这些指标的监控。
Web 服务 | 数据管道 | |
---|---|---|
健康检查 | 具有某种健康检查端点,并检查当您 ping /healthcheck 时,您会得到 200 状态代码。 |
检查作业是否成功。 |
集成测试 | POST 到一个端点,并期望从相应的 GET 端点获取正确的数据。 |
验证一些伪造的数据是否通过了数据转换。(如果没有简单的方法将伪造的数据输入数据管道,这可能很难复制。) |
延迟 | 测量 API 的平均响应时间。 | 测量数据管道完成所需的时间。 |
监控工具
像 StatsD 和 Prometheus 这样的开源工具通常用于收集指标和配置警报。除了持续收集指标外,还可以在像 Nagios 这样的工具上运行集成测试,Nagios 主要检查成功的退出代码,我还曾在其他团队中使用 Jenkins 定期运行集成测试,以检查构建是否仍然通过。所有这些工具都可以推断出来以支持数据服务和 Web 服务。
时间序列数据
这些监控工具的数据模型都不同。StatsD 具有聚合指标,Nagios 具有退出状态代码,Prometheus 利用时间序列数据。在监控领域,时间序列数据特别强大,因为它监控整个系统随时间推移的行为,包括系统的当前状态以及系统的变化。使用时间序列数据的挑战在于,因为你现在基本上存储了每个时间点的数据点,所以数据集可能会增长得非常快。如果没有强大的数据模型,很难查询和理解时间序列数据。这就是 Prometheus 作为强大监控工具的用武之地。Prometheus 的 核心功能之一是:"具有多维 数据模型,时间序列数据由指标名称和键/值对标识。"
Prometheus
我在 Button 的团队在我们的监控堆栈中大量利用 Prometheus,因此本博客中的示例将专门针对 Prometheus,但一般概念可以推广到任何具有时间序列指标的监控工具。用它自己的 话来说
"Prometheus 非常适合记录任何纯数字时间序列。它既适用于以机器为中心的监控,也适用于高度动态的面向服务的架构的监控。在微服务世界中,它对多维数据收集和查询的支持是一个特别的优势。"
Prometheus 服务器通过 HTTP 抓取数据,这意味着你的服务需要有一个暴露给 Prometheus 服务器的 HTTP 端点才能启用数据收集。也支持通过中间推送网关推送时间序列数据。
时间序列指标
从时间序列指标的角度来看,测量 Web 服务和数据管道之间的区别在于实时系统和批处理系统之间的区别。当实时系统“在线”并服务或处理流量时,它会持续产生健康的信号,而批处理系统可能更加零星,并且“停机时间”在两者中可能并不意味着相同的事情。如果批处理系统报告它“无法访问”,这并不一定意味着它没有运行;它可能只是意味着它没有任何数据要处理。
Web 服务
你可以通过检查连续、成功的状态代码来评估 Web 服务的健康状况,并通过查找相对较低的响应时间来监控延迟。
为了在 Prometheus 中做到这一点,你在你的代码中创建和抓取指标。例如,你可能想知道响应时间和总请求指标
import prometheus_client
response_time = prometheus_client.Histogram(
'http_response_time_seconds',
'Histogram of requests processing time.',
['method', 'path'])
requests_total = prometheus_client.Counter(
'http_requests_total',
'Total count of requests',
['method', 'path', 'code'])
计数器发送已发出的总请求数,因为计数器是 Prometheus 中的累积指标,它随着发出更多请求而增加。在此示例中,Prometheus 中的数据将显示所有历史请求计数,这些请求是针对在标签中配置的 url 路径发出的,以及 code
标签中相应的响应状态代码。
直方图将请求持续时间放入存储桶中,并支持基于特定时间段内第 90 个百分位的响应时间发出警报。
在你的 API 中间件中,你希望在请求进入时记录这些指标。
def record_status_code(self, request, status_code):
if hasattr(request, '_start_time'):
requests_total.labels(
request.method,
resolve(request.path_info).url_name,
str(status_code)
).inc()
def record_time(self, request):
if hasattr(request, '_start_time'):
seconds = (time.time() - request._start_time)
response_time.labels(
request.method,
resolve(request.path_info).url_name,
).observe(seconds)
为了评估(并接收关于)系统的健康状况,你将需要知道在特定时间内具有成功状态的请求计数器的变化率是否为 0,这表明在该时间段内没有任何状态代码为 200 的请求。
rate(http_requests_total{code="200"}[1m]) == 0
或者,你也可以在非 200 代码的变化率不为 0 时发出警报。
rate(http_requests_total{code!="200"}[1m]) != 0
你还可以根据 API 请求的延迟触发警报。以下 Prometheus 查询计算前一分钟内请求持续时间的第 90 个百分位。
histogram_quantile(0.90, rate(http_response_time_seconds[1m]))
如果结果超过某个阈值,则可能需要发出警报。
数据管道
用于数据管道的指标略有不同。我们不测量响应时间和响应状态代码,而是希望测量数据管道的运行时间以及它花费了多长时间或处理了多少数据。为了做到这一点,我们将使用一个 gauge 来测量上次批处理作业成功的时间。我们还可以使用 summary 测量数据管道成功所花费的时间——这相当于批处理数据管道的延迟。
要发送的指标
job_last_success_unixtime = prometheus_client.Gauge('job_last_success_unixtime',
'Time of last successful batch job')
job_duration_seconds = prometheus_client.Summary('job_duration_seconds',
'Duration of batch job run in seconds')
指标在管道末端计算如下
with job_duration_seconds.time():
run_pipeline()
time_now = int(time.time())
job_last_success_unixtime.set(time_now)
对批处理作业的健康状况发出警报的明确方法是检查上次作业成功的时间是否在预期的时间间隔内。例如,如果你希望你的作业每小时运行五分钟,那么如果上次成功的时间是在三个多小时前,这可能表明系统不健康。Prometheus 中的警报如下所示
Alert IF time() - job_last_success_unixtime > (3 * 60 * 60)
for 5min
请注意,此方法仅适用于暴露端口以进行抓取的长时间运行的批处理作业。对于短时间运行的作业,例如周期性 cron 作业,Prometheus 有一个 推送网关,用于将指标推送到 Prometheus 服务器,因为它们无法被抓取。
回归测试和统计
除了衡量系统的健康状况外,最好让数据管道输出一些关于其结果的核心统计数据。假设你的基础数据管道只是运行一个 csv
转储并生成数值的总数、平均值和标准差。你也应该能够输出诸如数据验证检查之类的内容,例如,由于无效数据而拒绝的记录数、超出两个标准差的数据点、读取的总记录数、处理的总记录数等。
这些基本统计数据也可以用于回归测试,以验证代码更改是否通过针对相同的基线数据运行不同的代码库来大幅更改系统。
设置阈值:建立基线
就像在 Web 服务中一样,被认为是“健康流量模式”的内容可能因应用程序而异,健康数据管道的概念也可能因管道的用途而异。可能需要一些关于你的特定管道细微差别的经验才能识别任何给定的信号是令人担忧的还是正常的。
要了解更多信息,请参加 Jiaqi Liu 在 PyCon Cleveland 2018 上举行的题为“构建以测试为中心的数据管道”的演讲。
评论已关闭。