Apache Kafka Channel Example

    Creating a KafkaChannel custom resource

    Create a new object by configuring the YAML file as follows:
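The YAML for the KafkaChannel object is not included in this extract; the following is a minimal sketch, in which the channel name `my-kafka-channel` and the `messaging.knative.dev/v1beta1` API version are assumptions:

```shell
# Sketch: create a KafkaChannel with explicit topic settings.
cat <<-EOF | kubectl apply -f -
---
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: my-kafka-channel
spec:
  numPartitions: 3
  replicationFactor: 1
EOF
```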

    Specifying the default channel configuration

    To configure the usage of the KafkaChannel CRD as the default channel configuration, edit the default-ch-webhook ConfigMap as follows:

    cat <<-EOF | kubectl apply -f -
    ---
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: default-ch-webhook
      namespace: knative-eventing
    data:
      # Configuration for defaulting channels that do not specify CRD implementations.
      default-ch-config: |
        clusterDefault:
          apiVersion: messaging.knative.dev/v1beta1
          kind: KafkaChannel
          spec:
            numPartitions: 3
            replicationFactor: 1
    EOF

    Now that KafkaChannel is set as the default channel configuration, you can use the channels.messaging.knative.dev CRD to create a new Apache Kafka channel, using the generic Channel:

    cat <<-EOF | kubectl apply -f -
    ---
    apiVersion: messaging.knative.dev/v1
    kind: Channel
    metadata:
      name: testchannel-one
    EOF
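Before inspecting Kafka itself, you can optionally verify that the channel became ready. This is a sketch; the exact output columns depend on your Knative version:

```shell
# The generic Channel and the backing KafkaChannel should both
# report READY=True once the topic has been created.
kubectl get channels.messaging.knative.dev testchannel-one
kubectl get kafkachannels.messaging.knative.dev testchannel-one
```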

    Check Kafka for a testchannel-one topic. With Strimzi this can be done by using the command:

    kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --zookeeper localhost:2181 --list

    The result is:

    ...
    knative-messaging-kafka.default.testchannel-one
    ...

    The Apache Kafka topic that is created by the channel implementation is prefixed with knative-messaging-kafka. This indicates it is an Apache Kafka channel from Knative. It contains the name of the namespace, default in this example, followed by the actual name of the channel.

    Configuring the Knative broker for Apache Kafka channels

    To set up a broker that uses the new default Kafka channels, you must create a new default broker, using the command:
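The exact command is not shown on this page; the following is a minimal sketch that creates a channel-based Broker named `default` in the `default` namespace. The `eventing.knative.dev/v1` API version is an assumption, and on older Knative releases the same result was achieved by labeling the namespace for broker injection instead:

```shell
# Sketch: create a Broker named "default"; it will be backed by
# KafkaChannel, the cluster-default channel configured above.
cat <<-EOF | kubectl create -f -
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: default
EOF

# On older releases, broker injection was used instead:
# kubectl label namespace default knative-eventing-injection=enabled
```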

    This will give you two pods, such as:

    default-broker-filter-64658fc79f-nf596 1/1 Running 0 15m
    default-broker-ingress-ff79755b6-vj9jt 1/1 Running 0 15m

    The broker also creates its own topics inside Apache Kafka, which show up in the topic list:

    ...
    knative-messaging-kafka.default.default-kn2-ingress
    knative-messaging-kafka.default.default-kn2-trigger
    ...

    Creating a service and trigger to use the Apache Kafka broker

    To use the Apache Kafka based broker, let’s take a look at a simple demo. Use the ApiServerSource to publish events to the broker, as well as the Trigger API, which then routes events to a Knative Service.

    1. Install the ksvc, using the command:

       kubectl apply -f 000-ksvc.yaml

    2. Install a source that publishes to the default broker.

    3. Create a trigger that routes the events to the ksvc.
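The manifests for the source and the trigger are not included in this extract. The sketch below shows one possible shape: the resource names and the ServiceAccount are assumptions, while the ksvc name `broker-kafka-display` matches the log command used later in this guide:

```shell
# Sketch: an ApiServerSource that sends Kubernetes Event resources to
# the default Broker, and a Trigger that routes them to the ksvc.
cat <<-EOF | kubectl apply -f -
---
apiVersion: sources.knative.dev/v1
kind: ApiServerSource
metadata:
  name: testevents-kafka
  namespace: default
spec:
  serviceAccountName: events-sa
  mode: Resource
  resources:
    - apiVersion: v1
      kind: Event
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: testevents-trigger
  namespace: default
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: broker-kafka-display
EOF
```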

      Now that your Eventing cluster is configured for Apache Kafka, you can verify your configuration with the following options.

      Now you can see the events in the log of the ksvc using the command:

      kubectl logs --selector='serving.knative.dev/service=broker-kafka-display' -c user-container

      Authentication against an Apache Kafka cluster

      In production environments it is common that the Apache Kafka cluster is secured using TLS or SASL. This section shows how to configure the KafkaChannel to work against a protected Apache Kafka cluster, using the two supported authentication methods: TLS and SASL.

      To use TLS authentication you must create:

      • A CA certificate
      • A client certificate and key
      1. Create the certificate files as secrets in your chosen namespace:
      kubectl create secret --namespace <namespace> generic <kafka-auth-secret> \
        --from-file=ca.crt=caroot.pem \
        --from-file=user.crt=certificate.pem \
        --from-file=user.key=key.pem

      NOTE: It is important to use the same keys (ca.crt, user.crt and user.key).

      Reference your secret and the namespace of the secret in the config-kafka ConfigMap:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: config-kafka
        namespace: knative-eventing
      data:
        bootstrapServers: <bootstrap-servers>
        authSecretName: <kafka-auth-secret>
        authSecretNamespace: <namespace>

      To use SASL authentication, you will need the following information:

      • A username and password.
      • The type of SASL mechanism you wish to use, for example PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.

      NOTE: It is recommended to also enable TLS. If you enable this, you will also need the ca.crt certificate as described in the previous section.

      1. Create the certificate files as secrets in your chosen namespace:
      kubectl create secret --namespace <namespace> generic <kafka-auth-secret> \
        --from-file=ca.crt=caroot.pem \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"

      NOTE: It is important to use the same keys: user, password, and saslType.

      Reference your secret and the namespace of the secret in the config-kafka ConfigMap:
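The ConfigMap itself is not repeated in this extract; it takes the same shape as in the TLS section above, pointing at the SASL secret just created. A sketch:

```shell
# Sketch: point config-kafka at the SASL auth secret.
cat <<-EOF | kubectl apply -f -
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka
  namespace: knative-eventing
data:
  bootstrapServers: <bootstrap-servers>
  authSecretName: <kafka-auth-secret>
  authSecretNamespace: <namespace>
EOF
```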
