Pipy 是一款开源、云原生网络流处理器。它采用模块化设计,可以创建高性能的网络代理。它使用 C++ 编写,并构建于 Asio 异步 I/O 库之上。Pipy 非常适合各种用例,范围从边缘路由器、负载均衡器、代理解决方案、API 网关、静态 HTTP 服务器、服务网格 Sidecar 等等。
Pipy 还通过 PipyJS 提供内置的 JavaScript 支持。PipyJS 具有高度可定制性和可预测的性能,且没有垃圾回收开销。目前,PipyJS 是 Pipy 代码库的一部分,但它不依赖于 Pipy,未来可能会将其移动到独立的软件包中。
Pipy 快速入门指南
您可以使用 Podman 或 Docker 运行 Pipy 的生产版本,并使用官方 Pipy Git 仓库上提供的教程脚本之一。Pipy 容器镜像可以通过几个环境变量进行配置
-
PIPY_CONFIG_FILE=</path/to/config-file>
设置 Pipy 配置文件的位置。 -
PIPY_SPAWN=n
设置您要启动的 Pipy 实例的数量,其中n
是实例的数量。这是一个从零开始的索引,因此 0 代表 1 个实例。例如,对于 4 个实例,请使用PIPY_SPAWN=3
。
使用此示例脚本启动 Pipy 服务器
$ docker run --rm -e PIPY_CONFIG_FILE=\
https://raw.githubusercontent.com/flomesh-io/pipy/main/tutorial/01-hello \
-e PIPY_SPAWN=1 -p 8080:8080 flomesh/pipy-pjs:latest
您可能会注意到,此代码不是提供本地文件,而是通过环境变量 PIPY_CONFIG_FILE
提供了远程 Pipy 脚本的链接。Pipy 足够智能来处理这种情况。
供您参考,以下是文件 tutorial/01-hello/hello.js
的内容
pipy()
.listen(8080)
.serveHTTP(
new Message('Hi, there!\n')
)
这个简单的脚本定义了一个 Port 管道,它监听 8080 端口,并为每个在监听端口上收到的 HTTP 请求返回 “Hi, there!”。
由于您已使用 docker run 命令暴露了本地 8080 端口,因此您可以继续在同一端口上进行测试
$ curl http://localhost:8080
执行上述命令会在控制台上显示 Hi, there!
。
为了学习、开发或调试目的,建议继续进行 Pipy 的本地安装(从源代码构建 Pipy 或为您的操作系统下载发行版),因为它带有管理 Web 控制台以及文档和教程。
本地安装完成后,在不带任何参数的情况下运行 pipy
将在 6060 端口上启动管理控制台,但可以使用 --admin-port
选项将其配置为监听不同的端口。

(Ali Naqvi,CC BY-SA 40)
要从源代码构建 Pipy,或为您的操作系统安装预编译的二进制文件,请参阅 Pipy Git 仓库上的 README.md。
在终端中运行 Pipy
要启动 Pipy 代理,请使用 PipyJS 脚本文件运行 Pipy,例如,如果您需要一个简单的回显服务器,该服务器使用每个传入请求接收的消息正文进行响应,则可以使用 tutorial/01-hello/hello.js
中的脚本
$ pipy tutorial/01-hello/hello.js
或者,在开发和调试时,可以使用内置的 Web UI 启动 Pipy
$ pipy tutorial/01-hello/hello.js --admin-port=6060
要查看所有命令行选项,请使用 --help
标志
$ pipy --help
Pipy 是一个流处理器
Pipy 使用事件驱动的管道在网络流上运行,其中它消耗输入流,执行用户提供的转换,并输出流。Pipy 数据流获取原始数据并将其抽象为事件。事件可以属于以下四类之一
- 数据:网络流由数据字节组成,并以块的形式到达。Pipy 将块抽象为数据事件。
-
MessageStart、MessageEnd、StreamEnd:这三个非数据事件充当标记,为原始字节流提供高级语义,以供业务逻辑依赖。
Pipy 设计
Pipy 的内部工作原理类似于 Unix 管道,但与处理离散字节的 Unix 管道不同,Pipy 处理事件流。
Pipy 通过过滤器链处理传入流,其中每个过滤器处理通用问题,如请求日志记录、身份验证、SSL 卸载、请求转发等等。每个过滤器从其输入读取并写入其输出,一个过滤器的输出连接到下一个过滤器的输入。
管道
过滤器链称为管道,Pipy 根据其输入源将管道分为 3 个不同的类别。
-
端口管道: 从网络端口读取数据事件,对其进行处理,然后将结果写回同一端口。这是最常用的请求和响应模型。例如,当 Pipy 像 HTTP 服务器一样工作时,端口管道的输入是来自客户端的 HTTP 请求,管道的输出将是发送回客户端的 HTTP 响应。
-
定时器管道: 定期获取一对 MessageStart 和 MessageEnd 事件作为其输入。当需要 Cron 类似作业 的功能时很有用。
-
子管道: 与 join 过滤器(如 link)结合使用,该过滤器从其前任管道接收事件,将其馈送到子管道进行处理,从子管道读取输出,然后将其泵送到下一个过滤器。
看待子管道和 join 过滤器的最佳方式是将它们视为过程式编程中子例程的被调用者和调用者。join 过滤器的输入是子例程的参数,join 过滤器的输出是其返回值。
根管道(例如 Port 或 Timer)不能从 join 过滤器调用。
要获取内置过滤器及其参数的列表
$ pipy --list-filters $ pipy --help-filters
上下文
Pipy 中另一个重要的概念是上下文。上下文是附加到管道的一组变量。每个管道都可以访问 Pipy 实例中的同一组变量。换句话说,上下文具有相同的形状。当您启动 Pipy 实例时,您要做的第一件事是通过定义变量及其初始值来定义上下文的形状。
每个根管道都会克隆您在启动时定义的初始上下文。当子管道启动时,它会共享或克隆其父级的上下文,具体取决于您使用的 join 过滤器。例如,link 过滤器共享其父级的上下文,而 demux 过滤器则克隆它。
对于嵌入在管道中的脚本,这些上下文变量是它们的全局变量,这意味着如果这些变量位于同一脚本文件中,则脚本始终可以从任何位置访问这些变量。
这对于经验丰富的程序员来说可能看起来很奇怪,因为全局变量通常意味着它们是全局唯一的。您只有一组这些变量,而在 Pipy 中,我们可以拥有多组变量(上下文),具体取决于有多少根管道为传入网络连接打开,以及有多少子管道克隆其父级的上下文。
编写网络代理
假设您正在运行不同服务的独立实例,并且您想添加一个代理,以根据请求 URL 路径将流量转发到相关服务。这将使您受益于公开单个 URL 和在后端扩展您的服务,而无需用户记住不同的服务 URL。在正常情况下,您的服务将在不同的节点上运行,并且每个服务可能有多个实例在运行。在本示例中,假设您正在运行以下服务,并希望根据 URI 将流量分配给它们。
-
service-hi,位于
/hi/*
(127.0.0.1:8080, 127.0.0.1:8082) -
service-echo,位于
/echo
(127.0.0.1:8081) -
service-tell-ip,位于
/ip_/_*
(127.0.0.1:8082)
Pipy 脚本用 JavaScript 编写,您可以使用您选择的任何文本编辑器来编辑它们。或者,如果您已在本地安装 Pipy,则可以使用 Pipy 管理 Web UI,它带有语法高亮、自动完成、提示以及运行脚本的功能,所有这些都在同一控制台中完成。
启动 Pipy 实例,不带任何参数,以便 Pipy 管理控制台在 6060 端口启动。现在打开您喜欢的 Web 浏览器并导航到 [http://localhost:6060](http://localhost:6060/ 以查看内置的 Pipy 管理 Web UI。

(Ali Naqvi,CC BY-SA 40)
创建 Pipy 程序
良好的设计实践是将代码和配置分开。Pipy 通过其插件支持这种模块化设计,您可以将插件视为 JavaScript 模块。也就是说,您将配置数据存储在 config 文件夹中,并将编码逻辑存储在 plugins 文件夹下的单独文件中。主代理服务器脚本存储在根文件夹中,主代理脚本 (proxy.js
) 将包含并组合在单独模块中定义的功能。最后,您的最终文件夹结构是
├── config
│ ├── balancer.json
│ ├── proxy.json
│ └── router.json
├── plugins
│ ├── balancer.js
│ ├── default.js
│ └── router.js
└── proxy.js
1.单击 新建代码库,在对话框中为代码库名称输入 /proxy
,然后单击 创建。
-
单击 + 按钮以添加新文件。为其文件名输入
/config/proxy.json
,然后单击 创建。这是用于配置您的代理的配置文件。 -
您现在在左侧窗格的 config 文件夹下列出了
proxy.json
。单击该文件以打开它并添加如下所示的配置,并确保通过单击顶部面板上的磁盘图标来保存您的文件。
{ "listen": 8000, "plugins": [ "plugins/router.js", "plugins/balancer.js", "plugins/default.js" ] }
-
重复步骤 2 和 3 以创建另一个文件 /config/router.json,以存储路由信息。输入此配置数据
{ "routes": { "/hi/*": "service-hi", "/echo": "service-echo", "/ip/*": "service-tell-ip" } }
-
重复步骤 2 和 3 以创建另一个文件 /config/balancer.json 以存储您的服务到目标映射。输入以下数据
{ "services": { "service-hi" : ["127.0.0.1:8080", "127.0.0.1:8082"], "service-echo" : ["127.0.0.1:8081"], "service-tell-ip" : ["127.0.0.1:8082"] } }
-
现在是时候编写您的第一个 Pipy 脚本了,该脚本将用作默认回退,当您的服务器收到您未配置任何目标(端点)的请求时。创建文件
/plugins/default.js
。此处的名称只是一个约定,Pipy 不依赖于名称,因此您可以选择任何您喜欢的名称。该脚本将包含如下所示的代码,该代码返回 HTTP 状态代码 404,并显示消息 No handler found
pipy() .pipeline('request') .replaceMessage( new Message({ status: 404 }, 'No handler found'))
7.创建文件 /plugins/router.js
,用于存储您的路由逻辑
(config =>
pipy({
_router: new algo.URLRouter(config.routes), })
.export('router', {
__serviceID: '', })
.pipeline('request')
.handleMessageStart(
msg => (
__serviceID = _router.find(
msg.head.headers.host,
msg.head.path, )
) )
)(JSON.decode(pipy.load('config/router.json')))
-
创建文件
/plugins/balancer.js
,用于存储您的负载均衡逻辑作为旁注。Pipy 附带多种负载均衡算法,但为简单起见,您在此处使用轮询算法。(config => pipy({ _services: ( Object.fromEntries( Object.entries(config.services).map( ([k, v]) => [ k, new algo.RoundRobinLoadBalancer(v) ] ) ) ), _balancer: null, _balancerCache: null, _target: '', }) .import({ __turnDown: 'proxy', __serviceID: 'router', }) .pipeline('session') .handleStreamStart( () => ( _balancerCache = new algo.Cache( // k is a balancer, v is a target (k ) => k.select(), (k,v) => k.deselect(v), ) ) ) .handleStreamEnd( () => ( _balancerCache.clear() ) ) .pipeline('request') .handleMessageStart( () => ( _balancer = _services[__serviceID], _balancer && (_target = _balancerCache.get(_balancer)), _target && (__turnDown = true) ) ) .link( 'forward', () => Boolean(_target), '' ) .pipeline('forward') .muxHTTP( 'connection', () => _target ) .pipeline('connection') .connect( () => _target ) )(JSON.decode(pipy.load('config/balancer.json')))
-
现在编写入口点或代理服务器脚本,以使用上述插件。创建新的代码库(步骤 1)会创建一个默认的
main.js
文件作为入口点。您可以将其用作您的主入口点,或者如果您喜欢使用不同的名称,请随意删除main.js
并创建一个您选择的名称的新文件。对于本示例,请删除它并创建一个名为/proxy.js
的新文件。确保单击顶部标志图标使其成为主入口点,以确保在您单击运行按钮(右侧的箭头图标)时启动脚本执行。(config => pipy() .export('proxy', { __turnDown: false, }) .listen(config.listen) .use(config.plugins, 'session') .demuxHTTP('request') .pipeline('request') .use( config.plugins, 'request', 'response', () => __turnDown ) )(JSON.decode(pipy.load('config/proxy.json')))
到目前为止,您的工作区看起来像这样

(Ali Naqvi,CC BY-SA 40)
要运行您的脚本,请单击播放图标按钮(从右数第 4 个)。Pipy 运行您的代理脚本,您会看到类似于以下的输出

(Ali Naqvi,CC BY-SA 40)
这表明您的代理服务器正在监听 8000 端口(您在 /config/proxy.json
中配置的)。使用 curl 运行测试
$ curl -i [http://localhost:8000](http://localhost:8000)
HTTP/1.1 404 Not Found
content-length: 10
connection: keep-alive
No handler found
该响应是有道理的,因为您尚未为根配置任何目标。尝试您的已配置路由之一,例如 /hi
$ curl -i [http://localhost:8000/hi](http://localhost:8000/hi)
HTTP/1.1 502 Connection Refused
content-length: 0
connection: keep-alive
您收到 502 Connection Refused
,因为您的配置目标端口上没有服务运行。
您可以更新 /config/balancer.json
,其中包含有关您已运行服务的主机和端口等详细信息,使其适合您的用例,或者您可以在 Pipy 中编写一个脚本来监听您的配置端口,并返回简单的消息。
将此代码保存到您本地计算机上的文件中,命名为 mock-proxy.js
,并记住您存储它的位置
pipy()
.listen(8080)
.serveHTTP(
new Message('Hi, there!\n')
)
.listen(8081)
.serveHTTP(
msg => new Message(msg.body)
)
.listen(8082)
.serveHTTP(
msg => new Message(
`You are requesting ${msg.head.path} from ${__inbound.remoteAddress}\n`
)
)
打开一个新的终端窗口并使用 Pipy 运行此脚本(将 /path/to
更改为您存储此脚本文件的位置)
$ pipy /path/to/mock-proxy.js
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] Module /mock-proxy.js
2022-01-11 18:56:31 [INF] [config] ================
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] [Listen on :::8080]
2022-01-11 18:56:31 [INF] [config] ----->|
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] serveHTTP
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] <-----|
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] [Listen on :::8081]
2022-01-11 18:56:31 [INF] [config] ----->|
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] serveHTTP
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] <-----|
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] [Listen on :::8082]
2022-01-11 18:56:31 [INF] [config] ----->|
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] serveHTTP
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] <-----|
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [listener] Listening on port 8080 at ::
2022-01-11 18:56:31 [INF] [listener] Listening on port 8081 at ::
2022-01-11 18:56:31 [INF] [listener] Listening on port 8082 at ::
您现在有了在端口 8080、8081 和 8082 上监听的模拟服务。再次在您的代理服务器上进行测试,以查看从您的模拟服务返回的正确响应。
总结
您已经使用了许多 Pipy 功能,包括变量声明、导入和导出变量、插件、管道、子管道、过滤器链、Pipy 过滤器(如 handleMessageStart
、handleStreamStart
和 link)以及 Pipy 类(如 JSON、algo.URLRouter
、algo.RoundRobinLoadBalancer
、algo.Cache
等)。有关更多信息,请阅读优秀的 Pipy 文档,并通过 Pipy 的管理 Web UI,并按照随附的分步教程进行操作。
结论
来自 Flomesh 的 Pipy 是一款开源、极速且轻量级的网络流量处理器。您可以在各种用例中使用它,范围从边缘路由器、负载均衡和代理(正向和反向)、API 网关、静态 HTTP 服务器、服务网格 Sidecar 以及许多其他应用程序。Pipy 正在积极开发中,并由专职提交者和贡献者维护。
评论已关闭。