
Building a kafka_2.9.2-0.8.1.1 Distributed Cluster, with Code Examples

2014-11-25 17:16
Prepare three virtual machines running RHEL 6.4 Server Edition.

1) Configure /etc/hosts on every machine as follows:

$ cat /etc/hosts
# zookeeper hostnames:
192.168.8.182   zk1
192.168.8.183   zk2
192.168.8.184   zk3

2) Install the JDK, ZooKeeper, and Kafka on every machine, with the following configuration:

$ vi /etc/profile
# jdk, zookeeper, kafka
export KAFKA_HOME=/usr/local/lib/kafka/kafka_2.9.2-0.8.1.1
export ZK_HOME=/usr/local/lib/zookeeper/zookeeper-3.4.6
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$KAFKA_HOME/bin:$ZK_HOME/bin:$PATH
3) On every machine, run:
$ source /etc/profile
$ mkdir -p /var/lib/zookeeper
$ cd $ZK_HOME/conf
$ cp zoo_sample.cfg zoo.cfg
$ vi zoo.cfg
dataDir=/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# zookeeper cluster
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
4) Create the myid file on each machine:
zk1:
$ echo "1" > /var/lib/zookeeper/myid
zk2:
$ echo "2" > /var/lib/zookeeper/myid
zk3:
$ echo "3" > /var/lib/zookeeper/myid

5) On every machine, run setup and disable the firewall:

Firewall:
[ ] enabled

6) Start ZooKeeper on every machine:
$ zkServer.sh start
Check its status:
$ zkServer.sh status
1) Download Kafka:

$ wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

For installation and configuration, refer to the previous article: http://blog.csdn.net/ubuntu64fan/article/details/26678877

2) Configure $KAFKA_HOME/config/server.properties.

We install 3 brokers, one on each of the three VMs: zk1, zk2, zk3.
zk1:
$ vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=zk1
$ vi $KAFKA_HOME/config/server.properties
broker.id=0
port=9092
host.name=zk1
advertised.host.name=zk1
...
num.partitions=2
...
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zk2:
$ vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=zk2
$ vi $KAFKA_HOME/config/server.properties
broker.id=1
port=9092
host.name=zk2
advertised.host.name=zk2
...
num.partitions=2
...
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zk3:
$ vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=zk3
$ vi $KAFKA_HOME/config/server.properties
broker.id=2
port=9092
host.name=zk3
advertised.host.name=zk3
...
num.partitions=2
...
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
3) Start the ZooKeeper service; run on each of zk1, zk2, zk3:

$ zkServer.sh start

4) Start the Kafka service; run on each of zk1, zk2, zk3:

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

5) Create a topic (replication-factor = number of brokers):

$ kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181

6) Suppose we open a terminal on zk2 and send messages to Kafka (zk2 plays the producer):
$ kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
Type into the producing terminal: Hello Kafka

7) Suppose we open a terminal on zk3 to display consumed messages (zk3 plays the consumer):

$ kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning

The consuming terminal then displays: Hello Kafka
Project setup and development
The project is built with Maven. It has to be said that the Kafka Java client is quite painful to work with, and setting up the build runs into plenty of trouble. The pom.xml below is recommended as a reference; the versions of its dependencies must be mutually consistent. If the Kafka client version does not match the Kafka server version, you will see many exceptions such as "broker id not exists", because after Kafka was upgraded from 0.7 to 0.8 (the artifact being renamed kafka_2.8.0), the protocol the client uses to talk to the server changed.

XML code

<dependencies>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.8.2</artifactId>
        <version>0.8.0</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>
</dependencies>

Producer-side code
1) producer.properties: place this file under the /resources directory

Properties code

#partitioner.class=
## The broker list may be a subset of the Kafka servers, because the producer
## only needs to fetch metadata from a broker. Although every broker can serve
## metadata, it is still recommended to list all brokers here.
## This value can also be injected via Spring.
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
## Synchronous sending; async is recommended.
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## Effective only when producer.type=async
#batch.num.messages=100
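The override pattern this file relies on (defaults loaded from the classpath, with a Spring-injected broker list taking precedence) can be sketched with the JDK alone. The class name ProducerConfigSketch and the inlined file content are illustrative, not part of the project:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// A minimal, JDK-only sketch of the override pattern used by
// KafkaProducerClient.init(): load the defaults, then let an
// injected metadata.broker.list win over the file.
public class ProducerConfigSketch {

    // stands in for producer.properties on the classpath (illustrative content)
    static final String FILE_CONTENT =
            "producer.type=sync\n" +
            "compression.codec=0\n" +
            "serializer.class=kafka.serializer.StringEncoder\n";

    public static Properties withOverride(String brokerList) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(FILE_CONTENT)); // init() loads from the classpath instead
        } catch (IOException e) {
            throw new RuntimeException(e); // cannot happen with an in-memory reader
        }
        if (brokerList != null) {
            p.put("metadata.broker.list", brokerList); // injected value takes precedence
        }
        return p;
    }

    public static void main(String[] args) {
        Properties p = withOverride("zk1:9092,zk2:9092,zk3:9092");
        System.out.println(p.getProperty("metadata.broker.list")); // the injected list
        System.out.println(p.getProperty("producer.type"));        // sync (from the file)
    }
}
```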

2) KafkaProducerClient.java sample code

Java code

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducerClient {

    private Producer<String, String> inner;
    private String brokerList; // for metadata discovery; Spring setter
    private String location = "kafka-producer.properties"; // Spring setter
    private String defaultTopic; // Spring setter

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public KafkaProducerClient() {}

    public void init() throws Exception {
        Properties properties = new Properties();
        properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
        if (brokerList != null) {
            properties.put("metadata.broker.list", brokerList);
        }
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String message) {
        send(defaultTopic, message);
    }

    public void send(Collection<String> messages) {
        send(defaultTopic, messages);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        int i = 0;
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
            i++;
            if (i % 20 == 0) { // flush in batches of 20
                inner.send(kms);
                kms.clear();
            }
        }
        if (!kms.isEmpty()) {
            inner.send(kms);
        }
    }

    public void close() {
        inner.close();
    }

    public static void main(String[] args) {
        KafkaProducerClient producer = null;
        try {
            producer = new KafkaProducerClient();
            //producer.setBrokerList("");
            producer.init(); // must be called before sending
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}

Consumer-side code
1) consumer.properties: place this file under the /resources directory

Properties code

## This value can be set here or injected via Spring
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
# consumer group id
group.id=test-group
# consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000
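One detail worth a sanity check: java.util.Properties keys must match exactly, so a value stored under a wrong key such as groupid is invisible to code that looks up group.id, the key the 0.8 high-level consumer reads. A JDK-only sketch (the class name ConsumerConfigSketch is illustrative):

```java
import java.util.Properties;

// A minimal, JDK-only illustration of the key-name pitfall: the 0.8
// high-level consumer reads "group.id"; a value stored under "groupid"
// is simply never seen.
public class ConsumerConfigSketch {

    public static String effectiveGroup(Properties p) {
        // ConsumerConfig in 0.8 looks up "group.id"; any other key is invisible to it
        return p.getProperty("group.id");
    }

    public static void main(String[] args) {
        Properties wrong = new Properties();
        wrong.put("groupid", "test-group");  // wrong key: silently ignored

        Properties right = new Properties();
        right.put("group.id", "test-group"); // the key the consumer actually reads

        System.out.println(effectiveGroup(wrong)); // null
        System.out.println(effectiveGroup(right)); // test-group
    }
}
```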

2) KafkaConsumerClient.java sample code

Java code

package com.test.kafka;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class KafkaConsumerClient {

    private String groupid;   // can be set by Spring
    private String zkConnect; // can be set by Spring
    private String location = "kafka-consumer.properties"; // config file location
    private String topic;
    private int partitionsNum = 1;
    private MessageExecutor executor; // message listener
    private ExecutorService threadPool;
    private ConsumerConnector connector;
    private Charset charset = Charset.forName("utf8");

    public void setGroupid(String groupid) {
        this.groupid = groupid;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setPartitionsNum(int partitionsNum) {
        this.partitionsNum = partitionsNum;
    }

    public void setExecutor(MessageExecutor executor) {
        this.executor = executor;
    }

    public KafkaConsumerClient() {}

    // init consumer: open the connection and start the listeners
    public void init() throws Exception {
        if (executor == null) {
            throw new RuntimeException("KafkaConsumer: executor can't be null!");
        }
        Properties properties = new Properties();
        properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
        if (groupid != null) {
            properties.put("group.id", groupid); // note: the 0.8 key is "group.id", not "groupid"
        }
        if (zkConnect != null) {
            properties.put("zookeeper.connect", zkConnect);
        }
        ConsumerConfig config = new ConsumerConfig(properties);
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
        // start one runner per stream
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            //
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {

        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                // connector.commitOffsets(); // commit offsets manually when auto.commit.enable=false
                MessageAndMetadata<byte[], byte[]> item = it.next();
                try {
                    executor.execute(new String(item.message(), charset)); // UTF-8; beware of exceptions
                } catch (Exception e) {
                    //
                }
            }
        }

        public String getContent(Message message) {
            ByteBuffer buffer = message.payload();
            if (buffer.remaining() == 0) {
                return null;
            }
            CharBuffer charBuffer = charset.decode(buffer);
            return charBuffer.toString();
        }
    }

    public static interface MessageExecutor {
        public void execute(String message);
    }

    public static void main(String[] args) {
        KafkaConsumerClient consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new KafkaConsumerClient();
            consumer.setTopic("test-topic");
            consumer.setPartitionsNum(2);
            consumer.setExecutor(executor);
            //consumer.setZkConnect("zk1:2181,zk2:2181,zk3:2181"); // required if zookeeper.connect is not set in the properties file
            consumer.init();
            Thread.sleep(Long.MAX_VALUE); // init() returns immediately; keep the JVM alive so the consumer threads can run
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}

Note that the KafkaConsumerClient class above pays little attention to error handling; in particular, you must think through what should happen when MessageExecutor.execute() throws an exception.
When testing, start the consumer first and only then the producer, so that you can watch the newest messages arrive in real time.
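To make the warning about MessageExecutor.execute() concrete, here is a JDK-only sketch of a wrapper that catches and counts per-message failures instead of letting one bad message break the consuming loop. The SafeExecutor name and the failure-counting policy are our own, not part of the article's code:

```java
// A minimal sketch of wrapping a MessageExecutor so that an exception thrown
// by execute() is logged and counted rather than silently swallowed.
// MessageExecutor mirrors the interface defined in KafkaConsumerClient.
public class SafeExecutorSketch {

    public interface MessageExecutor {
        void execute(String message);
    }

    public static class SafeExecutor implements MessageExecutor {
        private final MessageExecutor delegate;
        private int failures; // number of messages whose processing threw

        public SafeExecutor(MessageExecutor delegate) {
            this.delegate = delegate;
        }

        public void execute(String message) {
            try {
                delegate.execute(message);
            } catch (Exception e) {
                failures++;
                System.err.println("failed to process message: " + e);
                // a real handler might retry or park the message in a dead-letter store
            }
        }

        public int getFailures() {
            return failures;
        }
    }

    public static void main(String[] args) {
        SafeExecutor safe = new SafeExecutor(new MessageExecutor() {
            public void execute(String message) {
                if (message.isEmpty()) {
                    throw new IllegalArgumentException("empty message");
                }
                System.out.println(message);
            }
        });
        safe.execute("Hello Kafka"); // prints Hello Kafka
        safe.execute("");            // caught and counted; a consuming loop would continue
        System.out.println(safe.getFailures()); // 1
    }
}
```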