Kafka acls are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP”. You can read more about resource patterns in KIP-290. To add, remove or list acls, you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. That default can be changed through a setting in server.properties.
One can also add super users in server.properties like the following (note that the delimiter is a semicolon, since SSL user names may contain commas). The default PrincipalType string “User” is case sensitive.
super.users=User:Bob;User:Alice
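
The default-deny behavior and the super user exception can be sketched as a toy model in Python. This is not Kafka's authorizer code: `parse_super_users`, `is_allowed` and the `allowed_principals` simplification are hypothetical names introduced here for illustration. The `allow_if_no_acl` flag stands in for the broker setting that changes the default; to my knowledge that setting is `allow.everyone.if.no.acl.found=true`, which is stated here as an assumption rather than taken from the text above.

```python
def parse_super_users(value):
    """Split a super.users value on ';'. A comma cannot be the delimiter
    because SSL user names (distinguished names) may contain commas."""
    return {entry.strip() for entry in value.split(";") if entry.strip()}


def is_allowed(principal, allowed_principals, super_users, allow_if_no_acl=False):
    """Toy model of the default described above, not Kafka's authorizer code.

    allowed_principals is a simplification: the set of principals allowed by
    the acls that match the resource (empty when no acl matches at all).
    """
    if principal in super_users:
        return True                 # super users are always allowed
    if not allowed_principals:
        return allow_if_no_acl      # no acls on the resource: denied by default
    return principal in allowed_principals


SUPER_USERS = parse_super_users("User:Bob;User:Alice")
print(is_allowed("User:Bob", set(), SUPER_USERS))
print(is_allowed("User:Carol", set(), SUPER_USERS))
```

With no acls on the resource, only the super users get through unless the default is flipped.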
Customizing SSL User Name
By default, the SSL user name will be of the form “CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”. One can change that by setting ssl.principal.mapping.rules to a customized rule in server.properties. This config allows a list of rules for mapping an X.500 distinguished name to a short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
The format of ssl.principal.mapping.rules is a list where each rule starts with “RULE:” and contains an expression in one of the following formats. The default rule returns the string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name. Lowercase/uppercase options are also supported, to force the translated result to be all lowercase or all uppercase. This is done by adding a “/L” or “/U” to the end of the rule.
RULE:pattern/replacement/
RULE:pattern/replacement/[LU]
Example ssl.principal.mapping.rules values are:
RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
DEFAULT
Above rules translate distinguished name “CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown” to “serviceuser” and “CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown” to “adminuser@admin”.
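
The first-match evaluation described above can be sketched in Python. This is a minimal model under stated assumptions, not the broker's implementation: the rules are given as pre-parsed `(pattern, replacement, case)` tuples rather than parsed from the config string, `None` stands for DEFAULT, and `map_principal` is a hypothetical helper name. Java-style `$1` references are translated into Python's `\g<1>` form before expansion.

```python
import re

# Pre-parsed versions of the example rules above.
# Each rule is (pattern, replacement, case); case is "L", "U" or None.
# A bare None entry stands for DEFAULT.
RULES = [
    (r"^CN=(.*?),OU=ServiceUsers.*$", "$1", None),
    (r"^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$", "$1@$2", "L"),
    None,
]


def map_principal(dn, rules=RULES):
    """Return the short name produced by the first rule that matches dn."""
    for rule in rules:
        if rule is None:
            return dn  # DEFAULT: keep the distinguished name unchanged
        pattern, replacement, case = rule
        match = re.fullmatch(pattern, dn)
        if match is None:
            continue  # try the next rule in the list
        # Turn $n group references into Python's \g<n> template syntax.
        template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
        name = match.expand(template)
        if case == "L":
            name = name.lower()
        elif case == "U":
            name = name.upper()
        return name  # later rules are ignored
    return dn


print(map_principal("CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"))
print(map_principal("CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"))
```

A distinguished name that matches neither RULE entry falls through to DEFAULT and is returned as-is.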
For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
principal.builder.class=CustomizedPrincipalBuilderClass
Customizing SASL User Name
By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as auth_to_local in the Kerberos configuration file (krb5.conf). An additional lowercase/uppercase option is also supported, to force the translated result to be all lowercase or all uppercase. This is done by adding a “/L” or “/U” to the end of the rule. Each rule starts with RULE: and contains an expression. See the Kerberos documentation for more details on the syntax.
An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
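
To make the example rule concrete, here is a small Python sketch that walks through its three parts for a single principal: the `[1:$1@$0]` format (one-component principals, rebuilt as first component plus realm), the `(.*@MYDOMAIN.COM)` filter, and the `s/@.*//` substitution. It models only this one rule and does not implement DEFAULT; `apply_example_rule` is a hypothetical name, and returning `None` for "rule does not apply" is a choice made for this sketch.

```python
import re


def apply_example_rule(principal):
    """Model RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*// for one principal.

    Returns the translated short name, or None when the rule does not apply.
    """
    if "@" not in principal:
        return None
    name, _, realm = principal.rpartition("@")
    components = name.split("/")
    if len(components) != 1:
        return None                       # [1:...] only covers one-component principals
    formatted = f"{components[0]}@{realm}"  # $1 is the first component, $0 the realm
    if re.fullmatch(r".*@MYDOMAIN.COM", formatted) is None:
        return None                       # the filter in parentheses did not match
    return re.sub(r"@.*", "", formatted, count=1)  # s/@.*// strips the realm


print(apply_example_rule("user@MYDOMAIN.COM"))
```

A principal from another realm, or one with a host component such as kafka/host@MYDOMAIN.COM, is not handled by this rule and would be left to the rules that follow it in the list.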
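The Kafka authorization management CLI can be found under the bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. For example, the following command adds acls that allow User:Bob and User:Alice to perform the Read and Write operations on the topic Test-topic from the IPs 198.51.100.0 and 198.51.100.1: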
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
By default, all principals that don’t have an explicit acl that allows access for an operation to a resource are denied. In the rare case where an allow acl grants access to all but some principals, we have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following command:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
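
The effect of combining an allow-all acl with a narrow deny acl can be sketched in Python. This is a toy evaluation, assuming that a matching deny acl takes precedence over any matching allow acl (which is how I understand the authorizer to behave); the `Acl` class and `authorize` function are hypothetical names, not Kafka APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    permission: str  # "Allow" or "Deny"
    principal: str   # e.g. "User:Bob", or "User:*" for everyone
    host: str        # an IP address, or "*" for any host
    operation: str   # e.g. "Read"


def _matches(acl, principal, host, operation):
    return (acl.principal in (principal, "User:*")
            and acl.host in (host, "*")
            and acl.operation == operation)


def authorize(acls, principal, host, operation):
    """Deny wins over allow; with no matching acl the request is denied."""
    hits = [acl for acl in acls if _matches(acl, principal, host, operation)]
    if any(acl.permission == "Deny" for acl in hits):
        return False
    return any(acl.permission == "Allow" for acl in hits)


# The two acls that the command above is meant to create on Test-topic.
TEST_TOPIC_ACLS = [
    Acl("Allow", "User:*", "*", "Read"),
    Acl("Deny", "User:BadBob", "198.51.100.3", "Read"),
]

print(authorize(TEST_TOPIC_ACLS, "User:Alice", "198.51.100.7", "Read"))
print(authorize(TEST_TOPIC_ACLS, "User:BadBob", "198.51.100.3", "Read"))
```

Note that in this model User:BadBob is only blocked from the listed IP; the same principal connecting from another address still matches the allow-all acl.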
Note that --allow-host and --deny-host only support IP addresses (hostnames are not supported).
The above examples add acls to a topic by specifying --topic [topic-name] as the resource pattern option. Similarly, a user can add acls to the cluster by specifying --cluster and to a consumer group by specifying --group [group-name]. You can add acls on any resource of a certain type, e.g. suppose you want to add an acl “Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.1”. You can do that by using the wildcard resource ‘*’, e.g. by executing the CLI with the following options:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'
You can add acls on prefixed resource patterns, e.g. suppose you want to add an acl “Principal User:Jane is allowed to produce to any Topic whose name starts with ‘Test-’ from any host”. You can do that by executing the CLI with the following options:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
Note, --resource-pattern-type defaults to ‘literal’, which only affects resources with the exact same name or, in the case of the wildcard resource name ‘*’, a resource with any name.
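
The difference between literal, wildcard and prefixed resource patterns can be sketched as a small Python predicate. `pattern_matches` is a hypothetical helper written for illustration, not part of any Kafka API.

```python
def pattern_matches(pattern_type, pattern_name, resource_name):
    """Return True if a resource pattern applies to the named resource."""
    if pattern_type == "literal":
        # A literal pattern matches the exact name; the literal '*' is the
        # wildcard and matches a resource with any name.
        return pattern_name == "*" or pattern_name == resource_name
    if pattern_type == "prefixed":
        return resource_name.startswith(pattern_name)
    raise ValueError(f"unknown pattern type: {pattern_type}")


for pattern in [("literal", "Test-topic"), ("literal", "*"), ("prefixed", "Test-")]:
    print(pattern, pattern_matches(*pattern, "Test-topic"))
```

All three patterns apply to Test-topic, which is why acls on any of them can affect access to that topic.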
Removing Acls
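Removing acls is pretty much the same. The only difference is that instead of the --add option users will have to specify the --remove option. To remove the acls added by the first example above we can execute the CLI with the following options:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
If you want to remove the acl added to the prefixed resource pattern above we can execute the CLI with the following options:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed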
List Acls
We can list acls for any resource by specifying the --list option with the resource. To list all acls on the literal resource pattern Test-topic, we can execute the CLI with the following options:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic, e.g. any acls on the topic wildcard ‘*’, or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic '*'
To list all acls that affect Test-topic, use --resource-pattern-type match:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match
This will list acls on all matching literal, wildcard and prefixed resource patterns.
Adding or removing a principal as producer or consumer
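The most common use cases for acl management are adding/removing a principal as a producer or consumer, so we added convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic we can execute the following command:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
Similarly, to add Alice as a consumer of Test-topic with consumer group Group-1 we just have to pass the --consumer option:
> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --consumer --topic Test-topic --group Group-1
Note that for the consumer option we must also specify the consumer group. In order to remove a principal from the producer or consumer role we just need to pass the --remove option.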
Admin API based acl management
Users having Alter permission on ClusterResource can use the Admin API for ACL management. The kafka-acls.sh script supports the AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly. All the above examples can be executed by using the --bootstrap-server option. For example:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
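
When these commands are driven from a script, it can help to build the argument list programmatically so that the connection options are swapped in one place. The Python sketch below only assembles arguments using flags that appear in this section; `kafka_acls_argv` and the two connection constants are hypothetical names, and actually running the result (for example with `subprocess.run`) assumes a Kafka installation at the given path.

```python
# Connection options as used in the examples of this section.
ZOOKEEPER = ["--authorizer-properties", "zookeeper.connect=localhost:2181"]
ADMIN_API = ["--bootstrap-server", "localhost:9092",
             "--command-config", "/tmp/adminclient-configs.conf"]


def kafka_acls_argv(connection, action, principal, role, topic, group=None):
    """Build the argv for adding or removing a producer/consumer principal.

    action is "add" or "remove"; role is "producer" or "consumer".
    """
    if action not in ("add", "remove"):
        raise ValueError("action must be 'add' or 'remove'")
    if role not in ("producer", "consumer"):
        raise ValueError("role must be 'producer' or 'consumer'")
    argv = ["bin/kafka-acls.sh", *connection, f"--{action}",
            "--allow-principal", principal, f"--{role}", "--topic", topic]
    if role == "consumer":
        if group is None:
            raise ValueError("the consumer option also needs a consumer group")
        argv += ["--group", group]
    return argv


print(" ".join(kafka_acls_argv(ADMIN_API, "add", "User:Bob", "producer", "Test-topic")))
```

Passing `ZOOKEEPER` instead of `ADMIN_API` yields the authorizer-properties form of the same command.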
Protocol calls usually perform operations on certain resources in Kafka. You need to know these operations and resources to set up effective protection. In this section we’ll list the operations and resources, then list their combinations with the protocols to show the valid scenarios.
There are a few operation primitives that can be used to build up privileges. These can be matched up with certain resources to allow specific protocol calls for a given user. These are:
- Read
- Write
- Create
- Delete
- Alter
- Describe
- ClusterAction
- DescribeConfigs
- AlterConfigs
- IdempotentWrite
- All
The operations above can be applied on certain resources which are described below.
- Topic: this simply represents a Topic. All protocol calls that act on topics (such as reading from or writing to them) require the corresponding privilege to be added. If there is an authorization error with a topic resource, then a TOPIC_AUTHORIZATION_FAILED (error code: 29) will be returned.
- Group: this represents the consumer groups in the brokers. All protocol calls that work with consumer groups, such as joining a group, must have privileges on the group in question. If the privilege is not given, then a GROUP_AUTHORIZATION_FAILED (error code: 30) will be returned in the protocol response.
- Cluster: this resource represents the cluster. Operations that affect the whole cluster, such as controlled shutdown, are protected by privileges on the Cluster resource. If there is an authorization problem on a cluster resource, then a CLUSTER_AUTHORIZATION_FAILED (error code: 31) will be returned.
- TransactionalId: this resource represents actions related to transactions, such as committing. If an authorization error occurs, then a TRANSACTIONAL_ID_AUTHORIZATION_FAILED (error code: 53) will be returned by brokers.
- DelegationToken: this represents the delegation tokens in the cluster. Actions such as describing delegation tokens can be protected by a privilege on the DelegationToken resource. Since these objects behave somewhat specially in Kafka, it is recommended to read KIP-48 and the related upstream documentation at Authentication using Delegation Tokens.
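
When a client reports one of these errors, the code points back at the resource type whose acl is missing. A minimal Python lookup built only from the codes listed above might look as follows; `AUTHORIZATION_ERRORS` and `resource_for_error` are hypothetical names for this sketch.

```python
# Resource type -> (error name, error code), as listed above.
AUTHORIZATION_ERRORS = {
    "Topic": ("TOPIC_AUTHORIZATION_FAILED", 29),
    "Group": ("GROUP_AUTHORIZATION_FAILED", 30),
    "Cluster": ("CLUSTER_AUTHORIZATION_FAILED", 31),
    "TransactionalId": ("TRANSACTIONAL_ID_AUTHORIZATION_FAILED", 53),
}


def resource_for_error(code):
    """Return the resource type behind an authorization error code, or None."""
    for resource, (_name, error_code) in AUTHORIZATION_ERRORS.items():
        if error_code == code:
            return resource
    return None


print(resource_for_error(30))
```

For example, error code 30 indicates that a privilege on a Group resource is missing, so the consumer group acls are the place to look.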
Protocol (API key) | Operation | Resource | Note |
---|---|---|---|
PRODUCE (0) | Write | TransactionalId | A transactional producer which has its transactional.id set requires this privilege. |
PRODUCE (0) | IdempotentWrite | Cluster | An idempotent produce action requires this privilege. |
PRODUCE (0) | Write | Topic | This applies to a normal produce action. |
FETCH (1) | ClusterAction | Cluster | A follower must have ClusterAction on the Cluster resource in order to fetch partition data. |
FETCH (1) | Read | Topic | Regular Kafka consumers need READ permission on each partition they are fetching. |
LIST_OFFSETS (2) | Describe | Topic | |
METADATA (3) | Describe | Topic | |
METADATA (3) | Create | Cluster | If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it’s found then it’ll allow creating the topic, otherwise it’ll iterate through the Topic level privileges (see the next one). |
METADATA (3) | Create | Topic | This authorizes auto topic creation if enabled but the given user doesn’t have a cluster level permission (above). |
LEADER_AND_ISR (4) | ClusterAction | Cluster | |
STOP_REPLICA (5) | ClusterAction | Cluster | |
UPDATE_METADATA (6) | ClusterAction | Cluster | |
CONTROLLED_SHUTDOWN (7) | ClusterAction | Cluster | |
OFFSET_COMMIT (8) | Read | Group | An offset can only be committed if it’s authorized to the given group and the topic too (see below). Group access is checked first, then Topic access. |
OFFSET_COMMIT (8) | Read | Topic | Since offset commit is part of the consuming process, it needs privileges for the read action. |
OFFSET_FETCH (9) | Describe | Group | Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access. |
OFFSET_FETCH (9) | Describe | Topic | |
FIND_COORDINATOR (10) | Describe | Group | The FIND_COORDINATOR request can be of “Group” type, in which case it is looking for consumer group coordinators. This privilege would represent the Group mode. |
FIND_COORDINATOR (10) | Describe | TransactionalId | This applies only to transactional producers and is checked when a producer tries to find the transaction coordinator. |
JOIN_GROUP (11) | Read | Group | |
HEARTBEAT (12) | Read | Group | |
LEAVE_GROUP (13) | Read | Group | |
SYNC_GROUP (14) | Read | Group | |
DESCRIBE_GROUPS (15) | Describe | Group | |
LIST_GROUPS (16) | Describe | Cluster | When the broker authorizes a list_groups request, it first checks for this cluster level authorization. If none is found, it proceeds to check the groups individually. This operation doesn’t return CLUSTER_AUTHORIZATION_FAILED. |
LIST_GROUPS (16) | Describe | Group | If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn’t return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release. |
SASL_HANDSHAKE (17) | | | The SASL handshake is part of the authentication process and therefore it’s not possible to apply any kind of authorization here. |
API_VERSIONS (18) | | | The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it’s not possible to control this with authorization. |
CREATE_TOPICS (19) | Create | Cluster | If there is no cluster level authorization then it won’t return CLUSTER_AUTHORIZATION_FAILED but will fall back to the topic level check, which is just below. That check will return an error if there is a problem. |
CREATE_TOPICS (19) | Create | Topic | This is applicable from the 2.0 release. |
DELETE_TOPICS (20) | Delete | Topic | |
DELETE_RECORDS (21) | Delete | Topic | |
INIT_PRODUCER_ID (22) | Write | TransactionalId | |
INIT_PRODUCER_ID (22) | IdempotentWrite | Cluster | |
OFFSET_FOR_LEADER_EPOCH (23) | ClusterAction | Cluster | If there is no cluster level privilege for this operation, then it’ll check for topic level one. |
OFFSET_FOR_LEADER_EPOCH (23) | Describe | Topic | This is applicable from the 2.1 release. |
ADD_PARTITIONS_TO_TXN (24) | Write | TransactionalId | This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in subject (below). |
ADD_PARTITIONS_TO_TXN (24) | Write | Topic | |
ADD_OFFSETS_TO_TXN (25) | Write | TransactionalId | Similarly to ADD_PARTITIONS_TO_TXN, this is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below). |
ADD_OFFSETS_TO_TXN (25) | Read | Group | |
END_TXN (26) | Write | TransactionalId | |
WRITE_TXN_MARKERS (27) | ClusterAction | Cluster | |
TXN_OFFSET_COMMIT (28) | Write | TransactionalId | |
TXN_OFFSET_COMMIT (28) | Read | Group | |
TXN_OFFSET_COMMIT (28) | Read | Topic | |
DESCRIBE_ACLS (29) | Describe | Cluster | |
CREATE_ACLS (30) | Alter | Cluster | |
DELETE_ACLS (31) | Alter | Cluster | |
DESCRIBE_CONFIGS (32) | DescribeConfigs | Cluster | If broker configs are requested, then the broker will check cluster level privileges. |
DESCRIBE_CONFIGS (32) | DescribeConfigs | Topic | If topic configs are requested, then the broker will check topic level privileges. |
ALTER_CONFIGS (33) | AlterConfigs | Cluster | If broker configs are altered, then the broker will check cluster level privileges. |
ALTER_CONFIGS (33) | AlterConfigs | Topic | If topic configs are altered, then the broker will check topic level privileges. |
ALTER_REPLICA_LOG_DIRS (34) | Alter | Cluster | |
DESCRIBE_LOG_DIRS (35) | Describe | Cluster | An empty response will be returned on authorization failure. |
SASL_AUTHENTICATE (36) | | | SASL_AUTHENTICATE is part of the authentication process and therefore it’s not possible to apply any kind of authorization here. |
CREATE_PARTITIONS (37) | Alter | Topic | |
CREATE_DELEGATION_TOKEN (38) | | | Creating delegation tokens has special rules; please see the Authentication using Delegation Tokens section. |
RENEW_DELEGATION_TOKEN (39) | | | Renewing delegation tokens has special rules; please see the Authentication using Delegation Tokens section. |
EXPIRE_DELEGATION_TOKEN (40) | | | Expiring delegation tokens has special rules; please see the Authentication using Delegation Tokens section. |
DESCRIBE_DELEGATION_TOKEN (41) | Describe | DelegationToken | Describing delegation tokens has special rules; please see the Authentication using Delegation Tokens section. |
DELETE_GROUPS (42) | Delete | Group | |
ELECT_PREFERRED_LEADERS (43) | ClusterAction | Cluster | |
INCREMENTAL_ALTER_CONFIGS (44) | AlterConfigs | Cluster | If broker configs are altered, then the broker will check cluster level privileges. |
INCREMENTAL_ALTER_CONFIGS (44) | AlterConfigs | Topic | If topic configs are altered, then the broker will check topic level privileges. |
ALTER_PARTITION_REASSIGNMENTS (45) | Alter | Cluster | |
LIST_PARTITION_REASSIGNMENTS (46) | Describe | Cluster | |
OFFSET_DELETE (47) | Delete | Group | |
OFFSET_DELETE (47) | Read | Topic | |
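
One practical use of the table is to work out the smallest set of privileges a client needs from the protocol calls it makes. The Python sketch below copies a handful of rows (the non-transactional, non-replication ones) into a dictionary and takes the union. Which calls a given client actually issues is an assumption made for this example, and `REQUIREMENTS` and `required_privileges` are hypothetical names.

```python
# A subset of the table above: protocol call -> [(operation, resource type)].
REQUIREMENTS = {
    "PRODUCE": [("Write", "Topic")],
    "FETCH": [("Read", "Topic")],
    "METADATA": [("Describe", "Topic")],
    "OFFSET_COMMIT": [("Read", "Group"), ("Read", "Topic")],
    "OFFSET_FETCH": [("Describe", "Group"), ("Describe", "Topic")],
    "FIND_COORDINATOR": [("Describe", "Group")],
    "JOIN_GROUP": [("Read", "Group")],
    "HEARTBEAT": [("Read", "Group")],
    "LEAVE_GROUP": [("Read", "Group")],
    "SYNC_GROUP": [("Read", "Group")],
}


def required_privileges(api_calls):
    """Return the sorted, de-duplicated privileges needed for the given calls."""
    needed = set()
    for call in api_calls:
        needed.update(REQUIREMENTS[call])
    return sorted(needed)


# Calls assumed for a simple consumer that joins a group and commits offsets.
consumer_calls = ["FIND_COORDINATOR", "JOIN_GROUP", "SYNC_GROUP", "HEARTBEAT",
                  "FETCH", "OFFSET_COMMIT", "OFFSET_FETCH", "METADATA"]
for operation, resource in required_privileges(consumer_calls):
    print(operation, resource)
```

The result contains both Read and Describe on Group and Topic, which lines up with the requirement that the consumer option be given both a topic and a consumer group.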