使用时序数据和开源工具为您的边缘项目提供动力

InfluxData 是一个开源时序数据库平台。本文介绍如何将其用于边缘应用程序。
2 位读者喜欢这篇文章。
Puzzle pieces coming together to form a computer screen

Opensource.com

随着时间推移收集不断变化的数据被称为时序数据。如今,它已成为每个行业和生态系统的一部分。 它是不断增长的物联网领域的重要组成部分,并将成为人们日常生活中越来越重要的部分。但是,时序数据及其需求很难处理。 这是因为没有专门用于处理时序数据的工具。 在本文中,我将详细介绍这些问题以及 InfluxData 过去 10 年来如何努力解决这些问题。

InfluxData

InfluxData 是一个开源时序数据库平台。您可能通过 InfluxDB 了解该公司,但您可能不知道它专门从事时序数据库。这非常重要,因为在管理时序数据时,您会遇到两个问题 - 存储生命周期和查询。

在存储生命周期方面,开发人员通常最初会收集和分析高度详细的数据。但是,开发人员希望存储更小的、降采样的数据集,这些数据集描述了趋势,而不会占用太多存储空间。

查询数据库时,您不希望基于 ID 查询数据。 您希望基于时间范围查询。使用时序数据最常见的事情之一是在很长一段时间内对其进行汇总。当将数据存储在典型的关系数据库中(该数据库使用行和列来描述不同数据点之间的关系)时,这种查询速度很慢。 旨在处理时序数据的数据库可以以指数级的速度处理查询。 InfluxDB 有其自己的内置查询语言:Flux。 这是专门为查询时序数据集而构建的。

Image of how Telegraf works.

(Zoe Steinkamp,CC BY-SA 4.0)

数据采集

数据采集和数据操作可以通过一些很棒的工具开箱即用。 InfluxData 有超过 12 个客户端库,允许您使用您选择的编码语言编写和查询数据。 这是自定义用例的绝佳工具。 开源摄取代理 Telegraf 包括 300 多个输入和输出插件。 如果您是开发人员,您也可以贡献自己的插件。

InfluxDB 还可以接受 CSV 上传,用于小型历史数据集,以及批量导入,用于大型数据集。

import math
bicycles3 = from(bucket: "smartcity")
    |> range(start:2021-03-01T00:00:00z, stop: 2021-04-01T00:00:00z)
    |> filter(fn: (r) => r._measurement == "city_IoT")
    |> filter(fn: (r) => r._field == "counter")
    |> filter(fn: (r) => r.source == "bicycle")
    |> filter(fn: (r) => r.neighborhood_id == "3")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty:false)

bicycles4 = from(bucket: "smartcity")
    |> range(start:2021-03-01T00:00:00z, stop: 2021-04-01T00:00:00z)
    |> filter(fn: (r) => r._measurement == "city_IoT")
    |> filter(fn: (r) => r._field == "counter")
    |> filter(fn: (r) => r.source == "bicycle")
    |> filter(fn: (r) => r.neighborhood_id == "4")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty:false)

join(tables: {neighborhood_3: bicycles3, neighborhood_4: bicycles4}, on ["_time"], method: "inner")
    |> keep(columns: ["_time", "_value_neighborhood_3","_value_neighborhood_4"])
    |> map(fn: (r) => ({
        r with
        difference_value : math.abs(x: (r._value_neighborhood_3 - r._value_neighborhood_4))
    }))

Flux

Flux 是我们从头开始构建的内部查询语言,用于处理时序数据。它也是我们一些工具(包括任务、警报和通知)的基础动力。 要剖析上面的 flux 查询,您需要定义一些东西。 首先,“bucket”是我们所说的数据库。 您配置您的 bucket,然后将您的数据流添加到其中。 该查询调用 smartcity bucket,范围为特定的一天(准确地说是 24 小时)。 您可以从 bucket 中获取所有数据,但大多数用户都包含数据范围。 这是您可以做的最基本的 flux 查询。

接下来,我添加过滤器,将数据过滤到更精确和可管理的内容。 例如,我过滤分配给 id 为 3 的社区中的自行车数量。 从那里,我使用 aggregateWindow 获取每小时的平均值。 这意味着我期望收到一个包含 24 列的表,每小时一列。 我也对社区 4 执行完全相同的查询。 最后,我加入这两个表并获得这两个社区中自行车使用情况的差异。

如果您想知道哪些时间是交通繁忙时间,这非常棒。 显然,这只是 flux 查询的功能的一个小例子。 但它给出了 flux 附带的一些工具的很好的例子。 我还有大量的数据分析和统计函数。 但对于这一点,我建议查看 Flux 文档。

import "influxdata/influxdb/tasks"

option task = {name: PB_downsample, every: 1h, offset: 10s}
from(bucket: "plantbuddy")
    |>range(start: tasks.lastSuccess(orTime: -task.every))
    |>filter(fn: (r) => r["_measurement"] == "sensor_data")
    |>aggregateWindow(every: 10m, fn:last, createEmpty:false)
    |>yield(name: "last")
    |>to(bucket: "downsampled")

任务

InfluxDB 任务是一个计划的 Flux 脚本,它获取输入数据流并以某种方式修改或分析它。 然后,它将修改后的数据存储在一个新的 bucket 中或执行其他操作。 将较小的数据集存储到新的 bucket 中称为“降采样”,它是数据库的核心功能,也是时序数据生命周期的核心部分。

您可以在当前的任务示例中看到,我已经对数据进行了降采样。 我正在获取每 10 分钟增量的最后一个值,并将该值存储在降采样的 bucket 中。 原始数据集可能在这 10 分钟内有数千个数据点,但现在降采样的 bucket 只有 60 个新值。 需要注意的一件事是,我还在 range 中使用了 last success 函数。 这告诉 InfluxDB 从上次成功运行该任务时运行该任务,以防它在过去 2 个小时内失败,在这种情况下,它可以及时返回三个小时到上次成功运行。 这对于内置错误处理非常有用。

Image of the checks and alerts notification system.

(Zoe Steinkamp,CC BY-SA 4.0)

 

检查和警报

InfluxDB 包括一个警报或检查和通知系统。 这个系统非常简单。 您首先进行检查,定期查看数据中您定义的异常情况。 通常,这是用阈值定义的。 例如,任何低于 32°F 的温度值都将被分配一个 WARN 值,任何高于 32°F 的温度值都将被分配一个 OK 值,任何低于 0°F 的温度值都将被分配一个 CRITICAL 值。 从那里,您的检查可以按照您认为必要的频率运行。 有您的检查的记录历史以及每个检查的当前状态。 如果不需要设置通知,则不需要设置。 您可以根据需要引用您的警报历史记录。

许多人选择设置他们的通知。 为此,您需要定义一个通知端点。 例如,聊天应用程序可以进行 HTTP 调用以接收您的通知。 然后,您定义您希望何时收到通知,例如,您可以每小时运行一次检查。 您可以每 24 小时运行一次通知。 您可以让您的通知响应值的变化,例如从 WARNCRITICAL,或者当一个值是 CRITICAL 时,无论它是否从 OK 变为 WARN。 这是一个高度可定制的系统。 可以编辑从此系统创建的 Flux 代码。

Image of the new Edge feature.

(Zoe Steinkamp,CC BY-SA 4.0)

边缘

总而言之,我想将所有核心功能整合在一起,包括最近发布的一个非常特别的新功能。边缘到云是一个非常强大的工具,允许您运行开源 InfluxDB 并在本地存储您的数据,以防出现连接问题。 当连接恢复时,它会将数据流式传输到 InfluxData 云平台。

这对于边缘设备和重要数据非常重要,在这些数据中,任何数据丢失都是有害的。您定义您希望将一个 bucket 复制到云,然后该 bucket 具有一个磁盘支持的队列,用于在本地存储数据。然后,您定义您的云 bucket 应该复制到什么。数据存储在本地,直到连接到云。

InfluxDB 和物联网边缘

假设您有一个项目,您想使用连接到植物的物联网传感器 监控家用植物的健康状况。该项目设置为使用您的笔记本电脑作为边缘设备。当您的笔记本电脑关闭或以其他方式关闭时,它会在本地存储数据,然后在重新连接时将其流式传输到我的云 bucket。

Image showing how Plant buddy works.

(Zoe Steinkamp,CC BY-SA 4.0)

需要注意的一件事是,它会在将数据存储在复制 bucket 中之前对本地设备上的数据进行降采样。您的植物的传感器每秒提供一个数据点。但它会将数据压缩为一分钟的平均值,以便您存储更少的数据。在云帐户中,您可以添加一些警报和通知,让您知道植物的湿度低于一定水平并且需要浇水。还可以有一些视觉效果,您可以在网站上使用这些视觉效果来告诉用户有关其植物的健康状况。

数据库是许多应用程序的支柱。 在像 InfluxDB 这样的时序数据库平台中使用带时间戳的数据可以节省开发人员的时间,并使他们可以访问各种工具和服务。 InfluxDB 的维护人员很高兴看到人们在我们开源社区中构建的东西,因此请与我们联系并与他人分享您的项目和代码!

Headshot
Zoe Steinkamp 是 InfluxData 的开发者倡导者。她已经在 InfluxData 工作了三年多。在加入 InfluxData 之前,她担任前端工程师超过 7 年。她乐于分享关于数据库的知识,并且喜欢学习新的技术。她最初参加了一个训练营进行 Python 培训。

1 条评论

我是 InfluxDB 的忠实粉丝,特别是 Flux 查询语言。市面上有很多时序数据库选择,但没有一款能提供如此强大的方式来实际分析你收集的数据。绝对值得一试。

Creative Commons License本作品采用知识共享署名-相同方式共享 4.0 国际许可协议进行许可。
© . All rights reserved.