使用此消息库在 C 和 Python 之间共享数据

ZeroMQ 是一款快速且有弹性的消息库,用于收集数据并在多种语言之间共享。
137 位读者喜欢此文。
Chat via email

作为一名软件工程师,我有过一些时刻,当被要求执行一项让我不寒而栗的任务时。其中一个时刻是当我必须编写一些需要 C 的新硬件基础设施和主要使用 Python 的云基础设施之间的接口时。

一种策略可能是用 C 编写扩展,Python 在设计上支持这一点。快速浏览一下文档就会发现,这将意味着要编写大量的 C 代码。在某些情况下这可能很好,但这不是我喜欢做的事情。另一种策略是将这两个任务放在不同的进程中,并使用 ZeroMQ 消息库在两者之间交换消息。

在我发现 ZeroMQ 之前,当我遇到这种类型的场景时,我走了编写扩展的道路。这还不错,但是非常耗时且复杂。如今,为了避免这种情况,我将系统细分为独立的进程,这些进程通过在 通信套接字上发送的消息来交换信息。通过这种方法,几种编程语言可以共存,并且每个进程都更简单,因此更容易调试。

ZeroMQ 提供了一个更简单的过程

  1. 用 C 编写一个小的垫片程序,从硬件读取数据,并将找到的任何内容作为消息发送。
  2. 编写新的基础设施和现有基础设施之间的 Python 接口。

ZeroMQ 项目的创始人之一是 Pieter Hintjens,他是一位杰出的人物,拥有有趣的观点和著作

前提条件

本教程需要

在 Fedora 上安装它们,使用

$ dnf install clang zeromq zeromq-devel python3 python3-zmq

对于 Debian 或 Ubuntu

$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq

如果您遇到任何问题,请参考每个项目的安装说明(链接如上)。

编写硬件接口库

由于这是一个假设的场景,本教程将编写一个虚构的库,其中包含两个函数

  • fancyhw_init() 用于初始化(假设的)硬件
  • fancyhw_read_val() 用于返回从硬件读取的值

将库的完整源代码保存到名为 libfancyhw.h 的文件中

#ifndef LIBFANCYHW_H
#define LIBFANCYHW_H

#include <stdlib.h>
#include <stdint.h>

// This is the fictitious hardware interfacing library

void fancyhw_init(unsigned int init_param)
{
    srand(init_param);
}

int16_t fancyhw_read_val(void)
{
    return (int16_t)rand();
}

#endif

由于有随机数生成器,该库可以模拟您想要在语言之间传递的数据。

设计 C 接口

以下将逐步介绍如何编写 C 接口——从包含库到管理数据传输。

首先加载必要的库(每个库的用途都在代码注释中)

// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"

重要参数

定义 main 函数和程序其余部分所需的重要参数

int main(void)
{
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    ...

初始化

两个库都需要一些初始化。虚构的库只需要一个参数

fancyhw_init(INIT_PARAM);

ZeroMQ 库需要一些真正的初始化。首先,定义一个上下文——一个管理所有套接字的对象

void *context = zmq_ctx_new();

if (!context)
{
    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

然后定义用于传递数据的套接字。ZeroMQ 支持多种类型的套接字,每种套接字都有其应用。使用发布套接字(也称为 PUB 套接字),它可以将消息的副本传递给多个接收者。这种方法使您可以连接多个接收者,它们都将获得相同的消息。如果没有接收者,消息将被丢弃(即,它们不会排队)。使用以下方法执行此操作

void *data_socket = zmq_socket(context, ZMQ_PUB);

套接字必须绑定到一个地址,以便客户端知道在哪里连接。在这种情况下,使用 TCP 传输层(还有其他选项,但 TCP 是一个很好的默认选择)

const int rb = zmq_bind(data_socket, "tcp://*:5555");

if (rb != 0)
{
    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

接下来,计算稍后需要的一些有用值。注意下面代码中的 TOPICPUB 套接字需要一个与它们发送的消息关联的主题。接收者可以使用主题来过滤消息

const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);

发送消息

启动一个循环,发送 REPETITIONS 条消息

for (unsigned int i = 0; i < REPETITIONS; i++)
{
    ...

在发送消息之前,填充 PACKET_SIZE 个值的缓冲区。该库提供 16 位有符号整数。由于 C 中 int 的维度未定义,因此请使用具有特定宽度的 int

int16_t buffer[PACKET_SIZE];

for (unsigned int j = 0; j < PACKET_SIZE; j++)
{
    buffer[j] = fancyhw_read_val();
}

printf("Read %u data values\n", PACKET_SIZE);

消息准备和传递的第一步是创建 ZeroMQ 消息并分配消息所需的内存。这个空消息是一个信封,用于存储您将要发送的数据

zmq_msg_t envelope;

const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0)
{
    printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}

现在内存已分配,将数据存储在 ZeroMQ 消息“信封”中。zmq_msg_data() 函数返回指向信封中缓冲区开头的指针。第一部分是主题,后跟一个空格,然后是二进制数据。添加空格作为主题和数据之间的分隔符。要沿着缓冲区移动,您必须使用强制类型转换和指针运算。(谢谢你,C,让事情变得如此简单。)使用以下方法执行此操作

memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));

通过 data_socket 发送消息

const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size)
{
    printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}

确保在使用后处理信封

zmq_msg_close(&envelope);

printf("Message sent; i: %u, topic: %s\n", i, TOPIC);

清理

由于 C 不提供 垃圾回收,因此您必须进行清理。在您完成发送消息后,关闭程序并进行清理以释放已使用的内存

const int rc = zmq_close(data_socket);

if (rc != 0)
{
    printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

const int rd = zmq_ctx_destroy(context);

if (rd != 0)
{
    printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

return EXIT_SUCCESS;

完整的 C 程序

将完整的接口库保存在名为 hw_interface.c 的本地文件中

// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"

int main(void)
{
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    fancyhw_init(INIT_PARAM);

    void *context = zmq_ctx_new();

    if (!context)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    void *data_socket = zmq_socket(context, ZMQ_PUB);

    const int rb = zmq_bind(data_socket, "tcp://*:5555");

    if (rb != 0)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const size_t topic_size = strlen(TOPIC);
    const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

    printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);

    for (unsigned int i = 0; i < REPETITIONS; i++)
    {
        int16_t buffer[PACKET_SIZE];

        for (unsigned int j = 0; j < PACKET_SIZE; j++)
        {
            buffer[j] = fancyhw_read_val();
        }

        printf("Read %u data values\n", PACKET_SIZE);

        zmq_msg_t envelope;
    
        const int rmi = zmq_msg_init_size(&envelope, envelope_size);
        if (rmi != 0)
        {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
    
            zmq_msg_close(&envelope);
    
            break;
        }
        
        memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);

        memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);

        memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
    
        const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
        if (rs != envelope_size)
        {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
    
            zmq_msg_close(&envelope);
    
            break;
        }
    
        zmq_msg_close(&envelope);

        printf("Message sent; i: %u, topic: %s\n", i, TOPIC);

        sleep(1);
    }

    const int rc = zmq_close(data_socket);

    if (rc != 0)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const int rd = zmq_ctx_destroy(context);

    if (rd != 0)
    {
        printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

使用以下命令编译

$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface

如果没有编译错误,您可以运行该接口。最棒的是,ZeroMQ PUB 套接字可以在没有任何应用程序发送或检索数据的情况下运行。这降低了复杂性,因为在哪个进程需要先启动方面没有义务。

运行接口

$ ./hw_interface 
Topic: fancyhw_data; topic size: 12; Envelope size: 45
Read 16 data values
Message sent; i: 0, topic: fancyhw_data
Read 16 data values
Message sent; i: 1, topic: fancyhw_data
Read 16 data values
...
...

输出显示正在通过 ZeroMQ 发送的数据。现在您需要一个应用程序来读取数据。

编写 Python 数据处理器

现在您可以准备好将数据从 C 传递到 Python 应用程序。

您需要两个库来帮助传输数据。首先,您需要在 Python 中使用 ZeroMQ 绑定

$ python3 -m pip install zmq

另一个是 struct,它解码二进制数据。它通常与 Python 标准库一起提供,因此无需 pip install 它。

Python 程序的第一个部分导入这两个库

import zmq
import struct

重要参数

要使用 ZeroMQ,您必须订阅与上面常量 TOPIC 中使用的主题相同的主题

topic = "fancyhw_data".encode('ascii')

print("Reading messages with topic: {}".format(topic))

初始化

接下来,初始化上下文和套接字。使用订阅套接字(也称为 SUB 套接字),它是 PUB 套接字的天然伙伴。套接字还需要订阅正确的主题

with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    ...

接收消息

启动一个无限循环,等待新消息传递到 SUB 套接字。如果您按下 Ctrl+C 或发生错误,循环将关闭

    try:
        while True:

            ... # we will fill this in next

    except KeyboardInterrupt:
        socket.close()
    except Exception as error:
        print("ERROR: {}".format(error))
        socket.close()

循环等待新消息通过 recv() 方法到达。然后它在第一个空格处拆分接收到的任何内容,以将主题与内容分开

binary_topic, data_buffer = socket.recv().split(b' ', 1)

解码消息

Python 尚不知道主题是字符串,因此使用标准 ASCII 编码对其进行解码

topic = binary_topic.decode(encoding = 'ascii')

print("Message {:d}:".format(i))
print("\ttopic: '{}'".format(topic))

下一步是使用 struct 库读取二进制数据,该库可以将无定形的二进制数据块转换为有意义的值。首先,计算数据包中存储的值的数量。此示例使用 16 位有符号整数,它们对应于 struct 格式中的“h”

packet_size = len(data_buffer) // struct.calcsize("h")

print("\tpacket size: {:d}".format(packet_size))

通过了解数据包中有多少个值,您可以通过准备一个包含值数量及其类型的字符串来定义格式(例如,“16h”)

struct_format = "{:d}h".format(packet_size)

将二进制数据块转换为您可以立即打印的一系列数字

data = struct.unpack(struct_format, data_buffer)

print("\tdata: {}".format(data))

完整的 Python 程序

这是完整的 Python 数据接收器

#! /usr/bin/env python3

import zmq
import struct

topic = "fancyhw_data".encode('ascii')

print("Reading messages with topic: {}".format(topic))

with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    try:
        while True:
            binary_topic, data_buffer = socket.recv().split(b' ', 1)

            topic = binary_topic.decode(encoding = 'ascii')

            print("Message {:d}:".format(i))
            print("\ttopic: '{}'".format(topic))

            packet_size = len(data_buffer) // struct.calcsize("h")

            print("\tpacket size: {:d}".format(packet_size))

            struct_format = "{:d}h".format(packet_size)

            data = struct.unpack(struct_format, data_buffer)

            print("\tdata: {}".format(data))

            i += 1

    except KeyboardInterrupt:
        socket.close()
    except Exception as error:
        print("ERROR: {}".format(error))
        socket.close()

将其保存到名为 online_analysis.py 的文件中。Python 不需要编译,因此您可以立即运行该程序。

这是输出

$ ./online_analysis.py 
Reading messages with topic: b'fancyhw_data'
Message 0:
        topic: 'fancyhw_data'
        packet size: 16
        data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)
Message 1:
        topic: 'fancyhw_data'
        packet size: 16
        data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)
...
...

结论

本教程介绍了一种从基于 C 的硬件接口收集数据并将其提供给基于 Python 的基础设施的替代方法。您可以获取这些数据并对其进行分析,或者将其传递到任意数量的方向。它采用消息传递库来传递“收集器”和“分析器”之间的数据,而不是使用一个庞大的软件来完成所有事情。

本教程还提高了我所说的“软件粒度”。换句话说,它将软件细分为更小的单元。这种策略的好处之一是可以同时使用不同的编程语言,而最小的接口充当它们之间的垫片程序。

在实践中,这种设计允许软件工程师更加协作且独立地工作。不同的团队可以处理分析的不同步骤,选择他们喜欢的工具。另一个好处是并行性,因为所有进程都可以并行运行。ZeroMQ 消息库是一款卓越的软件,它使所有这些都变得更加容易。

接下来阅读
标签
User profile image.
Cristiano L. Fontana 曾是帕多瓦大学(意大利)物理与天文系“伽利略·伽利雷”的研究员,后来转向其他新的体验。

8 条评论

非常有信息量

哇,好文章。

错误.. Swig?这里只需一小部分努力,经过了非常好的测试,并且没有手写 C 代码的脆弱性。我确信在两种语言之间使用消息传递库有很多用例,但对于通用接口,已经有很多优秀的替代方案,工作量明显减少。

是的,SWIG 是一种有效的替代方案,对于这种简单的情况,它当然可能更实用。
坦率地说,它从来没有首先出现在我的脑海中。我更喜欢消息传递方法,因为它允许我有独立的进程协同工作。我想这只是个人喜好问题。

回复 ,作者:Tim Parker (未验证)

Cristiano,我在美国“居家避疫”时偶然看到了你的帖子。我希望你和你的家人安好……我们都将度过这段艰难时期,并因此变得更强大。

关于这篇文章……极好的!……你有一种简洁地描述编程概念,然后指导读者阅读代码的真正技巧。
谢谢你,请保重。

谢谢!是的,这里的情况非常严峻,尤其是在红色区域。幸运的是,我的家人尚未受到病毒影响。我们都遵守法律,并尽可能待在家里。我还应该公开感谢 opensource.com 团队,他们在一些私人对话中向我表达了他们的同情。他们确实关心这个平台的作者。

也感谢您对这篇文章的赞美之词。我正在尽我所能回馈社区,为了我至今使用的所有软件。

回复 ,作者:ScottM (未验证)

Creative Commons License本作品根据知识共享署名-相同方式共享 4.0 国际许可协议获得许可。
© . All rights reserved.