去年十一月,我们买了一辆电动汽车,这引发了一个有趣的问题:我们应该在什么时候充电?我关心的是为汽车充电所用电力的排放量要尽可能低,所以这是一个具体的问题:在任何给定时间,每千瓦时的二氧化碳排放率是多少?以及一天中什么时候排放率最低?
查找数据
我住在纽约州。我们大约 80% 的电力来自州内发电,主要通过天然气、水坝(大部分来自尼亚加拉瀑布)、核能以及少量风能、太阳能和其他化石燃料。整个系统由纽约独立系统运营商 (NYISO) 管理,这是一个非营利性实体,旨在平衡发电厂、消费者和监管机构的需求,以保持纽约州的电力供应。
虽然没有官方的公共 API,但作为其使命的一部分,NYISO 提供了大量的开放数据供公众使用。这包括报告在全州范围内,以五分钟为间隔,用于发电的燃料类型。这些数据以 CSV 文件的形式发布在公共存档中,并在全天更新。如果您知道来自不同类型燃料的兆瓦数,您就可以合理地估算出在任何给定时间排放多少二氧化碳。
在构建工具来收集和处理开放数据时,我们应该始终保持友善,以避免系统过载。我们可以做得更好,而不是让每个人都访问他们的存档服务来一直下载文件。我们可以创建一个低开销的事件流,人们可以订阅该事件流并在更新发生时获取更新。我们可以使用 MQTT 来做到这一点。我的项目 (ny-power.org) 的目标是纳入 Home Assistant 项目,这是一个拥有数十万用户的开源家庭自动化平台。如果所有这些用户一直访问这个 CSV 服务器,NYISO 可能需要限制对其的访问。
什么是 MQTT?
MQTT 是一种发布/订阅 (pubsub) 线协议,专为小型设备而设计。Pubsub 系统的工作方式类似于消息总线。您向一个主题发送消息,任何订阅了该主题的软件都会收到您的消息副本。作为发送者,您永远不知道谁在监听;您只需将您的信息提供给一组主题,并监听您可能关心的任何其他主题。这就像走进一个聚会,倾听有趣的对话并加入一样。
这可以使应用程序极其高效。客户端订阅一小部分主题,并且只接收他们正在寻找的信息。这节省了处理时间和网络带宽。
作为一种开放标准,MQTT 有许多客户端和服务器的开源实现。有您可以想象到的每种语言的客户端库,甚至有一个您可以嵌入到 Arduino 中以构建传感器网络的库。有许多服务器可供选择。我首选 Eclipse 的 Mosquitto 服务器,因为它体积小,用 C 语言编写,并且可以轻松处理数万个订阅者。
我为什么喜欢 MQTT
在过去的二十年中,我们已经提出了成熟可靠的软件应用程序向服务提出问题的模型。“我有更多电子邮件吗?”“今天天气怎么样?”“我现在应该买这个东西吗?” 这种“请求/接收”模式在很多时候都运行良好;然而,在一个数据泛滥的世界中,我们需要其他模式。MQTT pubsub 模型在大量数据发布到系统时非常强大。客户端可以订阅少量数据,并在数据进入时立即接收更新。
MQTT 还具有其他有趣的功能,例如“遗嘱和遗言”消息,这使得区分沉默是因为没有相关数据还是沉默是因为您的数据收集器崩溃成为可能。MQTT 还具有保留消息,当客户端首次连接时,它向客户端提供主题上的最后一条消息。这对于更新缓慢的主题非常有用。
在我与 Home Assistant 项目的合作中,我发现这种消息总线模型非常适用于异构系统。如果您深入研究物联网领域,您会很快发现 MQTT 无处不在。
我们的第一个 MQTT 流
NYSO 的 CSV 文件之一是实时燃料组合。每五分钟,它都会更新燃料来源和该时间段内产生的电力(以兆瓦为单位)。
CSV 文件看起来像这样
时间戳 | 时区 | 燃料类别 | 发电兆瓦 |
---|---|---|---|
05/09/2018 00:05:00 | EDT | 双燃料 | 1400 |
05/09/2018 00:05:00 | EDT | 天然气 | 2144 |
05/09/2018 00:05:00 | EDT | 核能 | 4114 |
05/09/2018 00:05:00 | EDT | 其他化石燃料 | 4 |
05/09/2018 00:05:00 | EDT | 其他可再生能源 | 226 |
05/09/2018 00:05:00 | EDT | 风能 | 41 |
05/09/2018 00:05:00 | EDT | 水力 | 3229 |
05/09/2018 00:10:00 | EDT | 双燃料 | 1307 |
05/09/2018 00:10:00 | EDT | 天然气 | 2092 |
05/09/2018 00:10:00 | EDT | 核能 | 4115 |
05/09/2018 00:10:00 | EDT | 其他化石燃料 | 4 |
05/09/2018 00:10:00 | EDT | 其他可再生能源 | 224 |
05/09/2018 00:10:00 | EDT | 风能 | 40 |
05/09/2018 00:10:00 | EDT | 水力 | 3166 |
表中唯一奇怪的是双燃料类别。纽约的大多数天然气发电厂也可以燃烧其他化石燃料来发电。在冬季的寒潮期间,天然气供应受到限制,其家庭供暖用途优先于发电。这种情况发生的频率足够低,以至于我们可以将双燃料视为天然气(用于我们的计算)。
该文件会在全天更新。我创建了一个简单的数据泵,每分钟轮询文件并查找更新。它将任何新条目发布到 MQTT 服务器,进入一组主要镜像此 CSV 文件的主题。有效负载被转换为 JSON 对象,几乎可以从任何编程语言中轻松解析。
ny-power/upstream/fuel-mix/Hydro {"units": "MW", "value": 3229, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Dual Fuel {"units": "MW", "value": 1400, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Natural Gas {"units": "MW", "value": 2144, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Other Fossil Fuels {"units": "MW", "value": 4, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Wind {"units": "MW", "value": 41, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Other Renewables {"units": "MW", "value": 226, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Nuclear {"units": "MW", "value": 4114, "ts": "05/09/2018 00:05:00"}
这种直接反映是将开放数据转化为开放事件的第一步。我们将把它转换成二氧化碳强度,但其他应用程序可能需要这些原始数据源来进行其他计算。
MQTT 主题
主题和主题结构是 MQTT 的主要设计要点之一。与更“企业级”的消息总线不同,在 MQTT 中,主题不是预先注册的。发送者可以动态创建主题,唯一的限制是它们少于 220 个字符。/
字符是特殊的;它用于创建主题层次结构。正如我们很快将看到的,您可以订阅这些层次结构中的数据切片。
在 Mosquitto 中,每个客户端都可以发布到任何主题。虽然这对于原型设计来说很棒,但在投入生产之前,您需要添加访问控制列表 (ACL) 以限制对授权应用程序的写入。例如,我的应用程序的树以只读格式对所有人开放,但只有具有特定凭据的客户端才能发布到它。
主题没有自动模式,也没有办法发现客户端将发布的所有可能主题。您必须将这种理解直接编码到任何使用 MQTT 总线的应用程序中。
那么您应该如何设计您的主题呢?最佳实践是从应用程序特定的根名称开始,在我们的例子中是 ny-power
。之后,根据您对高效订阅的需求构建尽可能深的层次结构。upstream
树将包含直接来自上游源的数据,而无需任何处理。我们的 fuel-mix
类别是一种特定类型的数据。我们稍后可能会添加其他类别。
订阅主题
MQTT 中的订阅是简单的字符串匹配。为了处理效率,只允许使用两个通配符
#
递归地匹配到结尾的所有内容+
仅匹配到下一个/
字符
用一些例子来解释这一点最容易
ny-power/# - match everything published by the ny-power app
ny-power/upstream/# - match all raw data
ny-power/upstream/fuel-mix/+ - match all fuel types
ny-power/+/+/Hydro - match everything about Hydro power that's
nested 2 deep (even if it's not in the upstream tree)
像 ny-power/#
这样的广泛订阅对于低容量应用程序来说很常见。只需通过网络获取所有内容并在您自己的应用程序中处理即可。这对于高容量应用程序来说效果不佳,因为大部分网络带宽将被浪费,因为您会丢弃大部分消息。
为了在高容量下保持性能,应用程序将进行一些巧妙的主题切片,例如 ny-power/+/+/Hydro
,以准确获取他们所需的数据横截面。
添加我们的下一层数据
从这一点开始,应用程序中的所有内容都将基于现有的 MQTT 流工作。第一个附加的数据层是计算电力的二氧化碳强度。
使用 2016 年 美国能源信息署 关于纽约州按燃料类型划分的总排放量和总电量的数字,我们可以得出每兆瓦时电力的平均排放率。
这被封装在一个专门的微服务中。它订阅了 ny-power/upstream/fuel-mix/+
,它匹配来自数据泵的所有上游燃料组合条目。然后它执行计算并发布到一个新的主题树
ny-power/computed/co2 {"units": "g / kWh", "value": 152.9486, "ts": "05/09/2018 00:05:00"}
反过来,还有另一个进程订阅这个主题树,并将该数据存档到 InfluxDB 实例中。然后它将 24 小时时间序列发布到 ny-power/archive/co2/24h
,这使得绘制最近的变化图变得容易。
这种分层模型运行良好,因为这些程序中的每一个的逻辑都可以彼此不同。在一个更复杂的系统中,它们甚至可能不使用相同的编程语言。我们不在乎,因为交换格式是 MQTT 消息,具有众所周知的主题和 JSON 有效负载。
从命令行消费
为了感受 MQTT 的实际运行,只需将其连接到总线并查看消息流非常有用。mosquitto-clients
包中包含的 mosquitto_sub
程序是一种简单的方法。
安装完成后,您需要提供服务器主机名和您想要监听的主题。如果您想查看正在发布到的主题,-v
标志非常重要。如果没有它,您将只看到有效负载。
mosquitto_sub -h mqtt.ny-power.org -t ny-power/# -v
每当我编写或调试 MQTT 应用程序时,我总是会运行一个带有 mosquitto_sub
的终端。
直接从 Web 访问 MQTT
我们现在有一个应用程序提供开放事件流。我们可以使用我们的微服务连接到它,并且借助一些命令行工具,它可以在互联网上供所有人查看。但 Web 仍然是王者,因此直接将其放入用户的浏览器中非常重要。
MQTT 的人员考虑到了这一点。该协议规范旨在通过三种传输协议工作:TCP、UDP 和 WebSocket。所有主流浏览器都支持 WebSocket,作为为实时应用程序保留持久连接的一种方式。
Eclipse 项目有一个名为 Paho 的 MQTT JavaScript 实现,可以包含在您的应用程序中。模式是连接到主机,设置一些订阅,然后对收到的消息做出反应。
// ny-power web console application
var client = new Paho.MQTT.Client(mqttHost, Number("80"), "client-" + Math.random());
// set callback handlers
client.onMessageArrived = onMessageArrived;
// connect the client
client.reconnect = true;
client.connect({onSuccess: onConnect});
// called when the client connects
function onConnect() {
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("ny-power/computed/co2");
client.subscribe("ny-power/archive/co2/24h");
client.subscribe("ny-power/upstream/fuel-mix/#");
}
// called when a message arrives
function onMessageArrived(message) {
console.log("onMessageArrived:"+message.destinationName + message.payloadString);
if (message.destinationName == "ny-power/computed/co2") {
var data = JSON.parse(message.payloadString);
$("#co2-per-kwh").html(Math.round(data.value));
$("#co2-units").html(data.units);
$("#co2-updated").html(data.ts);
}
if (message.destinationName.startsWith("ny-power/upstream/fuel-mix")) {
fuel_mix_graph(message);
}
if (message.destinationName == "ny-power/archive/co2/24h") {
var data = JSON.parse(message.payloadString);
var plot = [
{
x: data.ts,
y: data.values,
type: 'scatter'
}
];
var layout = {
yaxis: {
title: "g CO2 / kWh",
}
};
Plotly.newPlot('co2_graph', plot, layout);
}
此应用程序订阅了许多主题,因为我们将显示几种不同类型的数据。ny-power/computed/co2
主题为我们提供了当前强度的最高数字。每当我们收到该主题时,我们都会替换站点上的相关内容。

NY ISO 电网 CO2 强度图表来自 /ny-power.org/#
ny-power/archive/co2/24h
主题提供了一个时间序列,可以加载到 Plotly 折线图中。ny-power/upstream/fuel-mix
提供了提供当前燃料组合的漂亮条形图所需的数据。

NYISO 电网上的燃料组合,ny-power.org。
这是一个动态网站,它不轮询服务器。它连接到 MQTT 总线并监听其开放的 WebSocket。该网页是一个发布/订阅客户端,就像数据泵和存档器一样。只是这个碰巧在您的浏览器而不是云中的微服务中执行。
您可以在 http://ny-power.org 上查看该页面的实际效果。其中包括图形和一个实时 MQTT 控制台,用于查看传入的消息。
深入了解
整个 ny-power.org 应用程序都在 GitHub 上开源。您还可以查看此架构概述 ,了解它是如何构建为一组使用 Helm 部署的 Kubernetes 微服务的。您可以使用 此代码模式 查看另一个有趣的 MQTT 应用程序示例,该模式使用 MQTT 和 OpenWhisk 实时翻译文本消息。
MQTT 广泛用于物联网领域,在 Home Assistant 项目中可以找到更多 MQTT 用例示例。
如果您想深入了解该协议,mqtt.org 提供了此开放标准的全部详细信息。
要了解更多信息,请参加 Sean Dague 在 OSCON 上的演讲 将 MQTT 添加到您的工具包,OSCON 将于 7 月 16 日至 19 日在俄勒冈州波特兰举行。
评论已关闭。