Use EventBus and Trigger
- You need to create a function as the target function to be triggered. For more information, see Use EventSource.
- You need to create a Kafka cluster. For more information, see .
Deploy a NATS streaming server
Run the following commands to deploy a NATS streaming server. This document uses nats://nats.default:4222 as the access address of the NATS streaming server and stan as the cluster ID. For more information, see .
Use the following content to create a configuration file (for example, openfuncasync-function.yaml) for the target function, which is triggered by the Trigger CRD and prints the received message.
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: trigger-target
spec:
  version: "v1.0.0"
  image: openfunctiondev/v1beta1-trigger-target:latest
  port: 8080
  serving:
    runtime: "async"
    scaleOptions:
      keda:
        scaledObject:
          pollingInterval: 15
          minReplicaCount: 0
          maxReplicaCount: 10
          cooldownPeriod: 30
    triggers:
      - type: stan
        metadata:
          natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
          queueGroup: "grp1"
          durableName: "ImDurable"
          subject: "metrics"
          lagThreshold: "10"
    inputs:
      - name: autoscaling-pubsub
        component: eventbus
        topic: metrics
    pubsub:
      eventbus:
        type: pubsub.natsstreaming
        version: v1
        metadata:
          - name: natsURL
            value: "nats://nats.default:4222"
          - name: natsStreamingClusterID
            value: "stan"
          - name: subscriptionType
            value: "queue"
          - name: durableSubscriptionName
            value: "ImDurable"
          - name: consumerID
            value: "grp1"
Run the following command to apply the configuration file.
kubectl apply -f openfuncasync-function.yaml
Create an EventBus and an EventSource
Use the following content to create a configuration file (for example, eventbus.yaml) for an EventBus.
apiVersion: events.openfunction.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  natsStreaming:
    natsURL: "nats://nats.default:4222"
    natsStreamingClusterID: "stan"
    durableSubscriptionName: "ImDurable"
Use the following content to create a configuration file (for example, eventsource.yaml) for an EventSource.
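As a minimal sketch, an EventSource that reads from Kafka and forwards events to the EventBus could look like the following. The resource name my-eventsource, the event name sample-two, and the EventBus name default match the command output shown in this document. The broker address and topic are assumptions borrowed from the event producer defined later, and the field names under spec.kafka should be verified against the EventSource CRD installed in your cluster.

```yaml
apiVersion: events.openfunction.io/v1alpha1
kind: EventSource
metadata:
  name: my-eventsource
spec:
  logLevel: "2"
  # Forward collected events to the EventBus named default.
  eventBus: "default"
  kafka:
    # Event name; it shows up in the generated component name (esc-kafka-sample-two).
    sample-two:
      # Assumption: same brokers and topic as the event producer in this document.
      brokers: "kafka-server-kafka-brokers:9092"
      topic: "events-sample"
      authRequired: false
```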
Run the following commands to apply these configuration files.
kubectl apply -f eventbus.yaml
kubectl apply -f eventsource.yaml
Run the following commands to check the results.
$ kubectl get eventsources.events.openfunction.io
NAME             EVENTBUS   SINK   STATUS
my-eventsource   default           Ready

$ kubectl get eventbus.events.openfunction.io
NAME      AGE
default   6m53s

$ kubectl get components
NAME                                                 AGE
serving-6r5dl-component-eventbus-jlpqf               11m
serving-9689d-component-ebfes-my-eventsource-cmcbw   6m57s
serving-9689d-component-esc-kafka-sample-two-l99cg   6m57s
serving-k6zw8-component-cron-9x8hl                   61m
serving-k6zw8-component-kafka-server-sjrzs           61m

$ kubectl get deployments.apps
NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
serving-6r5dl-deployment-v100-m7nq2   0/0     0            0           12m
serving-9689d-deployment-v100-5qdvk   1/1     1            1           7m17s
Note
When the event bus is used, the EventSource controller works as follows:
- Create an EventSource custom resource named my-eventsource.
- Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
- Create a Dapr component named serving-xxxxx-component-ebfes-my-eventsource-xxxxx to enable the EventSource to associate with the event bus.
- Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-two-xxxxx to enable the EventSource to associate with the event source.
- Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing events.
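To make the third step more concrete, the Dapr component that links the EventSource to the event bus could look roughly like the sketch below. This is an illustration, not output captured from a cluster. It assumes that the controller copies the NATS Streaming settings of the EventBus into a standard Dapr pubsub component. The xxxxx segments stand for generated suffixes.

```yaml
# Illustrative only: assumed shape of the generated Dapr component.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: serving-xxxxx-component-ebfes-my-eventsource-xxxxx
spec:
  type: pubsub.natsstreaming
  version: v1
  metadata:
    # Values taken from the EventBus named default in this document.
    - name: natsURL
      value: "nats://nats.default:4222"
    - name: natsStreamingClusterID
      value: "stan"
    - name: durableSubscriptionName
      value: "ImDurable"
```

You can compare this sketch with the real resource by running kubectl get components with the -o yaml option against the component name listed in your cluster.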
Use the following content to create a configuration file (for example, trigger.yaml) for a Trigger.
Note
- Set the event bus associated with the Trigger through spec.eventBus.
- Set the event input source through spec.inputs.
- This is a simple trigger that collects events from the EventBus named default. When it retrieves a sample-two event from the EventSource my-eventsource, it triggers a Knative service named function-sample-serving-qrdx8-ksvc-fwml8 and sends the event to the topic metrics of the event bus at the same time.
apiVersion: events.openfunction.io/v1alpha1
kind: Trigger
metadata:
  name: my-trigger
spec:
  logLevel: "2"
  eventBus: "default"
  inputs:
    inputDemo:
      eventSource: "my-eventsource"
      event: "sample-two"
  subscribers:
  - condition: inputDemo
    topic: "metrics"
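The subscriber in this manifest only names the metrics topic. To also deliver matched events to the Knative service mentioned in the note, a subscriber would presumably need a sink reference. The fragment below is a sketch under two assumptions: that the Trigger CRD accepts a sink.ref block of this shape, and that the service runs in the default namespace. Verify both against the CRD installed in your cluster before relying on it.

```yaml
spec:
  subscribers:
    - condition: inputDemo
      topic: "metrics"
      # Assumed field: reference to the Knative service that receives the event.
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: function-sample-serving-qrdx8-ksvc-fwml8
          namespace: default   # assumption: the service lives in the default namespace
```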
Run the following command to apply the configuration file.
kubectl apply -f trigger.yaml
Run the following commands to check the results.
$ kubectl get triggers.events.openfunction.io
my-trigger   default   Ready

$ kubectl get eventbus.events.openfunction.io
NAME      AGE
default   62m

$ kubectl get components
NAME                                                 AGE
serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
serving-9689d-component-esc-kafka-sample-two-l99cg   46m
serving-dxrhd-component-eventbus-t65q7               13m
serving-zwlj4-component-ebft-my-trigger-4925n        100s
Note
When the event bus is used, the Trigger controller works as follows:
- Create a Trigger custom resource named my-trigger.
- Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
- Create a Dapr component named serving-xxxxx-component-ebft-my-trigger-xxxxx to enable the Trigger to associate with the event bus.
- Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing trigger tasks.
Create an Event Producer
Use the following content to create an event producer configuration file (for example, events-producer.yaml).
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: events-producer
spec:
  version: "v1.0.0"
  image: openfunctiondev/v1beta1-bindings:latest
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    runtime: "async"
    inputs:
      - name: cron
        component: cron
    outputs:
      - name: target
        component: kafka-server
        operation: "create"
    bindings:
      cron:
        type: bindings.cron
        version: v1
        metadata:
          - name: schedule
            value: "@every 2s"
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
          - name: brokers
            value: "kafka-server-kafka-brokers:9092"
          - name: topics
            value: "events-sample"
          - name: consumerGroup
            value: "bindings-with-output"
          - name: publishTopic
            value: "events-sample"
          - name: authRequired
            value: "false"
Run the following command to apply the configuration file.
kubectl apply -f events-producer.yaml