Apache Kafka Broker

    Prerequisites

    1. Knative Eventing installed on the cluster.
    2. An Apache Kafka cluster (if you’re just getting started you can follow the Strimzi Quickstart page).

    Installation

    1. Install the Kafka controller by entering the following command:
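
      A sketch of the install command, assuming the control plane manifest is published as eventing-kafka-controller.yaml in the knative-extensions/eventing-kafka-broker GitHub releases (replace <version> with the Knative version you are installing):

        kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v<version>/eventing-kafka-controller.yaml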

    2. Install the Kafka Broker data plane by entering the following command:
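
      Likewise a sketch, assuming the data plane manifest is published as eventing-kafka-broker.yaml in the same release:

        kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v<version>/eventing-kafka-broker.yaml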

    3. Verify that kafka-controller, kafka-broker-receiver and kafka-broker-dispatcher are running, by entering the following command:

      kubectl get deployments.apps -n knative-eventing

      Example output:

      NAME                      READY   UP-TO-DATE   AVAILABLE   AGE
      eventing-controller       1/1     1            1           10s
      eventing-webhook          1/1     1            1           9s
      kafka-controller          1/1     1            1           3s
      kafka-broker-dispatcher   1/1     1            1           4s
      kafka-broker-receiver     1/1     1            1           5s

    Create a Kafka Broker

    A Kafka Broker object looks like this:

      apiVersion: eventing.knative.dev/v1
      kind: Broker
      metadata:
        annotations:
          # case-sensitive
          eventing.knative.dev/broker.class: Kafka
        name: default
        namespace: default
      spec:
        # Configuration specific to this broker.
        config:
          apiVersion: v1
          kind: ConfigMap
          name: kafka-broker-config
          namespace: knative-eventing
        # Optional dead letter sink, you can specify either:
        #  - deadLetterSink.ref, which is a reference to a Callable
        #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
        delivery:
          deadLetterSink:
            ref:
              apiVersion: serving.knative.dev/v1
              kind: Service
              name: dlq-service

    spec.config should reference any ConfigMap that looks like the following:
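
    For example (a sketch: the partition count, replication factor, and bootstrap server address below are illustrative and must match your Kafka cluster):

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-broker-config
        namespace: knative-eventing
      data:
        # Number of topic partitions.
        default.topic.partitions: "10"
        # Replication factor of topic messages.
        default.topic.replication.factor: "3"
        # A comma separated list of bootstrap servers. (It can be in or out of the Kubernetes cluster.)
        bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"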

    This ConfigMap is installed in the cluster. You can edit the configuration or create a new one with the same values depending on your needs.
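
    For example, to edit the installed ConfigMap in place (a sketch using kubectl edit):

      kubectl edit configmap kafka-broker-config -n knative-eventing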

    Note

    The default.topic.replication.factor value must be less than or equal to the number of Kafka broker instances in your cluster. For example, if you only have one Kafka broker, the default.topic.replication.factor value should not be more than 1.

    To set the Kafka broker as the default implementation for all brokers in the Knative deployment, you can apply global settings by modifying the config-br-defaults ConfigMap in the knative-eventing namespace.

    This allows you to avoid configuring individual or per-namespace settings for each broker, such as metadata.annotations.eventing.knative.dev/broker.class or spec.config.

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: config-br-defaults
        namespace: knative-eventing
      data:
        default-br-config: |
          clusterDefault:
            brokerClass: Kafka
            apiVersion: v1
            kind: ConfigMap
            name: kafka-broker-config
            namespace: knative-eventing
          namespaceDefaults:
            namespace1:
              brokerClass: Kafka
              apiVersion: v1
              kind: ConfigMap
              name: kafka-broker-config
              namespace: knative-eventing
            namespace2:
              brokerClass: Kafka
              apiVersion: v1
              kind: ConfigMap
              name: kafka-broker-config
              namespace: knative-eventing

    Security

    Apache Kafka supports different security features; Knative supports the following:

    • Authentication using SASL without encryption
    • Authentication using SASL and encryption using SSL
    • Authentication and encryption using SSL

    To enable security features, you can reference a Secret in the ConfigMap referenced by broker.spec.config:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-broker-config
        namespace: knative-eventing
      data:
        # Other configurations
        # ...
        # Reference a Secret called my_secret
        auth.secret.ref.name: my_secret

    The Secret my_secret must exist in the same namespace as the ConfigMap referenced by broker.spec.config, in this case: knative-eventing.

    Note

    Certificates and keys must be in PEM format.
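
    A quick way to check that a certificate parses as PEM (a sketch, assuming openssl is installed):

      openssl x509 -in ca.crt -noout -text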

    Knative supports the following SASL mechanisms:

    • PLAIN
    • SCRAM-SHA-256
    • SCRAM-SHA-512

    To use a specific SASL mechanism, replace <sasl_mechanism> with the mechanism of your choice.

    Authentication using SASL without encryption

      kubectl create secret --namespace <namespace> generic <my_secret> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<my_user> \
        --from-literal=password=<my_password>

    Authentication using SASL and encryption using SSL

      kubectl create secret --namespace <namespace> generic <my_secret> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=caroot.pem \
        --from-literal=user=<my_user> \
        --from-literal=password=<my_password>

    Authentication and encryption using SSL

      kubectl create secret --namespace <namespace> generic <my_secret> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>

    Note

    ca.crt can be omitted to fall back to the system’s root CA set.

    Consumer Offsets Commit Interval

    Kafka consumers keep track of the last successfully sent events by committing offsets.

    Knative Kafka Broker commits the offset every auto.commit.interval.ms milliseconds.

    Note

    The interval can be changed by modifying the auto.commit.interval.ms parameter in the config-kafka-broker-data-plane ConfigMap in the knative-eventing namespace, as follows:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: config-kafka-broker-data-plane
        namespace: knative-eventing
      data:
        # Some configurations omitted ...
        config-kafka-broker-consumer.properties: |
          # Some configurations omitted ...
          # Commit the offset every 5000 milliseconds (5 seconds)
          auto.commit.interval.ms=5000

    Note

    Knative Kafka Broker guarantees at-least-once delivery, which means that your applications may receive duplicate events. A higher commit interval means a higher probability of receiving duplicate events, because when a consumer restarts, it resumes from the last committed offset.

    Knative exposes all available Kafka producer and consumer configurations that can be modified to suit your workloads.

    You can change these configurations by modifying the config-kafka-broker-data-plane ConfigMap in the knative-eventing namespace.
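
    For example, a sketch of tuning the producer batch size, assuming the ConfigMap exposes a config-kafka-broker-producer.properties key alongside the consumer properties shown above:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: config-kafka-broker-data-plane
        namespace: knative-eventing
      data:
        # Some configurations omitted ...
        config-kafka-broker-producer.properties: |
          # Batch up to 32 KiB of records per request (the Kafka default is 16384)
          batch.size=32768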

    Documentation for the settings available in this ConfigMap is available on the Apache Kafka website, in particular, the Producer configurations and Consumer configurations.

    Enable debug logging for data plane components

    The following YAML shows the default logging configuration for data plane components, which is created during the installation step:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-config-logging
        namespace: knative-eventing
      data:
        config.xml: |
          <configuration>
            <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
              <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
            </appender>
            <root level="INFO">
              <appender-ref ref="jsonConsoleAppender"/>
            </root>
          </configuration>

    To change the logging level to DEBUG, you must:

    1. Apply the following kafka-config-logging ConfigMap, or replace level="INFO" with level="DEBUG" in the existing kafka-config-logging ConfigMap:

        apiVersion: v1
        kind: ConfigMap
        metadata:
          name: kafka-config-logging
          namespace: knative-eventing
        data:
          config.xml: |
            <configuration>
              <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
                <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
              </appender>
              <root level="DEBUG">
                <appender-ref ref="jsonConsoleAppender"/>
              </root>
            </configuration>
    2. Restart the kafka-broker-receiver and the kafka-broker-dispatcher, by entering the following commands:
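
      For example, using kubectl rollout restart on the two deployments:

        kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver
        kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher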

    Configuring the order of delivered events

    When dispatching events, the Kafka broker can be configured to support different delivery ordering guarantees.

    You can configure the delivery order of events using the kafka.eventing.knative.dev/delivery.order annotation on the Trigger object:

      apiVersion: eventing.knative.dev/v1
      kind: Trigger
      metadata:
        name: my-service-trigger
        annotations:
          kafka.eventing.knative.dev/delivery.order: ordered
      spec:
        broker: my-kafka-broker
        subscriber:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: my-service

    The supported consumer delivery guarantees are:
    • unordered: An unordered consumer is a non-blocking consumer that delivers messages unordered, while preserving proper offset management.
    • ordered: An ordered consumer is a per-partition blocking consumer that waits for a successful response from the CloudEvent subscriber before it delivers the next message of the partition.

    unordered is the default ordering guarantee, while ordered is considered unstable; use it with caution.

    Additional information