
Study Notes on Kafka, a Distributed Message Queue

2018-03-02 18:00

Kafka Overview

a distributed streaming platform

Kafka Architecture and Core Concepts

producer: the producer, who makes the buns. consumer: the consumer, who eats the buns. broker: the basket that holds them. topic: a label attached to each bun — buns labeled topic A are for you, buns labeled topic B are for your younger brother.

ZooKeeper Cluster Deployment

Extract the installation package:
tar -xzvf zookeeper-3.4.5.tar.gz -C /export/servers
Modify the ZooKeeper configuration file:
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# Data directory. Can be any directory; both dataDir and dataLogDir must be created in advance.
dataDir=/export/servers/data/zookeeper
# Transaction-log directory. Also any directory; if unset, it falls back to dataDir.
# Note: choose the log location carefully. A dedicated log device greatly improves
# performance, while placing the logs on a busy device will hurt it significantly.
dataLogDir=/export/servers/logs/zookeeper
# hostname:quorum (follower-to-leader) port:leader-election port
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888
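These entries are added on top of zoo_sample.cfg, which already carries the remaining required settings; in the 3.4.x sample file they are roughly:

```
# inherited from zoo_sample.cfg (3.4.x defaults)
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
```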
Record each node's myid in its data directory:
# on zk01
mkdir -p /export/servers/data/zookeeper
echo 1 > /export/servers/data/zookeeper/myid
cat /export/servers/data/zookeeper/myid
Distribute ZooKeeper to the other nodes:
sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk02:/export/servers/
# on zk02
mkdir -p /export/servers/data/zookeeper
echo 2 > /export/servers/data/zookeeper/myid
sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk03:/export/servers/
# on zk03
mkdir -p /export/servers/data/zookeeper
echo 3 > /export/servers/data/zookeeper/myid
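Instead of typing the myid by hand on each machine, it can be derived from the hostname. A minimal sketch, assuming hostnames of the form zk01/zk02/zk03 and using a local stand-in path:

```shell
# Derive myid from the hostname (zk01 -> 1, zk02 -> 2, ...).
datadir=./data/zookeeper      # stand-in for /export/servers/data/zookeeper
mkdir -p "$datadir"
host=zk02                     # in practice: host=$(hostname)
id=${host#zk}                 # strip the "zk" prefix -> "02"
id=$((10#$id))                # force base 10 to drop the leading zero -> 2
echo "$id" > "$datadir/myid"
cat "$datadir/myid"           # -> 2
```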
Configure environment variables:
vi /etc/profile
export ZK_HOME=/export/servers/zookeeper-3.4.5
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin
source /etc/profile
Start ZooKeeper:
# start the server
zkServer.sh start
# check cluster status and the leader/follower role
zkServer.sh status
# check the process
jps -m
ZooKeeper cluster start and stop scripts are given below; configure passwordless SSH between the cluster machines first. A note on export variable scope (relevant because the scripts source /etc/profile over SSH): a variable defined with export A=1 is visible in the current shell process and in its child processes, while a plain assignment B=1 is visible only in the current shell process. Variables defined in script.sh reach the current login shell only when the script is sourced (source script.sh), not when it is executed; so to make exported variables visible in the parent shell, source the script that defines them.
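The export scoping rules above can be demonstrated in a few lines (the file path /tmp/defs.sh is just an example):

```shell
# Exported variables reach child processes; plain assignments do not.
export A=1
B=1
sh -c 'echo "A=${A:-unset} B=${B:-unset}"'    # -> A=1 B=unset

# Variables from a script reach the current shell only via sourcing:
echo 'export C=1' > /tmp/defs.sh
sh /tmp/defs.sh
echo "after sh: C=${C:-unset}"                # -> after sh: C=unset
. /tmp/defs.sh
echo "after source: C=${C:-unset}"            # -> after source: C=1
```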
#!/bin/sh
# start-zkServer-cluster.sh
zkServers="zk01 zk02 zk03"
echo "start zkServer..."
for i in $zkServers
do
    echo "start zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh start > /dev/null 2>&1 &"
done

#!/bin/sh
# stop-zkServer-cluster.sh
zkServers="zk01 zk02 zk03"
echo "stop zkServer..."
for i in $zkServers
do
    echo "stop zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh stop > /dev/null 2>&1 &"
done

Kafka Cluster Deployment and Usage

Extract the installation package:
tar -xzvf kafka_2.11-0.9.0.1.tar.gz -C /export/servers
Modify $KAFKA_HOME/config/server.properties; broker.id and host.name must be changed on every node in the cluster.
# Globally unique broker id; must not be duplicated
broker.id=0
# Port the broker listens on; producers and consumers connect here
port=9092
# Directory for Kafka's data (commit log) files
log.dirs=/export/servers/logs/kafka
# ZooKeeper ensemble the broker uses to store metadata
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# host.name must be this machine's hostname/IP (important); otherwise clients
# fail with: Producer connection to localhost:9092 unsuccessful
host.name=kafka01
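Rather than editing server.properties by hand on every broker, the per-node values can be stamped in with sed. A runnable sketch using a minimal stand-in config (host names are the ones assumed above):

```shell
# A minimal stand-in server.properties so the loop is runnable as-is:
printf 'broker.id=0\nport=9092\nhost.name=localhost\n' > server.properties

# Generate one config per broker, stamping in broker.id and host.name.
i=0
for host in kafka01 kafka02 kafka03; do
    sed -e "s/^broker.id=.*/broker.id=${i}/" \
        -e "s/^host.name=.*/host.name=${host}/" \
        server.properties > "server-${host}.properties"
    i=$((i + 1))
done
grep -E '^(broker\.id|host\.name)=' server-kafka02.properties
# -> broker.id=1
#    host.name=kafka02
```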
Kafka cluster start and stop scripts:
#!/bin/bash
#start-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "Starting kafka on ${i} ... "
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]]; then
        echo "Start kafka on ${i} is OK !"
    fi
done
echo "All Kafka brokers are started!"
exit 0

#!/bin/bash
#stop-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "Stopping kafka on ${i} ... "
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-stop.sh > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]]; then
        echo "Stop kafka on ${i} is OK !"
    fi
done
echo "All Kafka brokers are stopped!"
exit 0
Start Kafka:
start-kafka-cluster.sh
Create a topic (general form: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test):
kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic hello_topic
List all topics (general form: bin/kafka-topics.sh --list --zookeeper localhost:2181):
kafka-topics.sh --list --zookeeper localhost:2181
Send messages (general form: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test); note that --broker-list must point at a Kafka broker, not at ZooKeeper:
kafka-console-producer.sh --broker-list kafka01:9092 --topic hello_topic
Consume messages (general form: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning; the 0.9 console consumer used here still connects through ZooKeeper):
kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic --from-beginning
View detailed topic information:
kafka-topics.sh --describe --zookeeper zk01:2181 --topic hello_topic

Integrating Flume and Kafka for Real-Time Data Collection

In requirement 3 of the study notes on Flume (a distributed log-collection framework), logs on server A were collected in real time to server B and printed to the console. By integrating Flume with Kafka, the logger sink is replaced with a Kafka sink; the Kafka sink acts as the producer, and verification is done by starting a console consumer. Technology selection: exec-memory-avro.conf: exec source + memory channel + avro sink; avro-memory-kafka.conf: avro source + memory channel + kafka sink.

# avro-memory-kafka.conf
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.169.100
avro-memory-kafka.sources.avro-source.port = 44444
# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = kafka01:9092,kafka02:9092,kafka03:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
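The memory channel above relies on Flume's default sizing. If needed, its capacity can be bounded explicitly; an illustrative (untuned) fragment:

```
# Optional: bound the in-memory channel explicitly
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100
```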
Verify: start avro-memory-kafka.conf first, because it listens on port 44444 of 192.168.169.100:
flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
Start a console consumer to verify:
kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic
Append test lines to the monitored file:
echo hellokafka1 >> data.log
echo hellokafka2 >> data.log
This article was first published on Steem. Thanks for reading; please credit the source when reposting. https://steemit.com/@padluo
WeChat official account "数据分析" (Data Analysis): sharing the self-cultivation of data scientists. Since we have met, let's grow together.

Reader Telegram group: https://t.me/sspadluo; Knowledge Planet (知识星球) discussion group.