Kafka 手札(6) kafka 授权机制

KAFKA 授权机制
A 常见4种权限模型
ACL : Access-Control List 访问控制列表
RBAC:Role-Based Access Control 基于角色权限控制
ABAC:Attribute-Based Access Control 基于属性的权限控制
PBAC:Policy-Based Access Control 基于策略的权限控制
ACL模型很简单,它表征的是用户与权限的直接映射关系,规定了什么用户对什么资源有什么样的访问权限。
用户------------> 权限
RBAC 模型则加入了角色的概念,支持对用户进行分组
用户-------------> 角色-------------->权限
Kafka 没有使用RBAC 模型,他用的是ACL模型。
B 如何开启ACL
在broker端的配置文件增加一行设置即可,也就是在server.properties 文件中配置下面的这个参数值:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer//指定了ACL授权机制的实现类
C 超级用户
超级用户能够访问所有的资源,即使你没有为它们设置任何 ACL 项。
也就是在server.properties 文件中配置以下参数:
super.users=User:superuser1;User:superuser2 //多个用户之间用分号隔开
tips: allow.everyone.if.no.acl.found=true 这个参数为真的话 所有用户都可以方位没有设置任何ACL的资源。
D kafka-acls 脚本
举梨:
为用户Alice 增加了集群级别的所有权限:
$ kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --operation All --topic '*' --cluster
All 代表所有操作,topic中星号则表示所有主题,指定--cluster则说明我们要为Alice 设置的是集群权限。
$ bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadUser --deny-host 10.205.96.119 --operation Read --topic test-topic
User 后面的星号表示所有用户,allow-host 后面的星号则表示所有 IP 地址。这个命令的意思是,允许所有的用户使用任意的 IP 地址读取名为 test-topic 的主题数据,同时也禁止 BadUser 用户和 10.205.96.119 的 IP 地址访问 test-topic 下的消息。
E :授权单独使用
只能对IP地址设置权限。
禁止在某IP(127.0.0.1)上Producer 向test 主题发送数据:
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --deny-principal User:* --deny-host 127.0.0.1 --operation Write --topic test
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
[2019-07-16 10:10:57,283] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-07-16 10:10:57,284] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
[2019-07-16 10:10:57,284] ERROR Error when sending message to topic test with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: <span class="orange">Not authorized to access topics: [test]
</span class="orange">
配置实例

#!/bin/bash

#设置环境变量
BASE_DIR=/Users/jack/testenv #你需要修改此处
CERT_OUTPUT_PATH="$BASE_DIR/certificates"
PASSWORD=test1234
KEY_STORE="$CERT_OUTPUT_PATH/server.keystore.jks"
TRUST_STORE="$CERT_OUTPUT_PATH/server.truststore.jks"
CLIENT_KEY_STORE="$CERT_OUTPUT_PATH/client.keystore.jks"
CLIENT_TRUST_STORE="$CERT_OUTPUT_PATH/client.truststore.jks"
KEY_PASSWORD=$PASSWORD
STORE_PASSWORD=$PASSWORD
TRUST_KEY_PASSWORD=$PASSWORD
TRUST_STORE_PASSWORD=$PASSWORD
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"
DAYS_VALID=365
DNAME="CN=JACK, OU=YourDept, O=YourCompany, L=Beijing, ST=Beijing, C=CN"


mkdir -p $CERT_OUTPUT_PATH

echo "1. 产生key和证书......"
keytool -keystore $KEY_STORE -alias kafka-server -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"

keytool -keystore $CLIENT_KEY_STORE -alias kafka-client -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"

echo "2. 创建CA......"
openssl req -new -x509 -keyout $CERT_OUTPUT_PATH/ca-key -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" \
-passin pass:"$PASSWORD" -passout pass:"$PASSWORD" \
-subj "/C=CN/ST=Beijing/L=Beijing/O=YourCompany/OU=YourDept,CN=JACK"

echo "3. 添加CA文件到broker truststore......"
keytool -keystore "$TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASS" -noprompt

echo "4. 添加CA文件到client truststore......"
keytool -keystore "$CLIENT_TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASS" -noprompt

echo "5. 从keystore中导出集群证书......"
keytool -keystore "$KEY_STORE" -alias kafka-server -certreq -file "$CERT_OUTPUT_PATH/server-cert-file" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias kafka-client -certreq -file "$CERT_OUTPUT_PATH/client-cert-file" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

echo "6. 使用CA签发证书......"
openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CERT_OUTPUT_PATH/server-cert-file" \
-out "$CERT_OUTPUT_PATH/server-cert-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"

openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CERT_OUTPUT_PATH/client-cert-file" \
-out "$CERT_OUTPUT_PATH/client-cert-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"

echo "7. 导入CA文件到keystore......"
keytool -keystore "$KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" \
 -keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" \
 -keypass "$KEY_PASSWORD" -noprompt

echo "8. 导入已签发证书到keystore......"
keytool -keystore "$KEY_STORE" -alias kafka-server -import -file "$CERT_OUTPUT_PATH/server-cert-signed" \
 -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias kafka-client -import -file "$CERT_OUTPUT_PATH/client-cert-signed" \
 -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

echo "9. 删除临时文件......"
rm "$CERT_OUTPUT_PATH/ca-cert.srl"
rm "$CERT_OUTPUT_PATH/server-cert-signed"
rm "$CERT_OUTPUT_PATH/client-cert-signed"
rm "$CERT_OUTPUT_PATH/server-cert-file"
rm "$CERT_OUTPUT_PATH/client-cert-file"

设置SSL 脚本
该脚本主要的产出是 4 个文件,分别是:
server.keystore.jks、server.truststore.jks、
client.keystore.jks 和 client.truststore.jks。
把server开头的 拷贝到集群中所有Broker机器上,把以client开头的两个文件,拷贝到所有要连接Kafka集群的客户端应用程序上
配置每个Broker上的server.properties 文件
listeners=SSL://localhost:9093
ssl.truststore.location=/Users/huxi/testenv/certificates/server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/Users/huxi/testenv/certificates/server.keystore.jks
ssl.keystore.password=test1234
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.key.password=test1234
配置客户端的 SSL。
创建一个名为 client-ssl.config 的文件:
security.protocol=SSL
ssl.truststore.location=/Users/huxi/testenv/certificates/client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/Users/huxi/testenv/certificates/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.endpoint.identification.algorithm=
KAFKA 跨集群数据镜像工具
MirrorMaker
A 备份和镜像:
通常我们把数据在单个集群下不同节点之间的拷贝称为备份,而把数据在集群间的拷贝称为镜像(Mirroring)
B MirrorMaker:
MirrorMaker 就是一个消费者 + 生产者的程序。消费者负责从源集群(Source Cluster)消费数据,生产者负责向目标集群(Target Cluster)发送消息。
C:运行Mirror Maker
常见用法是指定生产者配置文件、消费者配置文件、线程数以及要执行数据镜像的主题正则表达式。
$ bin/kafka-mirror-maker.sh --consumer.config ./config/consumer.properties --producer.config ./config/producer.properties --num.streams 8 --whitelist ".*"
参数释义:
consumer.config :它指定了 MirrorMaker 中消费者的配置文件地址,最主要的配置项是 bootstrap.servers,也就是该 MirrorMaker 从哪个 Kafka 集群读取消息。
producer.config :它指定了 MirrorMaker 内部生产者组件的配置文件地址
num.streams: MirrorMaker 要创建多少个 KafkaConsumer 实例.当然它使用的多线程方案,即在后台创建并启动多个线程,每个线程维护专属的消费者实例。在实际使用时,你可以根据你的机器性能酌情设置多个线程。
whitelist 参数:这个参数接收一个正则表达式。所有匹配该正则表达式的主题都会被自动地执行镜像。在这个命令中,我指定了“.*”,这表明我要同步源集群上的所有主题。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注