SpringCloud Stream + Kafka安全认证机制(SASL/PLAINTEXT)
转载:https://www.cnblogs.com/ilovena/p/10123516.html
我修正了一些作者笔误的地方,直接复制即可配置成功。
kafka版本:kafka_2.12-0.11.0.3
zookeeper版本:zookeeper-3.4.6
Zookeeper配置和启动
1. 为zookeeper添加SASL支持,在配置文件zoo.cfg添加
[code]authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000
2. 新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息,这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/conf路径下同zoo.cfg。zk_server_jaas.conf文件的内容如下
[code]Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="cluster" password="clusterpasswd" user_kafka="kafkapasswd"; };
username和paasword是zk集群之间的认证密码。
user_kafka="kafkapasswd"定义了一个用户"kafka",密码是"kafkapasswd",本次测试用户是kafka broker。
3. 导入kafka的相关jar
由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在ZK_HOME/下新建sasl_lib目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样
[code]kafka-clients-0.11.0.3.jar lz4-1.3.0.jar slf4j-api-1.7.25.jar slf4j-log4j12-1.7.25.jar snappy-java-1.1.2.6.jar
4. 修改zkEnv.sh
在zkEnv.sh添加
[code]LIBPATH=("${ZOOBINDIR}"/../lib/*.jar) #这个位置添加下一行 LIBSASLPATH=("${ZOOBINDIR}"/../sasl_lib/*.jar) #make it work for developers 这条注释上面添加下面的 for i in "${LIBSASLPATH[@]}" do CLASSPATH="$i:$CLASSPATH" done SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
在zk启动的时候导入sasl_lib下的jar包,SERVER_JVMFLAGS配置jvm参数,导入zk的sasl认证信息
5. 启动zk服务端
执行./zkServer.sh start启动zk。如果启动异常查看日志排查问题
kafka配置和启动
1. 新建KAFKA_HOME/config/kafka_server_jaas.conf,为kafka添加认证信息
[code]KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="cluster" password="cluster" user_cluster="cluster" user_kafka="kafkapasswd" ; }; Client{ org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapasswd"; };
KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。
2. 在kafka的配置文件开启SASL认证
在server.properties添加如下信息
[code]# IP替换为自己的 listeners=SASL_PLAINTEXT://IP:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN allow.everyone.if.no.acl.found=true
3.在server启动脚本JVM参数
我是直接在
[code]export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
添加了认证信息,修改后为
[code]# 路径自己修改 export KAFKA_HEAP_OPTS=" -Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/kafka_2.11-0.11.0.3/config/kafka_server_jaas.conf "
4.启动kafka服务端
[code]./kafka-server-start.sh ../config/server.properties
kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功
[code]Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)
kafka的SASL认证功能认证和使用
我们使用kafka自带的脚本进行认证。
1. 新建KAFKA_HOME/config/kafka_client_jaas.conf为客户端添加认证信息
[code]KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapasswd"; };
2. 修改客户端配置信息
修改producer.properties和consumer.properties,添加认证机制
[code]security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
3.修改客户端启动脚本
修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将
export KAFKA_HEAP_OPTS="-Xmx512M"修改为
[code]# 路径需要修改 export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.11-0.11.0.3/config/kafka_client_jaas.conf"
kafka-console-consumer.sh的修改类似。
4.
客户端启动并认证
启动consumer
[code]./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties
启动producer
[code]./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config config/producer.properties
producer端发送消息,consumer端成功接收到消息。
SpringCloud Stream收发消息
application.properties文件中添加,地址做修改即可。
[code]spring.cloud.stream.kafka.binder.brokers=192.168.186.129:9092 spring.cloud.stream.kafka.binder.zk-nodes=192.168.186.129:2181 spring.cloud.stream.kafka.binder.auto-create-topics=true spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN spring.cloud.stream.kafka.binder.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.cloud.stream.kafka.binder.configuration.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.cloud.stream.bindings.output.destination=events spring.cloud.stream.bindings.output.content-type=text/plain spring.cloud.stream.bindings.input.destination=events
发送代码
[code]import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @EnableBinding(Source.class) public class SendService { @Autowired private Source source; public void sendMsg(String message) { Message<String> build = MessageBuilder .withPayload(message) .setHeader("head",1) .build(); source.output().send(build); } }
接收代码
[code]@RestController @EnableBinding(Sink.class) public class KafkaController { private SendService sendService; @Autowired public KafkaController(SendService sendService) { this.sendService = sendService; } @RequestMapping(path = "/send", method = RequestMethod.POST) public void send(@RequestBody Map<String, String> params) { String message = params.getOrDefault("message",""); sendService.sendMsg(message); } @StreamListener(value = Sink.INPUT) public void receiver(Object message) { System.out.println(message); } }
启动服务时加上:
[code]-Djava.security.auth.login.config=路径\kafka_client_jaas.conf
完。
- Spring Cloud Stream + Kafka
- Kafka SASL/PLAINTEXT 简单验证测试
- spring cloud-给Eureka Server加上安全的用户认证
- kafka集群使用 SASL/PLAIN 认证
- spring cloud stream kafka 动态写入不同的topic(Using dynamically bound destinations)
- spring security & oauth2 安全认证机制
- Kafka 0.10.0 SASL/PLAIN身份认证及权限实现
- spring cloud stream kafka实例
- Spring Cloud Stream Binder Kafka Monitor
- Spring Cloud stream 使用Kafka踩坑记录
- SpringCloud学习之SpringCloudStream&集成kafka
- 5-Kafka 0.10.0 SASL/PLAIN身份认证及权限实现
- SpringCloud系列四:Eureka 服务发现框架(定义 Eureka 服务端、Eureka 服务信息、Eureka 发现管理、Eureka 安全配置、Eureka-HA(高可用) 机制、Eureka 服务打包部署)
- SpringCloudStream 构建消息驱动的微服务框架 集成kafka_http://blog.spring-cloud.io/blog/sc-stream.html
- spring cloud stream配置多个kafka binders
- 【Spring Cloud】Hystrix 防御机制
- Spring Cloud (3) | spring cloud bus 消息总线kafka应用
- 00154 web安全认证机制知多少
- spring cloud使用hystrix实现断路保护机制
- Kafka JAAS 安全认证流程