您的位置:首页 > 其它

Kafka简单测试demo

2015-09-30 15:13 399 查看

介绍

项目很简单,就是对搭建好的kafka集群【kafka_2.10-0.8.2.1】做了简单的demo测试,调用了下high level接口。

源码

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>cn.com.dimensoft.kafka</groupId>
<artifactId>kafka-study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>kafka-study</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>

</dependencies>
</project>


SampleProducer

SampleProducer是自定义的producer,用来生产消息并将消息push给broker:

/**
* project:kafka-study
* file:SampleProducer.java
* author:zxh
* time:2015年9月25日 下午4:05:51
* description:
*/
package cn.com.dimensoft.kafka;

import java.util.Properties;

import cn.com.dimensoft.kafka.constant.Constant;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
* class: SampleProducer
* package: cn.com.dimensoft.kafka
* author:zxh
* time: 2015年9月25日 下午4:05:51
* description:
*  step1 : 创建存放配置信息的properties
*  step2 : 将properties封装到ProducerConfig中
*  step3 : 创建producer对象
*  step4 : 发送数据流
*/
public class SampleProducer {

public static void main(String[] args) throws InterruptedException {

// step1 : 创建存放配置信息的properties
Properties props = new Properties();

// 指定broker集群
props.put("metadata.broker.list", "hadoop-pseudo.com.cn:9092");
/**
* ack机制
* 0 which means that the producer never waits for an acknowledgement from the broker
* 1 which means that the producer gets an acknowledgement after the leader replica has received the data
* -1 The producer gets an acknowledgement after all in-sync replicas have received the data
*/
props.put("request.required.acks", "1");
// 消息发送类型 同步/异步
props.put("producer.type", "sync");
// 指定message序列化类,默认kafka.serializer.DefaultEncoder
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 设置自定义的partition,当topic有多个partition时如何对message进行分区
props.put("partitioner.class", "cn.com.dimensoft.kafka.SamplePartition");

// step2 : 将properties封装到ProducerConfig中
ProducerConfig config = new ProducerConfig(props);

// step3 : 创建producer对象
Producer<String, String> producer = new Producer<String, String>(config);

for (int i = 1; i <= 50; i++) {
// step4 : 发送数据流
producer.send(new KeyedMessage<String, String>(Constant.TOPIC, //
i + "", //
String.valueOf("我是 " + i + " 号")));
Thread.sleep(10);
}
}

}


SampleConsumer

SampleConsumer是自定义的consumer,用来从broker pull消息对处理:

/**
* project:kafka-study
* file:SampleConsumer.java
* author:zxh
* time:2015年9月25日 下午4:41:26
* description:
*/
package cn.com.dimensoft.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import cn.com.dimensoft.kafka.constant.Constant;

/**
* class: SampleConsumer
* package: cn.com.dimensoft.kafka
* author:zxh
* time: 2015年9月25日 下午4:41:26
* description:
*  step1 : 创建存放配置信息的properties
*  step2 : 将properties封装到ConsumerConfig中
*  step3 : 调用Consumer的静态方法创建ConsumerConnector
*  step4 : 根据创建好的ConsumerConnector对象创建MessageStreams集合
*  step5 : 根据具体的topic名称得到数据流KafkaStream
*  step6 : 调用KafkaStream的iterator拿到ConsumerIterator对应,然后就可以迭代获得producer发送过来的消息了
*/
public class SampleConsumer {

public static void main(String[] args) throws InterruptedException {

// step1 : 创建存放配置信息的properties
Properties props = new Properties();

props.put("zookeeper.connect", "hadoop-pseudo.com.cn:2181");
props.put("group.id", "1");
// 下面这2个参数需要设置,否则consumer每次启动都会从头开始读取数据
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");

// What to do when there is no initial offset in ZooKeeper or if an
// offset is out of range
props.put("auto.offset.reset", "smallest");

// step2 : 将properties封装到ConsumerConfig中
ConsumerConfig config = new ConsumerConfig(props);

// step3 : 调用Consumer的静态方法创建ConsumerConnector
ConsumerConnector connector = Consumer
.createJavaConsumerConnector(config);

// step4 : 根据创建好的ConsumerConnector对象创建MessageStreams集合
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
// 将每个topic对应的线程数添加到map中,topicCountMap中的topic对应的value值一直没测出实际效果
topicCountMap.put(Constant.TOPIC, 1);
// 根据填充好的map获得streams集合
// a map of (topic, list of KafkaStream) pairs
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector
.createMessageStreams(topicCountMap);

// step5 : 根据具体的topic名称得到数据流KafkaStream
KafkaStream<byte[], byte[]> stream = streams.get(Constant.TOPIC).get(0);

// step6 : 调用KafkaStream的iterator拿到ConsumerIterator
// 然后就可以迭代获得producer发送过来的消息了
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

MessageAndMetadata<byte[], byte[]> mm = null;
while (iterator.hasNext()) {
mm = iterator.next();
System.out.println(//
" group " + props.get("group.id") + //
", partition " + mm.partition() + ", " + //
new String(mm.message()));
Thread.sleep(100);
}
}
}


SamplePartition

SamplePartition是自定义的partition,用来对消息进行分区:

/**
* project:kafka-study
* file:SamplePartition.java
* author:zxh
* time:2015年9月28日 下午5:37:19
* description:
*/
package cn.com.dimensoft.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
* class: SamplePartition
* package: cn.com.dimensoft.kafka
* author:zxh
* time: 2015年9月28日 下午5:37:19
* description: 设置自定义的partition,指明当topic有多个partition时如何对message进行分区
*/
public class SamplePartition implements Partitioner {

/**
* constructor
* author:zxh
* @param verifiableProperties
* description: 去除该构造方法后启动producer报错NoSuchMethodException
*/
public SamplePartition(VerifiableProperties verifiableProperties) {

}

@Override
/**
* 这里对message分区的依据只是简单的让key(这里的key就是Producer[K,V]中的K)对partition的数量取模
*/
public int partition(Object obj, int partitions) {

// 对partitions数量取模
return Integer.parseInt(obj.toString()) % partitions;
}

}


Constant

Constant是常量类:

/**
* project:kafka-study
* file:Constant.java
* author:zxh
* time:2015年9月28日 上午10:29:50
* description:
*/
package cn.com.dimensoft.kafka.constant;
/**
* class: Constant
* package: cn.com.dimensoft.kafka.constant
* author:zxh
* time: 2015年9月28日 上午10:29:50
* description:
*/
public class Constant {

public static final String TOPIC = "topic-test";
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: