您的位置:首页 > 其它

zookeeper+Kafka集群部署

2017-03-24 15:50 519 查看
一 准备工作

准备3台机器,IP地址分别为:192.168.46.130(131,132)

下载 jdk-8u121-linux-x64.tar.gz   zookeeper-3.4.6.tar.gz   kafka_2.11-0.10.1.0.tgz

1.修改主机名 /etc/hosts 及/etc/sysconfig/network

hosts:

192.168.46.130 kafka1

192.168.46.131 kafka2

192.168.46.132 kafka3

network

分别将network中的主机名修改为 kafka1 kafka2 kafka3

2.jdk安装配置省略

3.zookeeper部署

1) 解压zookeeper-3.4.6.tar.gz

2) tar -zxvf zookeeper-3.4.6.tar.gz

3) 建立myid  在/usr/local/目录下建立 zkData目录,即/usr/local/zkData,

4) 并执行  echo 1 >myid (kafka1服务器上)  echo
2 >myid (kafka2服务器上)  echo 3 >myid (kafka3服务器上) 

5)
三台服务器的zoo.cfg配置文件为

zoo.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/usr/local/zkData

clientPort=2181

server.1=192.168.46.130:2888:3888

server.2=192.168.46.131:2888:3888

server.3=192.168.46.132:2888:3888

4.kafka部署(解压kafka_2.11-0.10.1.0.tgz文件)

1).进入config目录,修改server.properties(逐个机器修改)

broker.id=1 (其他服务器2/3)
zookeeper.connect=192.168.46.130:2181,192.168.46.131:2181,192.168.46.132:2181

zookeeper.connection.timeout.ms=6000

port
= 9092

host.name = 192.168.46.131

5.启动每台服务器上的zookeeper

1) 启动前关闭各台服务器的防火墙:service iptables stop 

2) bin/zkServer.sh start 启动

3) bin/zkServer.sh status 查看状态

6 启动每台服务器的kafka:

        > bin/kafka-server-start.sh config/server.properties &

7.集群测试

    1.创建一个topic

        > bin/kafka-topics.sh --create --zookeeper 192.168.46.130:2181 --replication-factor 3 --partitions 1

            --topic test-topic

 

    2.查看创建的topic

        > bin/kafka-topics.sh --describe --zookeeper 192.168.46.130:2181 --topic test-topic

            Topic:test-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:

            Topic: test-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

 

     3.查看topic列表

          > bin/kafka-topics.sh --list --zookeeper 192.168.46.130 :2181 

    test-topic

    查看列表及具体信息

          > bin/kafka-topics.sh --zookeeper localhost --describe

 

    4.查看集群情况:

        >bin/kafka-topics.sh --describe --zookeeper 192.168.46.131:2181 --topic test-topic

        >bin/kafka-topics.sh --describe --zookeeper 192.168.46.132:2181 --topic test-topic

        发现都能看到test-topic。

 

     5.生产消息(定义不同的生产者)

          > bin/kafka-console-producer.sh --broker-list 192.168.46.130:9092 -topic test-topic

         接着输入生产的内容:如 haha

     6.消费消息(在不同的服务器上执行下面的命令后都可接受到生产的消息)

          > bin/kafka-console-consumer.sh --zookeeper 192.168.46.131:2181 --from-beginning --topic test-topic

          > bin/kafka-console-consumer.sh --zookeeper 192.168.46.132:2181 --from-beginning --topic test-topic

         消费端的内容:haha

       每个节点既可以作为生产者也可以作为消费者,一个节点要么为生产者要么为消费者

 8 客户端调用(参考:http://blog.csdn.net/u011622226/article/details/53520382)

maven 引入 

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>


package com.fuliwd.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerService {
private static Logger LOG = LoggerFactory
.getLogger(KafkaProducerService.class);

public static void main(String[] args) {
Properties props = new Properties();
//zookeeper地址
props.put("bootstrap.servers", "192.168.46.130:9092,192.168.46.131:9092,192.168.46.132:9092");
props.put("retries", 3);
props.put("linger.ms", 1);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
props);
//test-topic为队列名称
for (int i = 0; i < 1; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test-topic", "11", "客户端调用 yoyo=======>" + i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
// TODO Auto-generated method stub
if (e != null)
System.out.println("the producer has a error:"
+ e.getMessage());
else {
System.out
.println("The offset of the record we just sent is: "
+ metadata.offset());
System.out
.println("The partition of the record we just sent is: "
+ metadata.partition());
}
}
});
try {
Thread.sleep(1000);
// producer.close();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}


package com.fuliwd.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Joker
* 自己控制偏移量提交
* 很多时候,我们是希望在获得消息并经过一些逻辑处理后,才认为该消息已被消费,这可以通过自己控制偏移量提交来实现。
*/
public class ManualOffsetConsumer {
private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);

public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
//设置brokerServer(kafka)ip地址 zookeeper地址
props.put("bootstrap.servers", "192.168.46.130:9092,192.168.46.131:9092,192.168.46.132:9092");
//设置consumer group name
props.put("group.id","mygroup11");
props.put("enable.auto.commit", "false");
//设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//设置心跳时间
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.subscribe(Arrays.asList("test-topic"));//队列名称
final int minBatchSize = 5;  //批量提交数量
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("consumer message values is "+record.value()+" and the offset is "+ record.offset());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
System.out.println("now commit offset"+buffer.size());
consumer.commitSync();
buffer.clear();
}
}
}
}


下图为服务器端进行消息生产和客户端调用生产的执行情况(这里只讲kafka1设置为生产者,kafka2为消费者,也可以将kafka3设置生产或消费者)





                  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper kafka