Apache Kafka is a high-performance distributed streaming platform deployed by thousands of companies. In many deployments, administrators require fine-grained access control over Kafka topics to enforce important requirements around confidentiality and integrity.
This tutorial shows how to enforce fine-grained access control over Kafka topics. In this tutorial you will use OPA to define and enforce an authorization policy stating:
- Consumers of topics containing Personally Identifiable Information (PII) must be whitelisted.
- Producers to topics with high fanout must be whitelisted.
In addition, this tutorial shows how to break up a policy with small helper rules to reuse logic and improve overall readability.
This tutorial requires Docker Compose to run Kafka, ZooKeeper, and OPA.
First, create an OPA policy that allows all requests. You will update this policy later in the tutorial.
policies/tutorial.rego:

```rego
package kafka.authz

allow = true
```
Next, create a `docker-compose.yaml` file that runs OPA, ZooKeeper, and Kafka.
docker-compose.yaml:

```yaml
version: "2"
services:
  opa:
    hostname: opa
    image: openpolicyagent/opa:0.12.2
    ports:
      - 8181:8181
    # WARNING: OPA is NOT running with an authorization policy configured. This
    # means that clients can read and write policies in OPA. If you are deploying
    # OPA in an insecure environment, you should configure authentication and
    # authorization on the daemon. See the Security page for details:
    # https://www.openpolicyagent.org/docs/security.html.
    command: "run --server --watch /policies"
    volumes:
      - ./policies:/policies
  zookeeper:
    image: confluentinc/cp-zookeeper:4.0.0-3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      zk_id: "1"
  kafka:
    hostname: kafka
    image: openpolicyagent/demo-kafka:1.0
    links:
      - zookeeper
      - opa
    ports:
      - "9092:9092"
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "SSL://:9093"
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_SSL_CLIENT_AUTH: required
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
      KAFKA_SSL_KEY_CREDENTIALS: broker_sslkey_creds
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
      KAFKA_AUTHORIZER_CLASS_NAME: com.lbg.kafka.opa.OpaAuthorizer
      KAFKA_OPA_AUTHORIZER_URL: "http://opa:8181/v1/data/kafka/authz/allow"
      KAFKA_OPA_AUTHORIZER_ALLOW_ON_ERROR: "false"
      KAFKA_OPA_AUTHORIZER_CACHE_INITIAL_CAPACITY: 100
      KAFKA_OPA_AUTHORIZER_CACHE_MAXIMUM_SIZE: 100
      KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_MS: 600000
```
For more information on how to configure the OPA authorizer plugin for Kafka, see the open-policy-agent/contrib repository.
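The authorizer plugin itself is a JAR running inside the broker, not Python. The sketch below only illustrates the HTTP exchange it is configured to perform: it POSTs the authorization input, wrapped in an `"input"` key, to the URL in `KAFKA_OPA_AUTHORIZER_URL`, and OPA's Data API answers with a `"result"` key. The path `/v1/data/kafka/authz/allow` corresponds to the `allow` rule in package `kafka.authz`. The function names are hypothetical, and treating `KAFKA_OPA_AUTHORIZER_ALLOW_ON_ERROR` as "the decision to use when OPA cannot answer" is an assumption based on the setting's name.

```python
import json
import urllib.request

# Taken from KAFKA_OPA_AUTHORIZER_URL; "opa" only resolves inside the Compose network.
# From the host, http://localhost:8181/... works because the port is published.
OPA_URL = "http://opa:8181/v1/data/kafka/authz/allow"
# Assumed meaning of KAFKA_OPA_AUTHORIZER_ALLOW_ON_ERROR: "false".
ALLOW_ON_ERROR = False


def build_request(authz_input: dict, url: str = OPA_URL) -> urllib.request.Request:
    """Wrap the input in OPA's {"input": ...} envelope as a JSON POST request."""
    body = json.dumps({"input": authz_input}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def interpret_response(raw_body: bytes) -> bool:
    """Return the boolean decision, or ALLOW_ON_ERROR if it is missing or malformed."""
    try:
        result = json.loads(raw_body).get("result")
    except (ValueError, AttributeError):  # invalid JSON, or JSON that is not an object
        return ALLOW_ON_ERROR
    # An undefined decision comes back without a "result" key.
    return result if isinstance(result, bool) else ALLOW_ON_ERROR


def is_allowed(authz_input: dict) -> bool:
    """Ask OPA for a decision; network and HTTP errors fall back to ALLOW_ON_ERROR."""
    try:
        with urllib.request.urlopen(build_request(authz_input), timeout=5) as resp:
            return interpret_response(resp.read())
    except OSError:  # URLError, HTTPError and timeouts are all OSError subclasses
        return ALLOW_ON_ERROR
```

Keeping request construction and response interpretation separate from the network call makes the fail-closed behavior easy to check without a running OPA.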
Once you have created the file, launch the containers for this tutorial.

```shell
docker-compose --project-name opa-kafka-tutorial up
```
Now that the tutorial environment is running, we can define an authorization policy using OPA and test it.
Authentication
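The Docker Compose file above requires SSL client authentication (`KAFKA_SSL_CLIENT_AUTH: required`), so the identity of every client that connects to the broker is available to the policy. For a request to write to the `credit-scores` topic, the authorizer passes OPA an input document like the following:

```json
{
  "operation": {
    "name": "Write"
  },
  "resource": {
    "resourceType": {
      "name": "Topic"
    },
    "name": "credit-scores"
  },
  "session": {
    "principal": {
      "principalType": "User"
    },
    "clientAddress": "172.21.0.5",
    "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
  }
}
```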
The client identity is extracted from the SSL certificate that the client presents when it connects to the broker. It is encoded in the `input.session.sanitizedUser` field as a URL-encoded distinguished name, which can be decoded inside the policy.
Generating the SSL certificates and JKS files required for SSL client authentication is outside the scope of this tutorial. To simplify the steps below, the Docker Compose file uses an extended Kafka image from Docker Hub (openpolicyagent/demo-kafka:1.0). The extended image includes pre-generated SSL certificates that the broker and clients use to identify themselves.
Do not rely on these pre-generated SSL certificates in real-world scenarios. They are provided only for convenience and testing.
Kafka Authorizer JAR File
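The Kafka image used in this tutorial includes a pre-installed JAR file that implements Kafka's authorizer interface; the broker loads it through the class named in `KAFKA_AUTHORIZER_CLASS_NAME` (`com.lbg.kafka.opa.OpaAuthorizer`). For more information on the authorizer, see open-policy-agent/contrib/kafka_authorizer.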
Update `policies/tutorial.rego` with the following content.
The `./policies` directory is mounted into the Docker container running OPA. Because OPA runs with `--watch`, it is notified when files under this directory change and reloads the policies automatically.
At this point, you can exercise the policy.
This step shows how you can grant fine-grained access to services using Kafka. In this scenario, some services are allowed to read PII data while others are not.
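To make the scenario concrete, here is a hypothetical Python model of the decision the PII rule makes. It is not the Rego policy, and its names are assumptions: the `credit-scores` tag mirrors the `topic_metadata` structure used later in this tutorial, a principal is identified by the first label of its certificate CN, and the only whitelisted reader is the service behind the `pii_consumer` credentials. The model allows everything unless a deny condition matches, which is consistent with the steps below (the `anon_producer` can still write to `credit-scores`).

```python
# Hypothetical Python model of the PII rule (the tutorial's policy is written in Rego).
TOPIC_METADATA = {"credit-scores": {"tags": ["pii"]}}

# Assumed whitelist; principals are the first label of the certificate CN.
PII_CONSUMERS = {"pii_consumer"}


def is_read(operation: str) -> bool:
    return operation == "Read"


def topic_contains_pii(topic: str) -> bool:
    # Topics missing from TOPIC_METADATA are treated as untagged.
    return "pii" in TOPIC_METADATA.get(topic, {}).get("tags", [])


def deny(principal: str, operation: str, topic: str) -> bool:
    return is_read(operation) and topic_contains_pii(topic) and principal not in PII_CONSUMERS


def allow(principal: str, operation: str, topic: str) -> bool:
    # Everything is allowed unless the deny condition matches.
    return not deny(principal, operation, topic)
```

Splitting the condition into small predicates such as `is_read` and `topic_contains_pii` mirrors the helper-rule style this tutorial recommends for Rego.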
First, run `kafka-console-producer` to generate some data on the `credit-scores` topic.
```shell
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"bob\", \"score\": $i}"; done | kafka-console-producer --topic credit-scores --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'
```
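This command sends 10 messages to the topic. Bob's credit score seems to be improving.
Next, run `kafka-console-consumer` with the `pii_consumer` credentials to simulate a service that has been granted access to PII data.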
```shell
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/pii_consumer.ssl.config
```
This command outputs the 10 messages sent to the topic earlier in this step. Once the 10 messages have been printed, stop the consumer (^C).
Finally, run `kafka-console-consumer` again, but this time use the `anon_consumer` credentials. These credentials simulate a service that has not been explicitly granted access to PII data.
```shell
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config
```
Because the `anon_consumer` is not allowed to read PII data, the request will be denied and the consumer will output an error message:

```
Not authorized to read from topic credit-scores.
...
Processed a total of 0 messages
```
First, add the following content to the policy file (`./policies/tutorial.rego`):
Next, update the `topic_metadata` data structure in the same file to indicate that the `click-stream` topic has a high fanout.

```rego
topic_metadata = {
    "click-stream": {
        "tags": ["large-fanout"],
    },
    "credit-scores": {
        "tags": ["pii"],
    }
}
```
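Extending the same idea, a hypothetical Python model shows how one shared helper can serve both rules, in the spirit of the helper rules mentioned at the start of this tutorial. This is a sketch, not the tutorial's Rego: the whitelist of `fanout_producer` for high-fanout writes is an assumption inferred from the commands below, principals are identified by the first label of their certificate CN, and requests are allowed unless some rule denies them.

```python
# Hypothetical Python model of the combined policy (not the tutorial's Rego).
TOPIC_METADATA = {
    "click-stream": {"tags": ["large-fanout"]},
    "credit-scores": {"tags": ["pii"]},
}

# Assumed whitelists, keyed by tag; principals are the first label of the cert CN.
WHITELISTS = {
    "pii": {"pii_consumer"},              # may read PII topics
    "large-fanout": {"fanout_producer"},  # may write to high-fanout topics
}

# Each rule guards one operation on topics that carry one tag.
RULES = [
    ("pii", "Read"),
    ("large-fanout", "Write"),
]


def topic_has_tag(topic: str, tag: str) -> bool:
    """Helper shared by every rule; unknown topics carry no tags."""
    return tag in TOPIC_METADATA.get(topic, {}).get("tags", [])


def denied_by(principal: str, operation: str, topic: str, tag: str, guarded_op: str) -> bool:
    """Deny the guarded operation on a tagged topic unless the principal is whitelisted."""
    return (
        operation == guarded_op
        and topic_has_tag(topic, tag)
        and principal not in WHITELISTS[tag]
    )


def allow(principal: str, operation: str, topic: str) -> bool:
    """Allow unless at least one rule denies the request."""
    return not any(denied_by(principal, operation, topic, tag, op) for tag, op in RULES)
```

Adding another restriction in this model means adding one `(tag, operation)` pair and one whitelist entry rather than another copy of the condition.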
To exercise the updated policy, first run `kafka-console-producer` to simulate a service with access to the `click-stream` topic.
```shell
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"alice\", \"button\": $i}"; done | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/fanout_producer.ssl.config'
```
Next, run `kafka-console-consumer` to confirm that the messages were published.
```shell
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic click-stream --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config
```
Once you see the 10 messages produced by the first part of this step, exit the console consumer (^C).
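Lastly, run `kafka-console-producer` to simulate a service that should not have access to high fanout topics.

```shell
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"alice\", \"button\": $i}"; done | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'
```

Because `anon_producer` is not authorized to write to high fanout topics, the request will be denied and the producer will output an error message.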
Congratulations on finishing the tutorial!
If you want to use the Kafka Authorizer plugin that integrates Kafka with OPA, see the build and install instructions in the open-policy-agent/contrib repository.