分布式消息队列Kafka学习笔记
2018-03-02 18:00
666 查看
Kafka概述
a distributed streaming platformKafka架构和核心概念
producer, 生产者,生产馒头。consumer, 消费者,吃馒头。broker, 篮子。topic, 主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃。Zookeeper集群部署
安装包解压,1tar -xzvf zookeeper-3.4.5.tar.gz -C /export/serverszookeeper配置文件修改,
1cp zoo_sample.cfg zoo.cfg 2vi zoo.cfg 3#数据目录. 可以是任意目录,其中的dataDir目录和dataLogDir需要提前建立好 4#注意 应该谨慎地选择日志存放的位置,使用专用的日志存储设备能够大大地提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会在很大程度上影响系统的性能。 5dataDir=/export/servers/data/zookeeper 6#log目录, 同样可以是任意目录. 如果没有设置该参数, 将使用和dataDir相同的设置,其中的dataDir目录和dataLogDir需要提前建立好 7#注意 应该谨慎地选择日志存放的位置,使用专用的日志存储设备能够大大地提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会在很大程度上影响系统的性能。 8dataLogDir=/export/servers/logs/zookeeper 9# 主机名:心跳端口:数据端口 10server.1=zk01:2888:3888 11server.2=zk02:2888:3888 12server.3=zk03:2888:3888myid记录到数据文件夹,
1# zk01 2mkdir -p /export/servers/data/zookeeper 3echo 1 > myid 4cat myidzookeeper分发到其他节点,
1sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk02:/export/servers/ 2# zk02 3mkdir -p /export/servers/data/zookeeper 4echo 2 > myid 5sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk03:/export/servers/ 6# zk03 7mkdir -p /export/servers/data/zookeeper 8echo 3 > myid配置环境变量,
1vi /etc/profile 2export ZK_HOME=/export/servers/zookeeper-3.4.5 3export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin 4source /etc/profile启动,
1# 启动 2zkServer.sh start 3# 查看集群状态和主从信息 4zkServer.sh status 5# 查看进程 6jps -mexport变量作用域解析,export A=1,定义的变量,会对自己所在的shell进程及子进程生效。B=1,定义的变量,只对自己所在的shell进程生效。在script.sh中定义的变量,在当前登陆的shell进程中,source script.sh时,脚本中定义的变量也会进入当前登陆的进程。要在父进程shell可见,可source一下定义export变量的脚本文件,让当前shell可见。Zookeeper集群启动和停止脚本,可先配置集群机器间的免密登录。
1#!/bin/sh 2# start-zkServer-cluster.sh 3zkServers="zk01 zk02 zk03" 4echo "start zkServer..." 5for i in $zkServers 6do 7 echo "start zkServer on ${i} ..." 8 ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh start > /dev/null 2>&1 &" 9done
1#!/bin/sh 2# stop-zkServer-cluster.sh 3zkServers="zk01 zk02 zk03" 4echo "stop zkServer..." 5for i in $zkServers 6do 7 echo "stop zkServer on ${i} ..." 8 ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh stop > /dev/null 2>&1 &" 9done
Kafka集群部署及使用
安装包解压,1tar -xzvf kafka_2.11-0.9.0.1.tar.gz -C /export/servers$KAFKA_HOME/config/server.properties修改,集群的每个节点的broker.id和host.name都需要修改。
1#broker的全局唯一编号,不能重复 2broker.id=0 3#用来监听链接的端口,producer或consumer将在此端口建立连接 4port=9092 5#kafka运行日志存放的路径 6log.dirs=/export/servers/logs/kafka 7#broker需要使用zookeeper保存meta数据 8zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 9#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误! 10host.name=kafka01Kafka集群启动和停止脚本,
1#!/bin/bash 2#start-kafka-cluster.sh 3brokers="kafka01 kafka02 kafka03" 4kafka_home="/export/servers/kafka_2.11-0.9.0.1" 5for i in $brokers 6do 7 echo "Starting kafka on ${i} ... " 8 ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" 9 if [[ $? -eq 0 ]]; then 10 echo "Start kafka on ${i} is OK !" 11 fi 12done 13echo all kafka are started ! 14exit 0
1#!/bin/bash 2#stop-kafka-cluster.sh 3brokers="kafka01 kafka02 kafka03" 4kafka_home="/export/servers/kafka_2.11-0.9.0.1" 5for i in $brokers 6do 7 echo "stop kafka on ${i} ... " 8 ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-stop.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" 9 if [[ $? -eq 0 ]]; then 10 echo "stop kafka on ${i} is OK !" 11 fi 12done 13echo all kafka are stoped ! 14exit 0启动Kafka,
start-kafka-cluster.sh。创建topic, bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic hello_topic查看所有topic, bin/kafka-topics.sh --list --zookeeper localhost:2181
1kafka-topics.sh --list --zookeeper localhost:2181发送消息, bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
1kafka-console-producer.sh --broker-list zk01:9092 --topic hello_topic消费消息, bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
1kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic --from-beginning查看topic详细信息,
1kafka-topics.sh -describe --zookeeper zk01:2181 --topic hello_topic
整合Flume和Kafka完成实时数据的采集
在分布式日志收集框架Flume学习笔记的应用需求3中,将A服务器上的日志实时采集到B服务器,打印到控制台,通过整合Flume和Kafka,把logger sink改为kafka sink,这里的kafka sink是作为producer的角色,通过控制台起一个consumer进行消费来验证。技术选型:exec-memory-avro.conf: exec source + memory channel + avro sinkavro-memory-logger.conf: avro source + memory channel + kafka sink1# avro-memory-kafka.conf 2# Name the components on this agent 3avro-memory-kafka.sources = avro-source 4avro-memory-kafka.sinks = kafka-sink 5avro-memory-kafka.channels = memory-channel 6# Describe/configure the source 7avro-memory-kafka.sources.avro-source.type = avro 8avro-memory-kafka.sources.avro-source.bind = 192.168.169.100 9avro-memory-kafka.sources.avro-source.port = 44444 10# Describe the sink 11avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink 12avro-memory-kafka.sinks.kafka-sink.brokerList = kafka01:9092,kafka02:9092,kafka03:9092 13avro-memory-kafka.sinks.kafka-sink.topic = hello_topic 14avro-memory-kafka.sinks.kafka-sink.batchSize = 5 15avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1 16# Use a channel which buffers events in memory 17avro-memory-kafka.channels.memory-channel.type = memory 18# Bind the source and sink to the channel 19avro-memory-kafka.sources.avro-source.channels = memory-channel 20avro-memory-kafka.sinks.kafka-sink.channel = memory-channel验证,先启动avro-memory-kafka.conf,因为它监听192.168.169.100的44444端口,
1flume-ng agent \ 2--name avro-memory-kafka \ 3--conf $FLUME_HOME/conf/myconf \ 4--conf-file $FLUME_HOME/conf/myconf/avro-memory-kafka.conf \ 5-Dflume.root.logger=INFO,console
1flume-ng agent \ 2--name exec-memory-avro \ 3--conf $FLUME_HOME/conf/myconf \ 4--conf-file $FLUME_HOME/conf/myconf/exec-memory-avro.conf \ 5-Dflume.root.logger=INFO,console在控制台启动消费者验证,
1kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic
1echo hellokafka1 >> data.log 2echo hellokafka2 >> data.log本文首发于steem,感谢阅读,转载请注明。https://steemit.com/@padluo
微信公众号「数据分析」,分享数据科学家的自我修养,既然遇见,不如一起成长。
读者交流电报群https://t.me/sspadluo知识星球交流群
相关文章推荐
- Kafka是什么,JMS是什么,常见的类JMS消息服务器,为什么需要消息队列(来自学习笔记)
- 分布式消息队列Kafka & RocketMQ 深度学习资料精选
- 分布式消息队列Kafka & RocketMQ 深度学习资料精选
- uC/OS-II学习笔记 消息队列
- kafka分布式消息队列使用(springboot和springmvc)
- 学习笔记之消息队列
- 【转】快速理解Kafka分布式消息队列框架
- ucos-ii学习笔记——消息队列的原理及使用
- 消息队列-Kafka学习
- 分布式消息队列kafka原理简介
- Windows消息队列学习笔记
- 分布式消息队列kafka系列介绍 — 配置文件详解
- linux 进程学习笔记-消息队列messagequeue
- 分布式消息队列kafka系列介绍 — 配置文件详解
- 分布式消息处理中间件-Kafka学习笔…
- uC/OS-II 学习笔记之:信号量、消息邮箱、消息队列之间的使用区别
- Java消息中间件学习笔记四 -- ActiveMQ的使用,【队列模式】
- Kafka分布式消息队列(一):基础
- PetShop 4.0学习笔记:消息队列MSMQ
- 分布式消息队列kafka系列介绍 — 配置文件详解