Writing an event source using Javascript

    • Using a ContainerSource is a simple way to turn any dispatcher container into a Knative event source.
    • Using provides a framework for injecting environment variables into any Kubernetes resource that has a and is PodSpecable.

    ContainerSource and SinkBinding both work by injecting environment variables to an application. Injected environment variables at minimum contain the URL of a sink that will receive events.

    Create the project and add the dependencies:

    NOTE: Due to this , you must use version 2.0.1 of the Javascript SDK or newer.

    A ContainerSource creates a container for your event source image and manages this container.

    The sink URL to post the events will be made available to the application through the K_SINK environment variable by the ContainerSource.

    The following example event source emits an event to the sink every 1000 milliseconds:

    1. // File - index.js
    2. const { CloudEvent, HTTPEmitter } = require("cloudevents-sdk");
    3. let sinkUrl = process.env['K_SINK'];
    4. console.log("Sink URL is " + sinkUrl);
    5. let emitter = new HTTPEmitter({
    6. url: sinkUrl
    7. });
    8. let eventIndex = 0;
    9. setInterval(function () {
    10. console.log("Emitting event #" + ++eventIndex);
    11. let myevent = new CloudEvent({
    12. source: "urn:event:from:my-api/resource/123",
    13. type: "your.event.source.type",
    14. id: "your-event-id",
    15. dataContentType: "application/json",
    16. data: {"hello": "World " + eventIndex},
    17. });
    18. // Emit the event
    19. emitter.send(myevent)
    20. .then(response => {
    21. // Treat the response
    22. console.log("Event posted successfully");
    23. console.log(response.data);
    24. })
    25. .catch(err => {
    26. // Deal with errors
    27. console.log("Error during event post");
    28. console.error(err);
    29. });
    30. }, 1000);
    1. # File - Dockerfile
    2. WORKDIR /usr/src/app
    3. COPY package*.json ./
    4. RUN npm install
    5. EXPOSE 8080
    6. CMD [ "node", "index.js" ]

    Binary mode is used in most cases because: - It is faster in terms of serialization and deserialization. - It works better with CloudEvent-aware proxies, such as Knative Channels, and can simply check the header instead of parsing the payload.

    1. Build and push the image:

      1. docker build . -t path/to/image/registry/node-knative-heartbeat-source:v1
      2. docker push path/to/image/registry/node-knative-heartbeat-source:v1
    2. Create the event display service which logs any CloudEvents posted to it:

    3. Create the ContainerSource object:

      1. apiVersion: sources.knative.dev/v1
      2. kind: ContainerSource
      3. metadata:
      4. name: test-heartbeats
      5. spec:
      6. template:
      7. spec:
      8. containers:
      9. - image: path/to/image/registry/node-knative-heartbeat-source:v1
      10. name: heartbeats
      11. sink:
      12. ref:
      13. apiVersion: serving.knative.dev/v1
      14. kind: Service
      15. name: event-display
    4. Check the logs of the event display service. You will see a new message is pushed every second:

      1. $ kubectl logs -l serving.knative.dev/service=event-display -c user-container
      2. ☁️ cloudevents.Event
      3. Validation: valid
      4. Context Attributes,
      5. specversion: 1.0
      6. type: your.event.source.type
      7. source: urn:event:from:your-api/resource/123
      8. id: your-event-id
      9. datacontenttype: application/json
      10. Data,
      11. {
      12. "hello": "World 1"
      13. }
    5. Optional: If you are interested in seeing what is injected into the event source as a K_SINK, you can check the logs:

      1. $ kubectl logs test-heartbeats-deployment-7575c888c7-85w5t
      2. Sink URL is http://event-display.default.svc.cluster.local
      3. Emitting event #1
      4. Emitting event #2
      5. Event posted successfully
    1. Create an event display service:

    2. Create a Kubernetes deployment that runs the event source:

      1. apiVersion: apps/v1
      2. metadata:
      3. name: node-heartbeats-deployment
      4. labels:
      5. app: node-heartbeats
      6. spec:
      7. replicas: 2
      8. selector:
      9. matchLabels:
      10. app: node-heartbeats
      11. template:
      12. metadata:
      13. labels:
      14. app: node-heartbeats
      15. spec:
      16. containers:
      17. - name: node-heartbeats
      18. image: path/to/image/registry/node-knative-heartbeat-source:v1
      19. ports:
      20. - containerPort: 8080
    3. Because the SinkBinding has not yet been created, you will see an error message, because the K_SINK environment variable is not yet injected:

      1. $ kubectl logs node-heartbeats-deployment-9ffbb644b-llkzk
      2. Sink URL is undefined
      3. Emitting event #1
      4. Error during event post
      5. TypeError [ERR_INVALID_ARG_TYPE]: The "url" argument must be of type string. Received type undefined
    4. Create the SinkBinding object:

      1. apiVersion: sources.knative.dev/v1
      2. kind: SinkBinding
      3. metadata:
      4. name: bind-node-heartbeat
      5. spec:
      6. subject:
      7. apiVersion: apps/v1
      8. kind: Deployment
      9. selector:
      10. matchLabels:
      11. app: node-heartbeats
      12. sink:
      13. ref:
      14. apiVersion: serving.knative.dev/v1
      15. kind: Service

      You will see the pods are recreated and this time the K_SINK environment variable is injected.

      Also note that since the replicas is set to 2, there will be 2 pods that are posting events to the sink.