将云事件绑定到 Knative

CloudEvents 提供了一种通用格式来描述事件并提高互操作性。
55 位读者喜欢这篇文章。
woman on laptop sitting at the window

CC BY 3.0 US Mapbox Uncharted ERG

事件已成为现代反应式系统的重要组成部分。实际上,事件可以用于在服务之间进行通信,触发带外处理,或将有效负载发送到 Kafka 等服务。问题在于,事件发布者可能会以任意数量的不同方式表达事件消息,而与内容无关。例如,某些消息是 JSON 格式的有效负载,用于按应用程序序列化和反序列化消息。其他应用程序使用二进制格式,例如 AvroProtobuf,以传输带有元数据的有效负载。当构建旨在轻松集成外部系统并降低消息传输复杂性的事件驱动架构时,这是一个问题。

CloudEvents 是一个开放规范,提供了一种通用格式来描述事件并提高互操作性。许多云提供商和中间件堆栈,包括 KnativeKogitoDebeziumQuarkus,在 CloudEvents 1.0 发布后都采用了这种格式。此外,开发人员需要在无服务器架构中解耦事件生产者和消费者之间的关系。Knative Eventing 与 CloudEvents 规范一致,为以任何编程语言创建、解析、发送和接收事件提供了通用格式。Knative Eventing 还使开发人员能够延迟绑定事件源和事件消费者。例如,使用 JSON 的云事件可能如下所示

{
    "specversion" : "1.0", (1)
    "id" : "11111", (2)
    "source" : "http://localhost:8080/cloudevents", (3)
    "type" : "knative-events-binding", (4)
    "subject" : "cloudevents", (5)
    "time" : "2021-06-04T16:00:00Z", (6)
    "datacontenttype" : "application/json", (7)
    "data" : "{\"message\": \"Knative Events\"}", (8)
}

在上面的代码中

(1) 要使用的 CloudEvents 规范的版本

(2) 特定事件的 ID 字段;idsource 的组合提供了一个唯一的标识符

(3) 统一资源标识符 (URI) 标识事件源,根据事件发生的上下文或发出事件的应用程序

(4) 事件类型,可以使用任何随机词

(5) 关于事件的附加详细信息(可选)

(6) 事件创建时间(可选)

(7) 数据属性的内容类型(可选)

(8) 特定事件的业务数据

这是一个快速示例,说明开发人员如何使用 Knative 和 Quarkus Funqy 扩展启用 CloudEvents 绑定。

1. 创建一个 Quarkus Knative 事件 Maven 项目

生成一个 Quarkus 项目(例如,quarkus-serverless-cloudevent),以创建一个带有 Funqy Knative 事件绑定扩展的简单函数

$ mvn io.quarkus:quarkus-maven-plugin:2.0.0.CR3:create \
       -DprojectGroupId=org.acme \
       -DprojectArtifactId=quarkus-serverless-cloudevent \
       -Dextensions="funqy-knative-events" \
       -DclassName="org.acme.getting.started.GreetingResource" 

2. 在本地运行无服务器事件函数

打开 src/main/java/org/acme/getting/started/funqy/cloudevent 目录中的 CloudEventGreeting.java 文件。@funq 注解使 myCloudEventGreeting 方法能够自动将输入数据映射到云事件消息

private static final Logger log = Logger.getLogger(CloudEventGreeting.class);

    @Funq
    public void myCloudEventGreeting(Person input) {
        log.info("Hello " + input.getName());
    }
}

通过 Quarkus Dev 模式运行该函数

$ ./mvnw quarkus:dev

输出应如下所示

__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
INFO  [io.quarkus] (Quarkus Main Thread) quarkus-serverless-cloudevent 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.0.0.CR3) started in 1.546s. Listening on: http://localhost:8080
INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, funqy-knative-events, smallrye-context-propagation]

--
Tests paused, press [r] to resume

注意:Quarkus 2.x 提供了一个持续测试功能,以便您可以在终端中按 r 键,在添加或更新代码时保持测试您的代码。

现在 CloudEvents 函数正在您的本地开发环境中运行。因此,通过 HTTP 协议向该函数发送一个云事件

curl -v http://localhost:8080 \
  -H "Content-Type:application/json" \
  -H "Ce-Id:1" \
  -H "Ce-Source:cloud-event-example" \
  -H "Ce-Type:myCloudEventGreeting" \
  -H "Ce-Specversion:1.0" \
  -d "{\"name\": \"Daniel\"}"

输出应以以下内容结尾

HTTP/1.1 204 No Content

返回终端,日志应如下所示

INFO [org.acm.get.sta.fun.clo.CloudEventGreeting] (executor-thread-0) Hello Daniel

3. 将无服务器事件函数部署到 Knative

container-image-docker 扩展添加到 Quarkus Funqy 项目。该扩展使您能够基于无服务器事件函数构建容器镜像,然后将其推送到外部容器注册表(例如,Docker Hub, Quay.io

$ ./mvnw quarkus:add-extension -Dextensions="container-image-docker"

打开 src/main/resources/ 目录中的 application.properties 文件。然后添加以下变量来配置 Knative 和 Kubernetes 资源(确保将 yourAccountName 替换为您的容器注册表的帐户名称,例如,您在 Docker Hub 中的用户名)

quarkus.container-image.build=true
quarkus.container-image.push=true
quarkus.container-image.builder=docker
quarkus.container-image.image=docker.io/yourAccountName/funqy-knative-events-codestart

运行以下命令以容器化该函数,然后自动将其推送到 Docker Hub 容器注册表

$ ./mvnw clean package

输出应以 BUILD SUCCESS 结尾。

打开 src/main/k8s 目录中的 funqy-service.yaml 文件。然后将 yourAccountName 替换为您在 Docker Hub 注册表中的帐户信息

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: funqy-knative-events-codestart
spec:
  template:
    metadata:
      name: funqy-knative-events-codestart-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
        - image: docker.io/yourAccountName/funqy-knative-events-codestart

假设容器镜像推送成功,请使用以下 kubectl 命令行工具基于事件函数创建 Knative 服务(请务必登录到 Kubernetes 集群并更改要创建 Knative 服务的命名空间)

$ kubectl create -f src/main/k8s/funqy-service.yaml

输出应如下所示

service.serving.knative.dev/funqy-knative-events-codestart created

创建一个默认代理以订阅事件函数。使用 kn Knative Serving 命令行工具

$ kn broker create default

打开 src/main/k8s 目录中的 funqy-trigger.yaml 文件并将其替换为

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-cloudevent-greeting
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: funqy-knative-events-codestart

使用 kubectl 命令行工具创建一个触发器

$ kubectl create -f src/main/k8s/funqy-trigger.yaml

输出应如下所示

trigger.eventing.knative.dev/my-cloudevent-greeting created

4. 向 Kubernetes 中的无服务器事件函数发送云事件

找出该函数的路由 URL 并检查输出是否如下所示

$ kubectl get rt
NAME URL READY REASON
funqy-knative-events-codestart  http://funqy-knative-events-codestart-YOUR_HOST_DOMAIN   True 

通过 HTTP 协议向该函数发送一个云事件

curl -v http://funqy-knative-events-codestart-YOUR_HOST_DOMAIN \
  -H "Content-Type:application/json" \
  -H "Ce-Id:1" \
  -H "Ce-Source:cloud-event-example" \
  -H "Ce-Type:myCloudEventGreeting" \
  -H "Ce-Specversion:1.0" \
  -d "{\"name\": \"Daniel\"}"

输出应以以下内容结尾

HTTP/1.1 204 No Content

一旦函数 pod 扩容,请查看 pod 日志。使用以下 kubectl 命令检索 pod 的名称

$ kubectl get pod

输出将如下所示

NAME                                                           READY   STATUS    RESTARTS   AGE
funqy-knative-events-codestart-v1-deployment-6569f6dfc-zxsqs   2/2     Running   0          11s

运行以下 kubectl 命令以验证 pod 的日志是否与本地测试结果匹配: 

$ kubectl logs funqy-knative-events-codestart-v1-deployment-6569f6dfc-zxsqs -c user-container | grep CloudEventGreeting

输出如下所示

INFO  [org.acm.get.sta.fun.clo.CloudEventGreeting] (executor-thread-0) Hello Daniel

如果您将事件函数部署到 OpenShift Kubernetes Distribution (OKD) 集群,您将在拓扑视图中找到部署状态

您还可以在Pod 详情选项卡中找到 pod 的日志

下一步是什么?

开发人员可以使用 Quarkus 函数将云事件绑定到 Knative。Quarkus 还搭建了 Kubernetes 清单,例如 Knative 服务和触发器,以通过通道或 HTTP 请求处理云事件。

通过 OpenShift 的 交互式自助学习门户了解更多关于无服务器和 Quarkus 的主题。

接下来阅读什么

什么是 Java 无服务器?

Java 仍然是开发企业应用程序最流行的语言之一。那么,为什么无服务器开发人员要避开它呢?

(特约撰稿人, Red Hat)
2021 年 5 月 19 日
标签
danieloh
技术营销、开发者布道师、CNCF 大使、公开演讲者、已出版作者、Quarkus、Red Hat Runtimes

评论已关闭。

© . All rights reserved.