Use EventBus and Trigger

  • You need to create a function as the target function to be triggered. For more information, see Use EventSource.
  • You need to create a Kafka cluster. A possible setup is sketched below.
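
The broker address used later in this document (kafka-server-kafka-brokers:9092) is consistent with a Kafka cluster named kafka-server managed by the Strimzi operator. As a reference only, the following is a minimal sketch of such a cluster and of the events-sample topic used by the examples, assuming the Strimzi operator is already installed and the default namespace is used; replica counts, storage, and broker settings are placeholders to adapt to your environment.

     # Hypothetical single-broker cluster managed by Strimzi (assumption, not part of the original document)
     apiVersion: kafka.strimzi.io/v1beta2
     kind: Kafka
     metadata:
       name: kafka-server
       namespace: default
     spec:
       kafka:
         replicas: 1
         listeners:
           - name: plain
             port: 9092
             type: internal
             tls: false
         config:
           # Single-replica settings so one broker is enough
           offsets.topic.replication.factor: 1
           transaction.state.log.replication.factor: 1
           transaction.state.log.min.isr: 1
         storage:
           type: ephemeral
       zookeeper:
         replicas: 1
         storage:
           type: ephemeral
       entityOperator:
         topicOperator: {}
         userOperator: {}
     ---
     # Topic used by the event producer and the EventSource in this document
     apiVersion: kafka.strimzi.io/v1beta2
     kind: KafkaTopic
     metadata:
       name: events-sample
       namespace: default
       labels:
         strimzi.io/cluster: kafka-server
     spec:
       partitions: 10
       replicas: 1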

Deploy a NATS streaming server

Deploy a NATS streaming server that is reachable inside the cluster. This document uses nats://nats.default:4222 as the access address of the NATS streaming server and stan as the cluster ID.
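
One way to deploy it is with the NATS Helm charts from the nats-io repository; the release names and the stan.nats.url value below are assumptions chosen to produce the addresses used in this document.

     # Add the nats-io Helm chart repository and install a NATS server plus a NATS Streaming (STAN) server
     helm repo add nats https://nats-io.github.io/k8s/helm/charts/
     helm repo update
     helm install nats nats/nats
     helm install stan nats/stan --set stan.nats.url=nats://nats:4222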

  1. Use the following content to create a configuration file (for example, openfuncasync-function.yaml) for the target function, which is triggered by the Trigger CRD and prints the received message.

     apiVersion: core.openfunction.io/v1beta1
     kind: Function
     metadata:
       name: trigger-target
     spec:
       version: "v1.0.0"
       image: openfunctiondev/v1beta1-trigger-target:latest
       port: 8080
       serving:
         runtime: "async"
         scaleOptions:
           keda:
             scaledObject:
               pollingInterval: 15
               minReplicaCount: 0
               maxReplicaCount: 10
               cooldownPeriod: 30
         triggers:
           - type: stan
             metadata:
               natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
               queueGroup: "grp1"
               durableName: "ImDurable"
               subject: "metrics"
               lagThreshold: "10"
         inputs:
           - name: autoscaling-pubsub
             component: eventbus
             topic: metrics
         pubsub:
           eventbus:
             type: pubsub.natsstreaming
             version: v1
             metadata:
               - name: natsURL
                 value: "nats://nats.default:4222"
               - name: natsStreamingClusterID
                 value: "stan"
               - name: subscriptionType
                 value: "queue"
               - name: durableSubscriptionName
                 value: "ImDurable"
               - name: consumerID
                 value: "grp1"
  2. Run the following command to apply the configuration file.

     kubectl apply -f openfuncasync-function.yaml
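
     Optionally, you can confirm that the Function resource has been created; the command below assumes it was applied to the default namespace.

     kubectl get functions.core.openfunction.io trigger-target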

Create an EventBus and an EventSource

  1. Use the following content to create a configuration file (for example, eventbus.yaml) for an EventBus.

     apiVersion: events.openfunction.io/v1alpha1
     kind: EventBus
     metadata:
       name: default
     spec:
       natsStreaming:
         natsURL: "nats://nats.default:4222"
         natsStreamingClusterID: "stan"
         durableSubscriptionName: "ImDurable"
  2. Use the following content to create a configuration file (for example, eventsource.yaml) for an EventSource that reads events from Kafka and forwards them to the EventBus named default.
  3. Run the following commands to apply these configuration files.

     kubectl apply -f eventbus.yaml
     kubectl apply -f eventsource.yaml
  4. Run the following commands to check the results.

     $ kubectl get eventsources.events.openfunction.io
     NAME             EVENTBUS   SINK   STATUS
     my-eventsource   default           Ready
     $ kubectl get eventbus.events.openfunction.io
     NAME      AGE
     default   6m53s
     $ kubectl get components
     NAME                                                 AGE
     serving-6r5dl-component-eventbus-jlpqf               11m
     serving-9689d-component-ebfes-my-eventsource-cmcbw   6m57s
     serving-9689d-component-esc-kafka-sample-two-l99cg   6m57s
     serving-k6zw8-component-cron-9x8hl                   61m
     serving-k6zw8-component-kafka-server-sjrzs           61m
     $ kubectl get deployments.apps
     NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
     serving-6r5dl-deployment-v100-m7nq2   0/0     0            0           12m
     serving-9689d-deployment-v100-5qdvk   1/1     1            1           7m17s

    Note

    When the event bus is used, the workflow of the EventSource controller is as follows:

    1. Create an EventSource custom resource named my-eventsource.
    2. Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
    3. Create a Dapr component named serving-xxxxx-component-ebfes-my-eventsource-xxxxx to enable the EventSource to associate with the event bus.
    4. Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-two-xxxxx to enable the EventSource to associate with the event source.
    5. Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing events.

Create a Trigger

  1. Use the following content to create a configuration file (for example, trigger.yaml) for a Trigger.

    Note

    • Set the event bus associated with the Trigger through spec.eventBus.
    • Set the event input source through spec.inputs.
    • This is a simple trigger that collects events from the EventBus named default. When it retrieves a sample-two event from the EventSource my-eventsource, it triggers a Knative service named function-sample-serving-qrdx8-ksvc-fwml8 and, at the same time, sends the event to the topic metrics of the event bus (see the sketch after the configuration below for how the Knative service sink could be declared).
     apiVersion: events.openfunction.io/v1alpha1
     kind: Trigger
     metadata:
       name: my-trigger
     spec:
       logLevel: "2"
       eventBus: "default"
       inputs:
         inputDemo:
           eventSource: "my-eventsource"
           event: "sample-two"
       subscribers:
         - condition: inputDemo
           topic: "metrics"
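
     The subscriber above only publishes matched events to the metrics topic of the event bus. To also call the Knative service mentioned in the note, the subscriber can declare a sink in addition to the topic; the fragment below is a sketch, and the ref fields other than the service name are assumptions for a typical Knative Service in the default namespace.

       subscribers:
         - condition: inputDemo
           topic: "metrics"
           sink:
             ref:
               apiVersion: serving.knative.dev/v1
               kind: Service
               name: function-sample-serving-qrdx8-ksvc-fwml8
               namespace: default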
  2. Run the following command to apply the configuration file.

     kubectl apply -f trigger.yaml
  3. Run the following commands to check the results.

     $ kubectl get triggers.events.openfunction.io
     NAME         EVENTBUS   STATUS
     my-trigger   default    Ready
     $ kubectl get eventbus.events.openfunction.io
     NAME      AGE
     default   62m
     $ kubectl get components
     NAME                                                 AGE
     serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
     serving-9689d-component-esc-kafka-sample-two-l99cg   46m
     serving-dxrhd-component-eventbus-t65q7               13m
     serving-zwlj4-component-ebft-my-trigger-4925n        100s

    Note

    When the event bus is used, the workflow of the Trigger controller is as follows:

    1. Create a Trigger custom resource named my-trigger.
    2. Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
    3. Create a Dapr component named serving-xxxxx-component-ebft-my-trigger-xxxxx to enable the Trigger to associate with the event bus.
    4. Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing trigger tasks.

Create an Event Producer

  1. Use the following content to create an event producer configuration file (for example, events-producer.yaml).

     apiVersion: core.openfunction.io/v1beta1
     kind: Function
     metadata:
       name: events-producer
     spec:
       version: "v1.0.0"
       image: openfunctiondev/v1beta1-bindings:latest
       serving:
         template:
           containers:
             - name: function
               imagePullPolicy: Always
         runtime: "async"
         inputs:
           - name: cron
             component: cron
         outputs:
           - name: target
             component: kafka-server
             operation: "create"
         bindings:
           cron:
             type: bindings.cron
             version: v1
             metadata:
               - name: schedule
                 value: "@every 2s"
           kafka-server:
             type: bindings.kafka
             version: v1
             metadata:
               - name: brokers
                 value: "kafka-server-kafka-brokers:9092"
               - name: topics
                 value: "events-sample"
               - name: consumerGroup
                 value: "bindings-with-output"
               - name: publishTopic
                 value: "events-sample"
               - name: authRequired
                 value: "false"
  2. Run the following command to apply the configuration file.

     kubectl apply -f events-producer.yaml
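
     Once the producer is running, events flow from Kafka through the EventSource and the event bus to the Trigger, which forwards them to the topic metrics consumed by the trigger-target function. One way to observe this (assuming the default namespace) is to list the functions and watch the trigger-target pods scale up from zero as events arrive.

     kubectl get functions.core.openfunction.io
     kubectl get po --watch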