您的位置:首页 > 运维架构 > 网站架构

zookeeper+kafka 集群和高可用

2016-10-20 17:30 281 查看
1、本机环境

操作系统:ubuntu 12.04
需安装:
java的环境,安装过程可参考: http://blog.csdn.net/u014388408/article/details/50587438


2、 Zookeeper集群搭建

(1)下载zookeeper安装

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz


然后解压:

tar -xvf  zookeeper-3.3.6.tar.gz -C /opt/amqbroker(需要解压的路径)


修改配置文件:

zoo_sample.cfg 修改文件名为 zoo.cfg


修改zoo.cfg 配置文件内容为

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/amqbroker/zookeeper/data
dataLogDir=/opt/amqbroker/zookeeper/log
clientPort=2181
server.one=192.168.0.100:2888:3888
server.two=192.168.0.101:2888:3888
server.three=192.168.0.102:2888:3888


然后在/opt/amqbroker/zookeeper/data目录下创建myid文件,在文件中写入当前机器的id,例如配置中server.one=192.168.0.100:2888:3888,在myid文件写入字符 “one” 保存退出。

另外2台机器和这台机器的配置一样,myid 写各自服务器的id名。

3、Kafka 安装

下载地址: wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz 
解压: tar -xvf kafka_2.11-0.9.0.1.tgz


进入kafka_2.11-0.9.0.1.tgz/config/目录下

vi server.properties 修改一下几处:


broker.id=11               //注意,id名,各个服务器配置一个不相同的名字。
hostname.name=192.168.0.100
port=9092

advertised.host.name=192.168.0.100
advertised.port=9092

以及集群的配置
zookeeper.connect=192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181


然后在启动kafka,两外2台服务器也采用以上配置,分别启动kafka。

bin/kafka-server-start.sh config/server.properties


接下来创建两个分区,两个副本的Topic。

bin/kafka-topics.sh --create --zookeeper 192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181 --replication-factor 2 --partitions 2 --topic kafkatest


查看Topic(test)的状态

bin/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test


输出结果:

Topic:test  PartitionCount:2    ReplicationFactor:2 Configs:
Topic: test Partition: 0    Leader: 158 Replicas: 0,158 Isr: 158
Topic: test Partition: 1    Leader: 111 Replicas: 111,0 Isr: 111


Topoc(test)有两个分区 0,1

分区0:处于leader服务器的是broker的id为158
Replicas(副本)为0,158两台服务器。
Isr(in-sync replicas): 副本列表,158


4、用JAVA程序来测试消息的生产和消费。

(1)util工具类

package com.kafka;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaUtil {
private static KafkaProducer<String, String> kp;
private static KafkaConsumer<String, String> kc;

public static KafkaProducer<String, String> getProducer() {
if (kp == null) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.100:9092,192.168.0.101:9092,192.168.0.102:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kp = new KafkaProducer<String, String>(props);
}
return kp;
}

public static KafkaConsumer<String, String> getConsumer() {
if(kc == null) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.100:9092,192.168.0.101:9092,192.168.0.102:9092");
props.put("group.id", "0");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kc = new KafkaConsumer<String, String>(props);
}
return kc;
}
}


(2)Producer类

package com.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaTest {
public static void main(String[] args) throws Exception{
Producer<String, String> producer = KafkaUtil.getProducer();
int i = 0;
while(true) {
ProducerRecord<String, String> record = new    ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
e.printStackTrace();
System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
}
});
i++;
Thread.sleep(1000);
}
}
}


(3)Consumer 类

package com.kafka;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaTest1 {
public static void main(String[] args) throws Exception{
KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
consumer.subscribe(Arrays.asList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {
System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
}
}
}
}


运行 producer端 类,结果如下:

message send to partition 0, offset: 306
message send to partition 0, offset: 307
message send to partition 0, offset: 308
message send to partition 0, offset: 309
message send to partition 0, offset: 310
message send to partition 0, offset: 311
message send to partition 0, offset: 312
message send to partition 0, offset: 313


然后运行 Consumer端 类,能看到打印的消息即可。

5、 Kafka集群高可用性测试

(1)查看当前副本及状态

bin/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test


输出结果

    Topic:test  PartitionCount:2    ReplicationFactor:2 Configs:
Topic: test Partition: 0    Leader: 158 Replicas: 0,158 Isr: 158
Topic: test Partition: 1    Leader: 111 Replicas: 111,0 Isr: 111


然后停掉一个broker服务器,如158,在查看当前副本状态,Leader状态会变成其他borker服务器,会通过一种选举策略生成一个新的Leader。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息