Apache Kafka

    To set up Apache Kafka pubsub, create a component of type pubsub.kafka. See the guide on how to create and apply a pubsub configuration. For details on using secretKeyRef, see the guide on referencing secrets in components.

    The secretKeyRef above references a Kubernetes secret store to access the TLS information. See the documentation on secret store components to learn more about how to configure one.

    Kafka supports a variety of authentication schemes, and Dapr supports several of them: SASL password, mTLS, and OIDC/OAuth2. With the added authentication methods, the authRequired field was deprecated in the v1.6 release; the authType field should be used instead. If authRequired is set to true, Dapr attempts to configure authType correctly based on the value of saslPassword. There are four valid values for authType: none, password, mtls, and oidc. Note that this is authentication only; authorization is still configured within Kafka.

    None

    Setting authType to none will disable any authentication. This is NOT recommended in production.

    ```yaml
    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-noauth
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "none"
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: disableTls # Optional. Disables TLS for the transport layer.
        value: "true"
    ```

    SASL Password
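
    Setting authType to password enables SASL password authentication, configured with the saslUsername and saslPassword fields (the same fields used in the TLS example later in this document). A minimal sketch of the relevant metadata entries follows; the secret name kafka-secrets and key saslPasswordSecret are illustrative:

    ```yaml
    # Excerpt of a pubsub.kafka component's metadata using SASL password auth.
    # The secret store name and keys below are illustrative placeholders.
    - name: authType # Required.
      value: "password"
    - name: saslUsername # Required if authType is `password`.
      value: "adminuser"
    - name: saslPassword # Required if authType is `password`.
      secretKeyRef:
        name: kafka-secrets
        key: saslPasswordSecret
    ```

    As with the other schemes, keep the password in a secret store rather than inline in the component YAML.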

    Mutual TLS

    Setting authType to mtls uses an X.509 client certificate (the clientCert field) and key (the clientKey field) to authenticate. Note that mTLS as an authentication mechanism is distinct from using TLS to secure the transport layer via encryption. mTLS requires TLS transport (meaning disableTls must be false), but securing the transport layer does not require using mTLS. See the section below on securing the transport layer for configuring the underlying TLS transport.

    ```yaml
    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub-mtls
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "mtls"
      - name: caCert
        secretKeyRef:
          name: kafka-tls
          key: caCert
      - name: clientCert
        secretKeyRef:
          name: kafka-tls
          key: clientCert
      - name: clientKey
        secretKeyRef:
          name: kafka-tls
          key: clientKey
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
    ```

    OAuth2 or OpenID Connect

    Setting authType to oidc enables SASL authentication via the OAUTHBEARER mechanism. This supports specifying a bearer token from an external OAuth2 or OpenID Connect identity provider. Currently, only the client_credentials grant is supported. Set oidcTokenEndpoint to the full URL of the identity provider's access token endpoint, and set oidcClientID and oidcClientSecret to the client credentials provisioned in the identity provider.

    If caCert is specified in the component configuration, the certificate is appended to the system CA trust for verifying the identity provider's certificate. Similarly, if skipVerify is specified in the component configuration, verification is also skipped when accessing the identity provider.

    By default, the only scope requested for the token is openid; it is highly recommended that additional scopes be specified via oidcScopes as a comma-separated list and validated by the Kafka broker. If additional scopes are not used to narrow the validity of the access token, a compromised Kafka broker could replay the token to access other services as the Dapr clientID.
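
    The OIDC-related metadata entries described above can be sketched as follows; the endpoint URL, client ID, secret reference, and scope values are illustrative placeholders, not values from the original guide:

    ```yaml
    # Excerpt of a pubsub.kafka component's metadata using OIDC/OAuth2 auth.
    # All values below are illustrative placeholders.
    - name: authType # Required.
      value: "oidc"
    - name: oidcTokenEndpoint # Full URL of the identity provider's token endpoint.
      value: "https://identity.example.com/v1/token"
    - name: oidcClientID # Client ID provisioned in the identity provider.
      value: "dapr-kafka"
    - name: oidcClientSecret # Client secret, referenced from a secret store.
      secretKeyRef:
        name: kafka-secrets
        key: oidcClientSecret
    - name: oidcScopes # Comma-separated list of scopes to request.
      value: "openid,kafka-prod"
    ```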

    Communication using TLS

    By default, TLS is enabled to secure the transport layer to Kafka. To disable TLS, set disableTls to true. When TLS is enabled, you can control server certificate verification using skipVerify to disable verification (NOT recommended in production environments) and caCert to specify a trusted TLS certificate authority (CA). If no caCert is specified, the system CA trust is used. To also configure mTLS authentication, see the Mutual TLS section under Authentication. Below is an example of a Kafka pubsub component configured to use transport layer TLS:

    ```yaml
    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-pubsub
      namespace: default
    spec:
      type: pubsub.kafka
      version: v1
      metadata:
      - name: brokers # Required. Kafka broker connection setting
        value: "dapr-kafka.myapp.svc.cluster.local:9092"
      - name: consumerGroup # Optional. Used for input bindings.
        value: "group1"
      - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
        value: "my-dapr-app-id"
      - name: authType # Required.
        value: "password"
      - name: saslUsername # Required if authType is `password`.
        value: "adminuser"
      - name: consumeRetryInterval # Optional.
        value: 200ms
      - name: version # Optional.
        value: 0.10.2.0
      - name: saslPassword # Required if authType is `password`.
        secretKeyRef:
          name: kafka-secrets
          key: saslPasswordSecret
      - name: maxMessageBytes # Optional.
        value: 1024
      - name: caCert # Certificate authority certificate.
        secretKeyRef:
          name: kafka-tls
          key: caCert
    auth:
      secretStore: <SECRET_STORE_NAME>
    ```

    When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the metadata query parameter in the request URL. All other metadata key/value pairs (that are not partitionKey) are set as headers in the Kafka message. Here is an example that sets both a partitionKey and a correlationId for the message:

    ```shell
    curl -X POST 'http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1' \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'
    ```

    You can run Kafka locally using Docker. To run without Docker, see the Kafka getting started guide.
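
    As an illustrative sketch (the image and version are assumptions, not from the original guide), a minimal single-node Kafka for local development can be started with Docker Compose using the official apache/kafka image, which runs a KRaft-mode broker listening on port 9092 by default:

    ```yaml
    # docker-compose.yaml — single-node Kafka for local development.
    # Assumes the official apache/kafka image (published for Kafka 3.7+),
    # which starts a KRaft-mode broker on localhost:9092 by default.
    services:
      kafka:
        image: apache/kafka:latest
        container_name: kafka
        ports:
          - "9092:9092"
    ```

    Start it with docker compose up -d, then point the brokers metadata field of the component at localhost:9092.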

    To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.

    • Read this guide for instructions on configuring pub/sub components