事件已成为现代反应式系统的重要组成部分。实际上,事件可以用于在服务之间进行通信,触发带外处理,或将有效负载发送到 Kafka 等服务。问题在于,事件发布者可能会以任意数量的不同方式表达事件消息,而与内容无关。例如,某些消息是 JSON 格式的有效负载,用于按应用程序序列化和反序列化消息。其他应用程序使用二进制格式,例如 Avro 和 Protobuf,以传输带有元数据的有效负载。当构建旨在轻松集成外部系统并降低消息传输复杂性的事件驱动架构时,这是一个问题。
CloudEvents 是一个开放规范,提供了一种通用格式来描述事件并提高互操作性。许多云提供商和中间件堆栈,包括 Knative、Kogito、Debezium 和 Quarkus,在 CloudEvents 1.0 发布后都采用了这种格式。此外,开发人员需要在无服务器架构中解耦事件生产者和消费者之间的关系。Knative Eventing 与 CloudEvents 规范一致,为以任何编程语言创建、解析、发送和接收事件提供了通用格式。Knative Eventing 还使开发人员能够延迟绑定事件源和事件消费者。例如,使用 JSON 的云事件可能如下所示
{
"specversion" : "1.0", (1)
"id" : "11111", (2)
"source" : "http://localhost:8080/cloudevents", (3)
"type" : "knative-events-binding", (4)
"subject" : "cloudevents", (5)
"time" : "2021-06-04T16:00:00Z", (6)
"datacontenttype" : "application/json", (7)
"data" : "{\"message\": \"Knative Events\"}", (8)
}
在上面的代码中
(1) 要使用的 CloudEvents 规范的版本
(2) 特定事件的 ID 字段;id
和 source
的组合提供了一个唯一的标识符
(3) 统一资源标识符 (URI) 标识事件源,根据事件发生的上下文或发出事件的应用程序
(4) 事件类型,可以使用任何随机词
(5) 关于事件的附加详细信息(可选)
(6) 事件创建时间(可选)
(7) 数据属性的内容类型(可选)
(8) 特定事件的业务数据
这是一个快速示例,说明开发人员如何使用 Knative 和 Quarkus Funqy 扩展启用 CloudEvents 绑定。
1. 创建一个 Quarkus Knative 事件 Maven 项目
生成一个 Quarkus 项目(例如,quarkus-serverless-cloudevent
),以创建一个带有 Funqy Knative 事件绑定扩展的简单函数
$ mvn io.quarkus:quarkus-maven-plugin:2.0.0.CR3:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=quarkus-serverless-cloudevent \
-Dextensions="funqy-knative-events" \
-DclassName="org.acme.getting.started.GreetingResource"
2. 在本地运行无服务器事件函数
打开 src/main/java/org/acme/getting/started/funqy/cloudevent
目录中的 CloudEventGreeting.java
文件。@funq
注解使 myCloudEventGreeting
方法能够自动将输入数据映射到云事件消息
private static final Logger log = Logger.getLogger(CloudEventGreeting.class);
@Funq
public void myCloudEventGreeting(Person input) {
log.info("Hello " + input.getName());
}
}
通过 Quarkus Dev 模式运行该函数
$ ./mvnw quarkus:dev
输出应如下所示
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
INFO [io.quarkus] (Quarkus Main Thread) quarkus-serverless-cloudevent 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.0.0.CR3) started in 1.546s. Listening on: http://localhost:8080
INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, funqy-knative-events, smallrye-context-propagation]
--
Tests paused, press [r] to resume
注意:Quarkus 2.x 提供了一个持续测试功能,以便您可以在终端中按 r
键,在添加或更新代码时保持测试您的代码。
现在 CloudEvents 函数正在您的本地开发环境中运行。因此,通过 HTTP 协议向该函数发送一个云事件
curl -v http://localhost:8080 \
-H "Content-Type:application/json" \
-H "Ce-Id:1" \
-H "Ce-Source:cloud-event-example" \
-H "Ce-Type:myCloudEventGreeting" \
-H "Ce-Specversion:1.0" \
-d "{\"name\": \"Daniel\"}"
输出应以以下内容结尾
HTTP/1.1 204 No Content
返回终端,日志应如下所示
INFO [org.acm.get.sta.fun.clo.CloudEventGreeting] (executor-thread-0) Hello Daniel
3. 将无服务器事件函数部署到 Knative
将 container-image-docker
扩展添加到 Quarkus Funqy 项目。该扩展使您能够基于无服务器事件函数构建容器镜像,然后将其推送到外部容器注册表(例如,Docker Hub, Quay.io)
$ ./mvnw quarkus:add-extension -Dextensions="container-image-docker"
打开 src/main/resources/
目录中的 application.properties
文件。然后添加以下变量来配置 Knative 和 Kubernetes 资源(确保将 yourAccountName
替换为您的容器注册表的帐户名称,例如,您在 Docker Hub 中的用户名)
quarkus.container-image.build=true
quarkus.container-image.push=true
quarkus.container-image.builder=docker
quarkus.container-image.image=docker.io/yourAccountName/funqy-knative-events-codestart
运行以下命令以容器化该函数,然后自动将其推送到 Docker Hub 容器注册表
$ ./mvnw clean package
输出应以 BUILD SUCCESS
结尾。
打开 src/main/k8s
目录中的 funqy-service.yaml
文件。然后将 yourAccountName
替换为您在 Docker Hub 注册表中的帐户信息
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: funqy-knative-events-codestart
spec:
template:
metadata:
name: funqy-knative-events-codestart-v1
annotations:
autoscaling.knative.dev/target: "1"
spec:
containers:
- image: docker.io/yourAccountName/funqy-knative-events-codestart
假设容器镜像推送成功,请使用以下 kubectl
命令行工具基于事件函数创建 Knative 服务(请务必登录到 Kubernetes 集群并更改要创建 Knative 服务的命名空间)
$ kubectl create -f src/main/k8s/funqy-service.yaml
输出应如下所示
service.serving.knative.dev/funqy-knative-events-codestart created
创建一个默认代理以订阅事件函数。使用 kn Knative Serving 命令行工具
$ kn broker create default
打开 src/main/k8s
目录中的 funqy-trigger.yaml
文件并将其替换为
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-cloudevent-greeting
spec:
broker: default
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: funqy-knative-events-codestart
使用 kubectl
命令行工具创建一个触发器
$ kubectl create -f src/main/k8s/funqy-trigger.yaml
输出应如下所示
trigger.eventing.knative.dev/my-cloudevent-greeting created
4. 向 Kubernetes 中的无服务器事件函数发送云事件
找出该函数的路由 URL 并检查输出是否如下所示
$ kubectl get rt
NAME URL READY REASON
funqy-knative-events-codestart http://funqy-knative-events-codestart-YOUR_HOST_DOMAIN True
通过 HTTP 协议向该函数发送一个云事件
curl -v http://funqy-knative-events-codestart-YOUR_HOST_DOMAIN \
-H "Content-Type:application/json" \
-H "Ce-Id:1" \
-H "Ce-Source:cloud-event-example" \
-H "Ce-Type:myCloudEventGreeting" \
-H "Ce-Specversion:1.0" \
-d "{\"name\": \"Daniel\"}"
输出应以以下内容结尾
HTTP/1.1 204 No Content
一旦函数 pod 扩容,请查看 pod 日志。使用以下 kubectl
命令检索 pod 的名称
$ kubectl get pod
输出将如下所示
NAME READY STATUS RESTARTS AGE
funqy-knative-events-codestart-v1-deployment-6569f6dfc-zxsqs 2/2 Running 0 11s
运行以下 kubectl
命令以验证 pod 的日志是否与本地测试结果匹配:
$ kubectl logs funqy-knative-events-codestart-v1-deployment-6569f6dfc-zxsqs -c user-container | grep CloudEventGreeting
输出如下所示
INFO [org.acm.get.sta.fun.clo.CloudEventGreeting] (executor-thread-0) Hello Daniel
如果您将事件函数部署到 OpenShift Kubernetes Distribution (OKD) 集群,您将在拓扑视图中找到部署状态

(Daniel Oh, CC BY-SA 4.0)
您还可以在Pod 详情选项卡中找到 pod 的日志

(Daniel Oh, CC BY-SA 4.0)
下一步是什么?
开发人员可以使用 Quarkus 函数将云事件绑定到 Knative。Quarkus 还搭建了 Kubernetes 清单,例如 Knative 服务和触发器,以通过通道或 HTTP 请求处理云事件。
通过 OpenShift 的 交互式自助学习门户了解更多关于无服务器和 Quarkus 的主题。
评论已关闭。