Linux 中的进程间通信:使用管道和消息队列

了解进程如何在 Linux 中彼此同步。
207 位读者喜欢这篇文章。
Chat bubbles

这是关于 Linux 中进程间通信 (IPC) 系列文章的第二篇。第一篇文章侧重于通过共享存储进行 IPC:共享文件和共享内存段。本文转向管道,管道是连接进程进行通信的通道。通道有一个写入端用于写入字节,以及一个读取端用于以 FIFO(先进先出)顺序读取这些字节。在典型用法中,一个进程写入通道,而不同的进程从同一通道读取。字节本身可以代表任何事物:数字、员工记录、数字电影等等。

管道有两种类型,命名管道和匿名管道,并且可以从命令行交互式使用或在程序中使用;示例即将到来。本文还介绍了消息队列,消息队列已经过时——但不应如此。

第一篇文章中的代码示例承认了在使用共享存储的 IPC 中基于文件或基于内存的竞争条件的威胁。自然而然地会产生关于基于通道的 IPC 的安全并发性的问题,这将在本文中介绍。管道和消息队列的代码示例使用了带有 POSIX 认可印章的 API,而 POSIX 标准的核心目标是线程安全。

考虑 mq_open 函数的手册页,该函数属于消息队列 API。这些页面包含一个关于 属性 的部分,其中包含这个小表格

接口 属性
mq_open() 线程安全 MT-Safe

MT-Safe 值(其中 MT 代表多线程)意味着 mq_open 函数是线程安全的,这反过来意味着进程安全:进程的执行正是其线程之一的执行,并且如果竞争条件不会在同一进程中的线程之间发生,则这种条件也不会在不同进程中的线程之间发生。MT-Safe 属性保证竞争条件不会在 mq_open 的调用中发生。一般来说,基于通道的 IPC 是并发安全的,尽管在下面的示例中提出了一个警示性说明。

匿名管道

让我们从一个人为设计的命令行示例开始,该示例展示了匿名管道的工作原理。在所有现代系统中,竖线 | 表示命令行上的匿名管道。假设 % 是命令行提示符,并考虑以下命令

% sleep 5 | echo "Hello, world!" ## writer to the left of |, reader to the right

sleepecho 实用程序作为单独的进程执行,匿名管道允许它们进行通信。然而,该示例是人为设计的,因为没有发生通信。问候语 Hello, world! 出现在屏幕上;然后,大约五秒钟后,命令行提示符返回,表明 sleepecho 进程都已退出。发生了什么?

在命令行中的竖线语法中,左侧的进程 (sleep) 是写入器,右侧的进程 (echo) 是读取器。默认情况下,读取器会阻塞,直到通道中有字节可读,而写入器在写入其字节后,通过发送流结束标记来完成操作。(即使写入器过早终止,也会向读取器发送流结束标记。)匿名管道会一直存在,直到写入器和读取器都终止。

[下载 Linux 中进程间通信的完整指南]

在人为设计的示例中,sleep 进程不会向通道写入任何字节,但在大约五秒钟后终止,这会向通道发送流结束标记。与此同时,echo 进程立即将问候语写入标准输出(屏幕),因为此进程不从通道读取任何字节,因此它不会等待。一旦 sleepecho 进程终止,匿名管道(根本没有用于通信)就会消失,并且命令行提示符返回。

这是一个更有用的示例,使用了两个匿名管道。假设文件 test.dat 看起来像这样

this
is
the
way
the
world
ends

命令

% cat test.dat | sort | uniq

cat(连接)进程的输出通过管道传输到 sort 进程以生成排序后的输出,然后将排序后的输出通过管道传输到 uniq 进程以消除重复记录(在本例中,两个 the 减少为一个)

ends
is
the
this
way
world

现在为具有两个通过匿名管道通信的进程的程序设置了场景。

示例 1. 两个通过匿名管道通信的进程。

#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h>   /* exit functions */
#include <unistd.h>   /* read, write, pipe, _exit */
#include <string.h>

#define ReadEnd  0
#define WriteEnd 1

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);    /** failure **/
}

int main() {
  int pipeFDs[2]; /* two file descriptors */
  char buf;       /* 1-byte buffer */
  const char* msg = "Nature's first green is gold\n"; /* bytes to write */

  if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
  pid_t cpid = fork();                                /* fork a child process */
  if (cpid < 0) report_and_exit("fork");              /* check for failure */

  if (0 == cpid) {    /*** child ***/                 /* child process */
    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */

    while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */
      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */

    close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */
    _exit(0);                                         /* exit and notify parent at once  */
  }
  else {              /*** parent ***/
    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */

    write(pipeFDs[WriteEnd], msg, strlen(msg));       /* write the bytes to the pipe */
    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */

    wait(NULL);                                       /* wait for child to exit */
    exit(0);                                          /* exit normally */
  }
  return 0;
}

上面的 pipeUN 程序使用系统函数 fork 来创建进程。虽然该程序只有一个源文件,但在(成功)执行期间会发生多进程。以下是库函数 fork 工作原理的快速回顾中的详细信息

  • fork 函数在进程中调用,如果失败,则向父进程返回 -1。在 pipeUN 示例中,调用是
    pid_t cpid = fork(); /* called in parent */

    返回的值存储在本例中,存储在整数类型 pid_t 的变量 cpid 中。(每个进程都有自己的进程 ID,这是一个非负整数,用于标识进程。)由于多种原因,fork 一个新进程可能会失败,包括进程表已满,进程表是系统维护的用于跟踪进程的结构。僵尸进程(稍后澄清)如果未被回收,可能会导致进程表填满。

  • 如果 fork 调用成功,它会由此产生(创建)一个新的子进程,向父进程返回一个值,但向子进程返回不同的值。父进程和子进程都执行 same 代码,该代码位于 fork 调用之后。(子进程继承父进程中迄今为止声明的所有变量的副本。)特别是,成功的 fork 调用返回
    • 子进程为零
    • 父进程为子进程的进程 ID
  • if/else 或等效结构通常在成功的 fork 调用之后使用,以将父进程的代码与子进程的代码分开。在本例中,构造是
    if (0 == cpid) {    /*** child ***/
    ...
    }
    else {              /*** parent ***/
    ...
    }

如果 fork 子进程成功,则 pipeUN 程序按如下方式进行。有一个整数数组

int pipeFDs[2]; /* two file descriptors */

用于保存两个文件描述符,一个用于写入管道,另一个用于从管道读取。(数组元素 pipeFDs[0] 是读取端的文件描述符,数组元素 pipeFDs[1] 是写入端的文件描述符。)对系统 pipe 函数的成功调用(在调用 fork 之前立即进行)用两个文件描述符填充数组

if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

父进程和子进程现在都拥有两个文件描述符的副本,但关注点分离模式意味着每个进程正好需要其中一个描述符。在本例中,父进程执行写入操作,子进程执行读取操作,尽管角色可以颠倒。因此,子进程 if 子句代码中的第一个语句关闭管道的写入端

close(pipeFDs[WriteEnd]); /* called in child code */

而父进程 else 子句代码中的第一个语句关闭管道的读取端

close(pipeFDs[ReadEnd]);  /* called in parent code */

然后,父进程将一些字节(ASCII 码)写入匿名管道,子进程读取这些字节并将它们回显到标准输出。

程序的另一个方面需要澄清:父代码中对 wait 函数的调用。一旦产生,子进程在很大程度上独立于其父进程,即使是简短的 pipeUN 程序也说明了这一点。子进程可以执行可能与父进程无关的任意代码。但是,系统确实会通过信号通知父进程——如果子进程终止。

如果父进程在子进程之前终止会怎样?在这种情况下,除非采取预防措施,否则子进程将变成并保持为僵尸进程,并在进程表中有一个条目。预防措施主要有两种类型。一种预防措施是让父进程通知系统,父进程对子进程的终止不感兴趣

signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

第二种方法是让父进程对子进程的终止执行 wait,从而确保父进程比子进程活得更久。第二种方法在 pipeUN 程序中使用,其中父代码具有以下调用

wait(NULL); /* called in parent */

wait 的此调用意味着等待任何子进程的终止发生,而在 pipeUN 程序中,只有一个子进程。(NULL 参数可以用整数变量的地址替换,以保存子进程的退出状态。)有一个更灵活的 waitpid 函数用于细粒度控制,例如,用于指定多个子进程中的特定子进程。

pipeUN 程序采取了另一个预防措施。当父进程完成等待后,父进程通过调用常规的 exit 函数终止。相比之下,子进程通过调用 _exit 变体终止,该变体快速跟踪终止通知。实际上,子进程告诉系统尽快通知父进程子进程已终止。

如果两个进程写入同一个匿名管道,字节是否可以交错?例如,如果进程 P1 写入

foo bar

到管道,并且进程 P2 同时写入

baz baz

到同一个管道,管道内容似乎可能是任意的,例如

baz foo baz bar

POSIX 标准确保只要写入不超过 PIPE_BUF 字节,写入就不会交错。在 Linux 系统上,PIPE_BUF 的大小为 4,096 字节。我对管道的偏好是只有一个写入器和一个读取器,从而避免了这个问题。

命名管道

匿名管道没有后备文件:系统维护一个内存缓冲区,用于将字节从写入器传输到读取器。一旦写入器和读取器终止,缓冲区将被回收,因此匿名管道将消失。相比之下,命名管道具有后备文件和不同的 API。

让我们看另一个命令行示例,以了解命名管道的要点。以下是步骤

  • 打开两个终端。两个终端的工作目录应相同。
  • 在一个终端中,输入以下两个命令(提示符再次为 %,我的注释以 ## 开头)
    % mkfifo tester  ## creates a backing file named tester
    % cat tester     ## type the pipe's contents to stdout

    一开始,终端中不应显示任何内容,因为尚未向命名管道写入任何内容。

  • 在第二个终端中,输入命令
    % cat > tester  ## redirect keyboard input to the pipe
    hello, world!   ## then hit Return key
    bye, bye        ## ditto
    <Control-C>     ## terminate session with a Control-C

    输入到此终端的任何内容都会在另一个终端中回显。一旦输入 Ctrl+C,两个终端都会返回常规命令行提示符:管道已关闭。

  • 通过删除实现命名管道的文件来清理
    % unlink tester

正如实用程序名称 mkfifo 所暗示的那样,命名管道也称为 FIFO,因为第一个字节输入是第一个字节输出,依此类推。有一个名为 mkfifo 的库函数,用于在程序中创建命名管道,并在下一个示例中使用,该示例由两个进程组成:一个进程写入命名管道,另一个进程从该管道读取。

示例 2. fifoWriter 程序

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>

#define MaxLoops         12000   /* outer loop */
#define ChunkSize           16   /* how many written at a time */
#define IntsPerChunk         4   /* four 4-byte ints per chunk */
#define MaxZs              250   /* max microseconds to sleep */

int main() {
  const char* pipeName = "./fifoChannel";
  mkfifo(pipeName, 0666);                      /* read/write for user/group/others */
  int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
  if (fd < 0) return -1;                       /* can't go on */

  int i;
  for (i = 0; i < MaxLoops; i++) {          /* write MaxWrites times */
    int j;
    for (j = 0; j < ChunkSize; j++) {       /* each time, write ChunkSize bytes */
      int k;
      int chunk[IntsPerChunk];
      for (k = 0; k < IntsPerChunk; k++)
        chunk[k] = rand();
      write(fd, chunk, sizeof(chunk));
    }
    usleep((rand() % MaxZs) + 1);           /* pause a bit for realism */
  }

  close(fd);           /* close pipe: generates an end-of-stream marker */
  unlink(pipeName);    /* unlink from the implementing file */
  printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);

  return 0;
}

上面的 fifoWriter 程序可以总结如下

  • 该程序创建一个用于写入的命名管道
    mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
    int fd = open(pipeName, O_CREAT | O_WRONLY);

    其中 pipeName 是作为第一个参数传递给 mkfifo 的后备文件的名称。然后使用现在熟悉的对 open 函数的调用打开命名管道,该函数返回文件描述符。

  • 为了增加真实感,fifoWriter 不会一次写入所有数据,而是写入一个块,休眠随机数量的微秒,依此类推。总共将 768,000 个 4 字节的整数值写入命名管道。
  • 在关闭命名管道后,fifoWriter 还会取消链接该文件
    close(fd);        /* close pipe: generates end-of-stream marker */
    unlink(pipeName); /* unlink from the implementing file */

    一旦连接到管道的每个进程都执行了取消链接操作,系统就会回收后备文件。在本例中,只有两个这样的进程:fifoWriterfifoReader,它们都执行了 unlink 操作。

这两个程序应在具有相同工作目录的不同终端中执行。但是,应在 fifoReader 之前启动 fifoWriter,因为前者创建管道。然后,fifoReader 访问已创建的命名管道。

示例 3. fifoReader 程序

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

unsigned is_prime(unsigned n) { /* not pretty, but efficient */
  if (n <= 3) return n > 1;
  if (0 == (n % 2) || 0 == (n % 3)) return 0;

  unsigned i;
  for (i = 5; (i * i) <= n; i += 6)
    if (0 == (n % i) || 0 == (n % (i + 2))) return 0;

  return 1; /* found a prime! */
}

int main() {
  const char* file = "./fifoChannel";
  int fd = open(file, O_RDONLY);
  if (fd < 0) return -1; /* no point in continuing */
  unsigned count = 0, total = 0, primes_count = 0;

  while (1) {
    int next;
    int i;

    ssize_t count = read(fd, &next, sizeof(int));
    if (0 == count) break;                  /* end of stream */
    else if (count == sizeof(int)) {        /* read a 4-byte int value */
      total++;
      if (is_prime(next)) primes_count++;
    }
  }

  close(fd);       /* close pipe from read end */
  unlink(file);    /* unlink from the underlying file */
  printf("Received ints: %u, primes: %u\n", total, primes_count);

  return 0;
}

上面的 fifoReader 程序可以总结如下

  • 由于 fifoWriter 创建了命名管道,因此 fifoReader 只需要标准调用 open 即可通过后备文件访问管道
    const char* file = "./fifoChannel";
    int fd = open(file, O_RDONLY);

    该文件以只读方式打开。

  • 然后,该程序进入一个可能无限循环,尝试在每次迭代中读取一个 4 字节的块。read 调用
    ssize_t count = read(fd, &next, sizeof(int));

    返回 0 以指示流结束,在这种情况下,fifoReader 跳出循环,关闭命名管道,并在终止之前取消链接后备文件。

  • 在读取一个 4 字节的整数后,fifoReader 检查该数字是否为素数。这代表了生产级读取器可能对接收到的字节执行的业务逻辑。在一次示例运行中,在接收到的 768,000 个整数中,有 37,682 个素数。

在重复的示例运行中,fifoReader 成功读取了 fifoWriter 写入的所有字节。这并不奇怪。这两个进程在同一主机上执行,消除了网络问题。命名管道是一种高度可靠且高效的 IPC 机制,因此被广泛使用。

以下是两个程序的输出,每个程序都从单独的终端启动,但具有相同的工作目录

% ./fifoWriter
768000 ints sent to the pipe.
###
% ./fifoReader
Received ints: 768000, primes: 37682

消息队列

管道具有严格的 FIFO 行为:先写入的字节先读取,第二个写入的字节第二个读取,依此类推。消息队列可以表现出相同的行为,但足够灵活,可以以非 FIFO 顺序检索字节块。

顾名思义,消息队列是一系列消息,每条消息都有两个部分

  • 有效负载,它是一个字节数组 (C 中的 char)
  • 类型,以正整数值给出;类型对消息进行分类以进行灵活检索

考虑以下消息队列的描述,每条消息都标有整数类型

          +-+    +-+    +-+    +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
          +-+    +-+    +-+    +-+

在显示的四条消息中,标记为 1 的消息位于最前面,即最靠近接收器。接下来是两条标记为 2 的消息,最后是一条标记为 3 的消息在最后面。如果严格的 FIFO 行为正在发挥作用,则消息将按 1-2-2-3 的顺序接收。但是,消息队列允许其他检索顺序。例如,消息可以由接收器按 3-2-1-2 的顺序检索。

mqueue 示例由两个程序组成,sender 将消息写入消息队列,receiver 从该队列读取消息。这两个程序都包含头文件 queue.h,如下所示

示例 4. 头文件 queue.h

#define ProjectId 123
#define PathName  "queue.h" /* any existing, accessible file would do */
#define MsgLen    4
#define MsgCount  6

typedef struct {
  long type;                 /* must be of type long */
  char payload[MsgLen + 1];  /* bytes in the message */
} queuedMessage;

头文件定义了一个名为 queuedMessage 的结构类型,其中包含 payload(字节数组)和 type(整数)字段。此文件还定义了符号常量(#define 语句),前两个符号常量用于生成键,而键又用于获取消息队列 ID。ProjectId 可以是任何正整数值,PathName 必须是现有的、可访问的文件——在本例中为文件 queue.hsenderreceiver 程序中的设置语句是

key_t key = ftok(PathName, ProjectId);   /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

ID qid 实际上是消息队列的文件描述符的对应物。

示例 5. 消息 sender 程序

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId);
  if (key < 0) report_and_exit("couldn't get key...");

  int qid = msgget(key, 0666 | IPC_CREAT);
  if (qid < 0) report_and_exit("couldn't get queue id...");

  char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
  int i;
  for (i = 0; i < MsgCount; i++) {
    /* build the message */
    queuedMessage msg;
    msg.type = types[i];
    strcpy(msg.payload, payloads[i]);

    /* send the message */
    msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */
    printf("%s sent as type %i\n", msg.payload, (int) msg.type);
  }
  return 0;
}

上面的 sender 程序发送六条消息,每种指定类型两条消息:第一批消息的类型为 1,接下来的两条消息的类型为 2,最后两条消息的类型为 3。发送语句

msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);

配置为非阻塞(标志 IPC_NOWAIT),因为消息非常小。唯一的危险是队列已满,在本例中不太可能发生,会导致发送失败。下面的 receiver 程序也使用 IPC_NOWAIT 标志接收消息。

示例 6. 消息 receiver 程序

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
  if (key < 0) report_and_exit("key not gotten...");

  int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
  if (qid < 0) report_and_exit("no access to queue...");

  int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
  int i;
  for (i = 0; i < MsgCount; i++) {
    queuedMessage msg; /* defined in queue.h */
    if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
      puts("msgrcv trouble...");
    printf("%s received as type %i\n", msg.payload, (int) msg.type);
  }

  /** remove the queue **/
  if (msgctl(qid, IPC_RMID, NULL) < 0)  /* NULL = 'no flags' */
    report_and_exit("trouble removing queue...");

  return 0;
}

receiver 程序不创建消息队列,尽管 API 似乎暗示了这一点。在 receiver 中,调用

int qid = msgget(key, 0666 | IPC_CREAT);

由于 IPC_CREAT 标志而具有误导性,但此标志实际上意味着如果需要则创建,否则访问sender 程序调用 msgsnd 发送消息,而 receiver 调用 msgrcv 检索消息。在本例中,sender 按 1-1-2-2-3-3 的顺序发送消息,但 receiver 然后按 3-1-2-1-3-2 的顺序检索它们,这表明消息队列不受严格的 FIFO 行为的约束

% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3

% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2

上面的输出显示 senderreceiver 可以从同一终端启动。输出还显示,即使在 sender 进程创建队列、向队列写入并退出后,消息队列仍然存在。队列仅在 receiver 进程通过调用 msgctl 显式删除它后才会消失

if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */

总结

管道和消息队列 API 从根本上是单向的:一个进程写入,另一个进程读取。有双向命名管道的实现,但我认为这种 IPC 机制在其最简单的时候是最好的。如前所述,消息队列的普及程度有所下降——但没有充分的理由;这些队列是 IPC 工具箱中的另一种工具。第 3 部分通过套接字和信号的 IPC 代码示例完成了 IPC 工具箱的快速浏览。

标签
User profile image.
我是一名计算机科学领域的学者(德保罗大学计算与数字媒体学院),在软件开发方面拥有广泛的经验,主要是在生产计划和调度(钢铁行业)以及产品配置(卡车和客车制造)方面。有关书籍和其他出版物的详细信息,请访问

评论已关闭。

© . All rights reserved.