作为一名软件工程师,我有过一些时刻,当被要求执行一项让我不寒而栗的任务时。其中一个时刻是当我必须编写一些需要 C 的新硬件基础设施和主要使用 Python 的云基础设施之间的接口时。
一种策略可能是用 C 编写扩展,Python 在设计上支持这一点。快速浏览一下文档就会发现,这将意味着要编写大量的 C 代码。在某些情况下这可能很好,但这不是我喜欢做的事情。另一种策略是将这两个任务放在不同的进程中,并使用 ZeroMQ 消息库在两者之间交换消息。
在我发现 ZeroMQ 之前,当我遇到这种类型的场景时,我走了编写扩展的道路。这还不错,但是非常耗时且复杂。如今,为了避免这种情况,我将系统细分为独立的进程,这些进程通过在 通信套接字上发送的消息来交换信息。通过这种方法,几种编程语言可以共存,并且每个进程都更简单,因此更容易调试。
ZeroMQ 提供了一个更简单的过程
- 用 C 编写一个小的垫片程序,从硬件读取数据,并将找到的任何内容作为消息发送。
- 编写新的基础设施和现有基础设施之间的 Python 接口。
ZeroMQ 项目的创始人之一是 Pieter Hintjens,他是一位杰出的人物,拥有有趣的观点和著作。
前提条件
本教程需要
在 Fedora 上安装它们,使用
$ dnf install clang zeromq zeromq-devel python3 python3-zmq
对于 Debian 或 Ubuntu
$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
如果您遇到任何问题,请参考每个项目的安装说明(链接如上)。
编写硬件接口库
由于这是一个假设的场景,本教程将编写一个虚构的库,其中包含两个函数
- fancyhw_init() 用于初始化(假设的)硬件
- fancyhw_read_val() 用于返回从硬件读取的值
将库的完整源代码保存到名为 libfancyhw.h 的文件中
#ifndef LIBFANCYHW_H
#define LIBFANCYHW_H
#include <stdlib.h>
#include <stdint.h>
// This is the fictitious hardware interfacing library
void fancyhw_init(unsigned int init_param)
{
srand(init_param);
}
int16_t fancyhw_read_val(void)
{
return (int16_t)rand();
}
#endif
由于有随机数生成器,该库可以模拟您想要在语言之间传递的数据。
设计 C 接口
以下将逐步介绍如何编写 C 接口——从包含库到管理数据传输。
库
首先加载必要的库(每个库的用途都在代码注释中)
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>
#include <zmq.h>
#include "libfancyhw.h"
重要参数
定义 main 函数和程序其余部分所需的重要参数
int main(void)
{
const unsigned int INIT_PARAM = 12345;
const unsigned int REPETITIONS = 10;
const unsigned int PACKET_SIZE = 16;
const char *TOPIC = "fancyhw_data";
...
初始化
两个库都需要一些初始化。虚构的库只需要一个参数
fancyhw_init(INIT_PARAM);
ZeroMQ 库需要一些真正的初始化。首先,定义一个上下文——一个管理所有套接字的对象
void *context = zmq_ctx_new();
if (!context)
{
printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
然后定义用于传递数据的套接字。ZeroMQ 支持多种类型的套接字,每种套接字都有其应用。使用发布套接字(也称为 PUB 套接字),它可以将消息的副本传递给多个接收者。这种方法使您可以连接多个接收者,它们都将获得相同的消息。如果没有接收者,消息将被丢弃(即,它们不会排队)。使用以下方法执行此操作
void *data_socket = zmq_socket(context, ZMQ_PUB);
套接字必须绑定到一个地址,以便客户端知道在哪里连接。在这种情况下,使用 TCP 传输层(还有其他选项,但 TCP 是一个很好的默认选择)
const int rb = zmq_bind(data_socket, "tcp://*:5555");
if (rb != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
接下来,计算稍后需要的一些有用值。注意下面代码中的 TOPIC;PUB 套接字需要一个与它们发送的消息关联的主题。接收者可以使用主题来过滤消息
const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
发送消息
启动一个循环,发送 REPETITIONS 条消息
for (unsigned int i = 0; i < REPETITIONS; i++)
{
...
在发送消息之前,填充 PACKET_SIZE 个值的缓冲区。该库提供 16 位有符号整数。由于 C 中 int 的维度未定义,因此请使用具有特定宽度的 int
int16_t buffer[PACKET_SIZE];
for (unsigned int j = 0; j < PACKET_SIZE; j++)
{
buffer[j] = fancyhw_read_val();
}
printf("Read %u data values\n", PACKET_SIZE);
消息准备和传递的第一步是创建 ZeroMQ 消息并分配消息所需的内存。这个空消息是一个信封,用于存储您将要发送的数据
zmq_msg_t envelope;
const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
现在内存已分配,将数据存储在 ZeroMQ 消息“信封”中。zmq_msg_data() 函数返回指向信封中缓冲区开头的指针。第一部分是主题,后跟一个空格,然后是二进制数据。添加空格作为主题和数据之间的分隔符。要沿着缓冲区移动,您必须使用强制类型转换和指针运算。(谢谢你,C,让事情变得如此简单。)使用以下方法执行此操作
memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
通过 data_socket 发送消息
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size)
{
printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
确保在使用后处理信封
zmq_msg_close(&envelope);
printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
清理
由于 C 不提供 垃圾回收,因此您必须进行清理。在您完成发送消息后,关闭程序并进行清理以释放已使用的内存
const int rc = zmq_close(data_socket);
if (rc != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
const int rd = zmq_ctx_destroy(context);
if (rd != 0)
{
printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
完整的 C 程序
将完整的接口库保存在名为 hw_interface.c 的本地文件中
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>
#include <zmq.h>
#include "libfancyhw.h"
int main(void)
{
const unsigned int INIT_PARAM = 12345;
const unsigned int REPETITIONS = 10;
const unsigned int PACKET_SIZE = 16;
const char *TOPIC = "fancyhw_data";
fancyhw_init(INIT_PARAM);
void *context = zmq_ctx_new();
if (!context)
{
printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
void *data_socket = zmq_socket(context, ZMQ_PUB);
const int rb = zmq_bind(data_socket, "tcp://*:5555");
if (rb != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
for (unsigned int i = 0; i < REPETITIONS; i++)
{
int16_t buffer[PACKET_SIZE];
for (unsigned int j = 0; j < PACKET_SIZE; j++)
{
buffer[j] = fancyhw_read_val();
}
printf("Read %u data values\n", PACKET_SIZE);
zmq_msg_t envelope;
const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size)
{
printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
zmq_msg_close(&envelope);
printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
sleep(1);
}
const int rc = zmq_close(data_socket);
if (rc != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
const int rd = zmq_ctx_destroy(context);
if (rd != 0)
{
printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
使用以下命令编译
$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface
如果没有编译错误,您可以运行该接口。最棒的是,ZeroMQ PUB 套接字可以在没有任何应用程序发送或检索数据的情况下运行。这降低了复杂性,因为在哪个进程需要先启动方面没有义务。
运行接口
$ ./hw_interface
Topic: fancyhw_data; topic size: 12; Envelope size: 45
Read 16 data values
Message sent; i: 0, topic: fancyhw_data
Read 16 data values
Message sent; i: 1, topic: fancyhw_data
Read 16 data values
...
...
输出显示正在通过 ZeroMQ 发送的数据。现在您需要一个应用程序来读取数据。
编写 Python 数据处理器
现在您可以准备好将数据从 C 传递到 Python 应用程序。
库
您需要两个库来帮助传输数据。首先,您需要在 Python 中使用 ZeroMQ 绑定
$ python3 -m pip install zmq
另一个是 struct 库,它解码二进制数据。它通常与 Python 标准库一起提供,因此无需 pip install 它。
Python 程序的第一个部分导入这两个库
import zmq
import struct
重要参数
要使用 ZeroMQ,您必须订阅与上面常量 TOPIC 中使用的主题相同的主题
topic = "fancyhw_data".encode('ascii')
print("Reading messages with topic: {}".format(topic))
初始化
接下来,初始化上下文和套接字。使用订阅套接字(也称为 SUB 套接字),它是 PUB 套接字的天然伙伴。套接字还需要订阅正确的主题
with zmq.Context() as context:
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5555")
socket.setsockopt(zmq.SUBSCRIBE, topic)
i = 0
...
接收消息
启动一个无限循环,等待新消息传递到 SUB 套接字。如果您按下 Ctrl+C 或发生错误,循环将关闭
try:
while True:
... # we will fill this in next
except KeyboardInterrupt:
socket.close()
except Exception as error:
print("ERROR: {}".format(error))
socket.close()
循环等待新消息通过 recv() 方法到达。然后它在第一个空格处拆分接收到的任何内容,以将主题与内容分开
binary_topic, data_buffer = socket.recv().split(b' ', 1)
解码消息
Python 尚不知道主题是字符串,因此使用标准 ASCII 编码对其进行解码
topic = binary_topic.decode(encoding = 'ascii')
print("Message {:d}:".format(i))
print("\ttopic: '{}'".format(topic))
下一步是使用 struct 库读取二进制数据,该库可以将无定形的二进制数据块转换为有意义的值。首先,计算数据包中存储的值的数量。此示例使用 16 位有符号整数,它们对应于 struct 格式中的“h”
packet_size = len(data_buffer) // struct.calcsize("h")
print("\tpacket size: {:d}".format(packet_size))
通过了解数据包中有多少个值,您可以通过准备一个包含值数量及其类型的字符串来定义格式(例如,“16h”)
struct_format = "{:d}h".format(packet_size)
将二进制数据块转换为您可以立即打印的一系列数字
data = struct.unpack(struct_format, data_buffer)
print("\tdata: {}".format(data))
完整的 Python 程序
这是完整的 Python 数据接收器
#! /usr/bin/env python3
import zmq
import struct
topic = "fancyhw_data".encode('ascii')
print("Reading messages with topic: {}".format(topic))
with zmq.Context() as context:
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5555")
socket.setsockopt(zmq.SUBSCRIBE, topic)
i = 0
try:
while True:
binary_topic, data_buffer = socket.recv().split(b' ', 1)
topic = binary_topic.decode(encoding = 'ascii')
print("Message {:d}:".format(i))
print("\ttopic: '{}'".format(topic))
packet_size = len(data_buffer) // struct.calcsize("h")
print("\tpacket size: {:d}".format(packet_size))
struct_format = "{:d}h".format(packet_size)
data = struct.unpack(struct_format, data_buffer)
print("\tdata: {}".format(data))
i += 1
except KeyboardInterrupt:
socket.close()
except Exception as error:
print("ERROR: {}".format(error))
socket.close()
将其保存到名为 online_analysis.py 的文件中。Python 不需要编译,因此您可以立即运行该程序。
这是输出
$ ./online_analysis.py
Reading messages with topic: b'fancyhw_data'
Message 0:
topic: 'fancyhw_data'
packet size: 16
data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)
Message 1:
topic: 'fancyhw_data'
packet size: 16
data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)
...
...
结论
本教程介绍了一种从基于 C 的硬件接口收集数据并将其提供给基于 Python 的基础设施的替代方法。您可以获取这些数据并对其进行分析,或者将其传递到任意数量的方向。它采用消息传递库来传递“收集器”和“分析器”之间的数据,而不是使用一个庞大的软件来完成所有事情。
本教程还提高了我所说的“软件粒度”。换句话说,它将软件细分为更小的单元。这种策略的好处之一是可以同时使用不同的编程语言,而最小的接口充当它们之间的垫片程序。
在实践中,这种设计允许软件工程师更加协作且独立地工作。不同的团队可以处理分析的不同步骤,选择他们喜欢的工具。另一个好处是并行性,因为所有进程都可以并行运行。ZeroMQ 消息库是一款卓越的软件,它使所有这些都变得更加容易。
8 条评论