这是关于 Linux 中进程间通信 (IPC) 系列文章的第二篇。第一篇文章侧重于通过共享存储进行 IPC:共享文件和共享内存段。本文转向管道,管道是连接进程进行通信的通道。通道有一个写入端用于写入字节,以及一个读取端用于以 FIFO(先进先出)顺序读取这些字节。在典型用法中,一个进程写入通道,而不同的进程从同一通道读取。字节本身可以代表任何事物:数字、员工记录、数字电影等等。
管道有两种类型,命名管道和匿名管道,并且可以从命令行交互式使用或在程序中使用;示例即将到来。本文还介绍了消息队列,消息队列已经过时——但不应如此。
第一篇文章中的代码示例承认了在使用共享存储的 IPC 中基于文件或基于内存的竞争条件的威胁。自然而然地会产生关于基于通道的 IPC 的安全并发性的问题,这将在本文中介绍。管道和消息队列的代码示例使用了带有 POSIX 认可印章的 API,而 POSIX 标准的核心目标是线程安全。
考虑 mq_open 函数的手册页,该函数属于消息队列 API。这些页面包含一个关于 属性 的部分,其中包含这个小表格
接口 | 属性 | 值 |
mq_open() | 线程安全 | MT-Safe |
MT-Safe 值(其中 MT 代表多线程)意味着 mq_open 函数是线程安全的,这反过来意味着进程安全:进程的执行正是其线程之一的执行,并且如果竞争条件不会在同一进程中的线程之间发生,则这种条件也不会在不同进程中的线程之间发生。MT-Safe 属性保证竞争条件不会在 mq_open 的调用中发生。一般来说,基于通道的 IPC 是并发安全的,尽管在下面的示例中提出了一个警示性说明。
匿名管道
让我们从一个人为设计的命令行示例开始,该示例展示了匿名管道的工作原理。在所有现代系统中,竖线 | 表示命令行上的匿名管道。假设 % 是命令行提示符,并考虑以下命令
% sleep 5 | echo "Hello, world!" ## writer to the left of |, reader to the right
sleep 和 echo 实用程序作为单独的进程执行,匿名管道允许它们进行通信。然而,该示例是人为设计的,因为没有发生通信。问候语 Hello, world! 出现在屏幕上;然后,大约五秒钟后,命令行提示符返回,表明 sleep 和 echo 进程都已退出。发生了什么?
在命令行中的竖线语法中,左侧的进程 (sleep) 是写入器,右侧的进程 (echo) 是读取器。默认情况下,读取器会阻塞,直到通道中有字节可读,而写入器在写入其字节后,通过发送流结束标记来完成操作。(即使写入器过早终止,也会向读取器发送流结束标记。)匿名管道会一直存在,直到写入器和读取器都终止。
[下载 Linux 中进程间通信的完整指南]
在人为设计的示例中,sleep 进程不会向通道写入任何字节,但在大约五秒钟后终止,这会向通道发送流结束标记。与此同时,echo 进程立即将问候语写入标准输出(屏幕),因为此进程不从通道读取任何字节,因此它不会等待。一旦 sleep 和 echo 进程终止,匿名管道(根本没有用于通信)就会消失,并且命令行提示符返回。
这是一个更有用的示例,使用了两个匿名管道。假设文件 test.dat 看起来像这样
this
is
the
way
the
world
ends
命令
% cat test.dat | sort | uniq
将 cat(连接)进程的输出通过管道传输到 sort 进程以生成排序后的输出,然后将排序后的输出通过管道传输到 uniq 进程以消除重复记录(在本例中,两个 the 减少为一个)
ends
is
the
this
way
world
现在为具有两个通过匿名管道通信的进程的程序设置了场景。
示例 1. 两个通过匿名管道通信的进程。
#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h> /* exit functions */
#include <unistd.h> /* read, write, pipe, _exit */
#include <string.h>
#define ReadEnd 0
#define WriteEnd 1
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /** failure **/
}
int main() {
int pipeFDs[2]; /* two file descriptors */
char buf; /* 1-byte buffer */
const char* msg = "Nature's first green is gold\n"; /* bytes to write */
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
pid_t cpid = fork(); /* fork a child process */
if (cpid < 0) report_and_exit("fork"); /* check for failure */
if (0 == cpid) { /*** child ***/ /* child process */
close(pipeFDs[WriteEnd]); /* child reads, doesn't write */
while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */
write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */
close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */
_exit(0); /* exit and notify parent at once */
}
else { /*** parent ***/
close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */
write(pipeFDs[WriteEnd], msg, strlen(msg)); /* write the bytes to the pipe */
close(pipeFDs[WriteEnd]); /* done writing: generate eof */
wait(NULL); /* wait for child to exit */
exit(0); /* exit normally */
}
return 0;
}
上面的 pipeUN 程序使用系统函数 fork 来创建进程。虽然该程序只有一个源文件,但在(成功)执行期间会发生多进程。以下是库函数 fork 工作原理的快速回顾中的详细信息
- fork 函数在父进程中调用,如果失败,则向父进程返回 -1。在 pipeUN 示例中,调用是
pid_t cpid = fork(); /* called in parent */
返回的值存储在本例中,存储在整数类型 pid_t 的变量 cpid 中。(每个进程都有自己的进程 ID,这是一个非负整数,用于标识进程。)由于多种原因,fork 一个新进程可能会失败,包括进程表已满,进程表是系统维护的用于跟踪进程的结构。僵尸进程(稍后澄清)如果未被回收,可能会导致进程表填满。
- 如果 fork 调用成功,它会由此产生(创建)一个新的子进程,向父进程返回一个值,但向子进程返回不同的值。父进程和子进程都执行 same 代码,该代码位于 fork 调用之后。(子进程继承父进程中迄今为止声明的所有变量的副本。)特别是,成功的 fork 调用返回
- 子进程为零
- 父进程为子进程的进程 ID
- if/else 或等效结构通常在成功的 fork 调用之后使用,以将父进程的代码与子进程的代码分开。在本例中,构造是
if (0 == cpid) { /*** child ***/ ... } else { /*** parent ***/ ... }
如果 fork 子进程成功,则 pipeUN 程序按如下方式进行。有一个整数数组
int pipeFDs[2]; /* two file descriptors */
用于保存两个文件描述符,一个用于写入管道,另一个用于从管道读取。(数组元素 pipeFDs[0] 是读取端的文件描述符,数组元素 pipeFDs[1] 是写入端的文件描述符。)对系统 pipe 函数的成功调用(在调用 fork 之前立即进行)用两个文件描述符填充数组
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
父进程和子进程现在都拥有两个文件描述符的副本,但关注点分离模式意味着每个进程正好需要其中一个描述符。在本例中,父进程执行写入操作,子进程执行读取操作,尽管角色可以颠倒。因此,子进程 if 子句代码中的第一个语句关闭管道的写入端
close(pipeFDs[WriteEnd]); /* called in child code */
而父进程 else 子句代码中的第一个语句关闭管道的读取端
close(pipeFDs[ReadEnd]); /* called in parent code */
然后,父进程将一些字节(ASCII 码)写入匿名管道,子进程读取这些字节并将它们回显到标准输出。
程序的另一个方面需要澄清:父代码中对 wait 函数的调用。一旦产生,子进程在很大程度上独立于其父进程,即使是简短的 pipeUN 程序也说明了这一点。子进程可以执行可能与父进程无关的任意代码。但是,系统确实会通过信号通知父进程——如果子进程终止。
如果父进程在子进程之前终止会怎样?在这种情况下,除非采取预防措施,否则子进程将变成并保持为僵尸进程,并在进程表中有一个条目。预防措施主要有两种类型。一种预防措施是让父进程通知系统,父进程对子进程的终止不感兴趣
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */
第二种方法是让父进程对子进程的终止执行 wait,从而确保父进程比子进程活得更久。第二种方法在 pipeUN 程序中使用,其中父代码具有以下调用
wait(NULL); /* called in parent */
对 wait 的此调用意味着等待任何子进程的终止发生,而在 pipeUN 程序中,只有一个子进程。(NULL 参数可以用整数变量的地址替换,以保存子进程的退出状态。)有一个更灵活的 waitpid 函数用于细粒度控制,例如,用于指定多个子进程中的特定子进程。
pipeUN 程序采取了另一个预防措施。当父进程完成等待后,父进程通过调用常规的 exit 函数终止。相比之下,子进程通过调用 _exit 变体终止,该变体快速跟踪终止通知。实际上,子进程告诉系统尽快通知父进程子进程已终止。
如果两个进程写入同一个匿名管道,字节是否可以交错?例如,如果进程 P1 写入
foo bar
到管道,并且进程 P2 同时写入
baz baz
到同一个管道,管道内容似乎可能是任意的,例如
baz foo baz bar
POSIX 标准确保只要写入不超过 PIPE_BUF 字节,写入就不会交错。在 Linux 系统上,PIPE_BUF 的大小为 4,096 字节。我对管道的偏好是只有一个写入器和一个读取器,从而避免了这个问题。
命名管道
匿名管道没有后备文件:系统维护一个内存缓冲区,用于将字节从写入器传输到读取器。一旦写入器和读取器终止,缓冲区将被回收,因此匿名管道将消失。相比之下,命名管道具有后备文件和不同的 API。
让我们看另一个命令行示例,以了解命名管道的要点。以下是步骤
- 打开两个终端。两个终端的工作目录应相同。
- 在一个终端中,输入以下两个命令(提示符再次为 %,我的注释以 ## 开头)
% mkfifo tester ## creates a backing file named tester % cat tester ## type the pipe's contents to stdout
一开始,终端中不应显示任何内容,因为尚未向命名管道写入任何内容。
- 在第二个终端中,输入命令
% cat > tester ## redirect keyboard input to the pipe hello, world! ## then hit Return key bye, bye ## ditto <Control-C> ## terminate session with a Control-C
输入到此终端的任何内容都会在另一个终端中回显。一旦输入 Ctrl+C,两个终端都会返回常规命令行提示符:管道已关闭。
- 通过删除实现命名管道的文件来清理
% unlink tester
正如实用程序名称 mkfifo 所暗示的那样,命名管道也称为 FIFO,因为第一个字节输入是第一个字节输出,依此类推。有一个名为 mkfifo 的库函数,用于在程序中创建命名管道,并在下一个示例中使用,该示例由两个进程组成:一个进程写入命名管道,另一个进程从该管道读取。
示例 2. fifoWriter 程序
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#define MaxLoops 12000 /* outer loop */
#define ChunkSize 16 /* how many written at a time */
#define IntsPerChunk 4 /* four 4-byte ints per chunk */
#define MaxZs 250 /* max microseconds to sleep */
int main() {
const char* pipeName = "./fifoChannel";
mkfifo(pipeName, 0666); /* read/write for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
if (fd < 0) return -1; /* can't go on */
int i;
for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */
int j;
for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */
int k;
int chunk[IntsPerChunk];
for (k = 0; k < IntsPerChunk; k++)
chunk[k] = rand();
write(fd, chunk, sizeof(chunk));
}
usleep((rand() % MaxZs) + 1); /* pause a bit for realism */
}
close(fd); /* close pipe: generates an end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */
printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);
return 0;
}
上面的 fifoWriter 程序可以总结如下
- 该程序创建一个用于写入的命名管道
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */ int fd = open(pipeName, O_CREAT | O_WRONLY);
其中 pipeName 是作为第一个参数传递给 mkfifo 的后备文件的名称。然后使用现在熟悉的对 open 函数的调用打开命名管道,该函数返回文件描述符。
- 为了增加真实感,fifoWriter 不会一次写入所有数据,而是写入一个块,休眠随机数量的微秒,依此类推。总共将 768,000 个 4 字节的整数值写入命名管道。
- 在关闭命名管道后,fifoWriter 还会取消链接该文件
close(fd); /* close pipe: generates end-of-stream marker */ unlink(pipeName); /* unlink from the implementing file */
一旦连接到管道的每个进程都执行了取消链接操作,系统就会回收后备文件。在本例中,只有两个这样的进程:fifoWriter 和 fifoReader,它们都执行了 unlink 操作。
这两个程序应在具有相同工作目录的不同终端中执行。但是,应在 fifoReader 之前启动 fifoWriter,因为前者创建管道。然后,fifoReader 访问已创建的命名管道。
示例 3. fifoReader 程序
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
unsigned is_prime(unsigned n) { /* not pretty, but efficient */
if (n <= 3) return n > 1;
if (0 == (n % 2) || 0 == (n % 3)) return 0;
unsigned i;
for (i = 5; (i * i) <= n; i += 6)
if (0 == (n % i) || 0 == (n % (i + 2))) return 0;
return 1; /* found a prime! */
}
int main() {
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
if (fd < 0) return -1; /* no point in continuing */
unsigned count = 0, total = 0, primes_count = 0;
while (1) {
int next;
int i;
ssize_t count = read(fd, &next, sizeof(int));
if (0 == count) break; /* end of stream */
else if (count == sizeof(int)) { /* read a 4-byte int value */
total++;
if (is_prime(next)) primes_count++;
}
}
close(fd); /* close pipe from read end */
unlink(file); /* unlink from the underlying file */
printf("Received ints: %u, primes: %u\n", total, primes_count);
return 0;
}
上面的 fifoReader 程序可以总结如下
- 由于 fifoWriter 创建了命名管道,因此 fifoReader 只需要标准调用 open 即可通过后备文件访问管道
const char* file = "./fifoChannel"; int fd = open(file, O_RDONLY);
该文件以只读方式打开。
- 然后,该程序进入一个可能无限循环,尝试在每次迭代中读取一个 4 字节的块。read 调用
ssize_t count = read(fd, &next, sizeof(int));
返回 0 以指示流结束,在这种情况下,fifoReader 跳出循环,关闭命名管道,并在终止之前取消链接后备文件。
- 在读取一个 4 字节的整数后,fifoReader 检查该数字是否为素数。这代表了生产级读取器可能对接收到的字节执行的业务逻辑。在一次示例运行中,在接收到的 768,000 个整数中,有 37,682 个素数。
在重复的示例运行中,fifoReader 成功读取了 fifoWriter 写入的所有字节。这并不奇怪。这两个进程在同一主机上执行,消除了网络问题。命名管道是一种高度可靠且高效的 IPC 机制,因此被广泛使用。
以下是两个程序的输出,每个程序都从单独的终端启动,但具有相同的工作目录
% ./fifoWriter
768000 ints sent to the pipe.
###
% ./fifoReader
Received ints: 768000, primes: 37682
消息队列
管道具有严格的 FIFO 行为:先写入的字节先读取,第二个写入的字节第二个读取,依此类推。消息队列可以表现出相同的行为,但足够灵活,可以以非 FIFO 顺序检索字节块。
顾名思义,消息队列是一系列消息,每条消息都有两个部分
- 有效负载,它是一个字节数组 (C 中的 char)
- 类型,以正整数值给出;类型对消息进行分类以进行灵活检索
考虑以下消息队列的描述,每条消息都标有整数类型
+-+ +-+ +-+ +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
+-+ +-+ +-+ +-+
在显示的四条消息中,标记为 1 的消息位于最前面,即最靠近接收器。接下来是两条标记为 2 的消息,最后是一条标记为 3 的消息在最后面。如果严格的 FIFO 行为正在发挥作用,则消息将按 1-2-2-3 的顺序接收。但是,消息队列允许其他检索顺序。例如,消息可以由接收器按 3-2-1-2 的顺序检索。
mqueue 示例由两个程序组成,sender 将消息写入消息队列,receiver 从该队列读取消息。这两个程序都包含头文件 queue.h,如下所示
示例 4. 头文件 queue.h
#define ProjectId 123
#define PathName "queue.h" /* any existing, accessible file would do */
#define MsgLen 4
#define MsgCount 6
typedef struct {
long type; /* must be of type long */
char payload[MsgLen + 1]; /* bytes in the message */
} queuedMessage;
头文件定义了一个名为 queuedMessage 的结构类型,其中包含 payload(字节数组)和 type(整数)字段。此文件还定义了符号常量(#define 语句),前两个符号常量用于生成键,而键又用于获取消息队列 ID。ProjectId 可以是任何正整数值,PathName 必须是现有的、可访问的文件——在本例中为文件 queue.h。sender 和 receiver 程序中的设置语句是
key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */
ID qid 实际上是消息队列的文件描述符的对应物。
示例 5. 消息 sender 程序
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}
int main() {
key_t key = ftok(PathName, ProjectId);
if (key < 0) report_and_exit("couldn't get key...");
int qid = msgget(key, 0666 | IPC_CREAT);
if (qid < 0) report_and_exit("couldn't get queue id...");
char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
int i;
for (i = 0; i < MsgCount; i++) {
/* build the message */
queuedMessage msg;
msg.type = types[i];
strcpy(msg.payload, payloads[i]);
/* send the message */
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */
printf("%s sent as type %i\n", msg.payload, (int) msg.type);
}
return 0;
}
上面的 sender 程序发送六条消息,每种指定类型两条消息:第一批消息的类型为 1,接下来的两条消息的类型为 2,最后两条消息的类型为 3。发送语句
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);
配置为非阻塞(标志 IPC_NOWAIT),因为消息非常小。唯一的危险是队列已满,在本例中不太可能发生,会导致发送失败。下面的 receiver 程序也使用 IPC_NOWAIT 标志接收消息。
示例 6. 消息 receiver 程序
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}
int main() {
key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
if (key < 0) report_and_exit("key not gotten...");
int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
if (qid < 0) report_and_exit("no access to queue...");
int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
int i;
for (i = 0; i < MsgCount; i++) {
queuedMessage msg; /* defined in queue.h */
if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
puts("msgrcv trouble...");
printf("%s received as type %i\n", msg.payload, (int) msg.type);
}
/** remove the queue **/
if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */
report_and_exit("trouble removing queue...");
return 0;
}
receiver 程序不创建消息队列,尽管 API 似乎暗示了这一点。在 receiver 中,调用
int qid = msgget(key, 0666 | IPC_CREAT);
由于 IPC_CREAT 标志而具有误导性,但此标志实际上意味着如果需要则创建,否则访问。sender 程序调用 msgsnd 发送消息,而 receiver 调用 msgrcv 检索消息。在本例中,sender 按 1-1-2-2-3-3 的顺序发送消息,但 receiver 然后按 3-1-2-1-3-2 的顺序检索它们,这表明消息队列不受严格的 FIFO 行为的约束
% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3
% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2
上面的输出显示 sender 和 receiver 可以从同一终端启动。输出还显示,即使在 sender 进程创建队列、向队列写入并退出后,消息队列仍然存在。队列仅在 receiver 进程通过调用 msgctl 显式删除它后才会消失
if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
总结
管道和消息队列 API 从根本上是单向的:一个进程写入,另一个进程读取。有双向命名管道的实现,但我认为这种 IPC 机制在其最简单的时候是最好的。如前所述,消息队列的普及程度有所下降——但没有充分的理由;这些队列是 IPC 工具箱中的另一种工具。第 3 部分通过套接字和信号的 IPC 代码示例完成了 IPC 工具箱的快速浏览。
评论已关闭。