Kafka简单测试demo
2015-09-30 15:13
399 查看
介绍
项目很简单,就是对搭建好的kafka集群【kafka_2.10-0.8.2.1】做了简单的demo测试,调用了下high level接口。源码
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.com.dimensoft.kafka</groupId> <artifactId>kafka-study</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka-study</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.1</version> </dependency> </dependencies> </project>
SampleProducer
SampleProducer是自定义的producer,用来生产消息并将消息push给broker:/** * project:kafka-study * file:SampleProducer.java * author:zxh * time:2015年9月25日 下午4:05:51 * description: */ package cn.com.dimensoft.kafka; import java.util.Properties; import cn.com.dimensoft.kafka.constant.Constant; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * class: SampleProducer * package: cn.com.dimensoft.kafka * author:zxh * time: 2015年9月25日 下午4:05:51 * description: * step1 : 创建存放配置信息的properties * step2 : 将properties封装到ProducerConfig中 * step3 : 创建producer对象 * step4 : 发送数据流 */ public class SampleProducer { public static void main(String[] args) throws InterruptedException { // step1 : 创建存放配置信息的properties Properties props = new Properties(); // 指定broker集群 props.put("metadata.broker.list", "hadoop-pseudo.com.cn:9092"); /** * ack机制 * 0 which means that the producer never waits for an acknowledgement from the broker * 1 which means that the producer gets an acknowledgement after the leader replica has received the data * -1 The producer gets an acknowledgement after all in-sync replicas have received the data */ props.put("request.required.acks", "1"); // 消息发送类型 同步/异步 props.put("producer.type", "sync"); // 指定message序列化类,默认kafka.serializer.DefaultEncoder props.put("serializer.class", "kafka.serializer.StringEncoder"); // 设置自定义的partition,当topic有多个partition时如何对message进行分区 props.put("partitioner.class", "cn.com.dimensoft.kafka.SamplePartition"); // step2 : 将properties封装到ProducerConfig中 ProducerConfig config = new ProducerConfig(props); // step3 : 创建producer对象 Producer<String, String> producer = new Producer<String, String>(config); for (int i = 1; i <= 50; i++) { // step4 : 发送数据流 producer.send(new KeyedMessage<String, String>(Constant.TOPIC, // i + "", // String.valueOf("我是 " + i + " 号"))); Thread.sleep(10); } } }
SampleConsumer
SampleConsumer是自定义的consumer,用来从broker pull消息对处理:/** * project:kafka-study * file:SampleConsumer.java * author:zxh * time:2015年9月25日 下午4:41:26 * description: */ package cn.com.dimensoft.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import cn.com.dimensoft.kafka.constant.Constant; /** * class: SampleConsumer * package: cn.com.dimensoft.kafka * author:zxh * time: 2015年9月25日 下午4:41:26 * description: * step1 : 创建存放配置信息的properties * step2 : 将properties封装到ConsumerConfig中 * step3 : 调用Consumer的静态方法创建ConsumerConnector * step4 : 根据创建好的ConsumerConnector对象创建MessageStreams集合 * step5 : 根据具体的topic名称得到数据流KafkaStream * step6 : 调用KafkaStream的iterator拿到ConsumerIterator对应,然后就可以迭代获得producer发送过来的消息了 */ public class SampleConsumer { public static void main(String[] args) throws InterruptedException { // step1 : 创建存放配置信息的properties Properties props = new Properties(); props.put("zookeeper.connect", "hadoop-pseudo.com.cn:2181"); props.put("group.id", "1"); // 下面这2个参数需要设置,否则consumer每次启动都会从头开始读取数据 props.put("auto.commit.enable", "true"); props.put("auto.commit.interval.ms", "1000"); // What to do when there is no initial offset in ZooKeeper or if an // offset is out of range props.put("auto.offset.reset", "smallest"); // step2 : 将properties封装到ConsumerConfig中 ConsumerConfig config = new ConsumerConfig(props); // step3 : 调用Consumer的静态方法创建ConsumerConnector ConsumerConnector connector = Consumer .createJavaConsumerConnector(config); // step4 : 根据创建好的ConsumerConnector对象创建MessageStreams集合 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); // 将每个topic对应的线程数添加到map中,topicCountMap中的topic对应的value值一直没测出实际效果 topicCountMap.put(Constant.TOPIC, 1); // 根据填充好的map获得streams集合 // a map of (topic, list of KafkaStream) pairs Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector .createMessageStreams(topicCountMap); // step5 : 根据具体的topic名称得到数据流KafkaStream KafkaStream<byte[], byte[]> stream = streams.get(Constant.TOPIC).get(0); // step6 : 调用KafkaStream的iterator拿到ConsumerIterator // 然后就可以迭代获得producer发送过来的消息了 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); MessageAndMetadata<byte[], byte[]> mm = null; while (iterator.hasNext()) { mm = iterator.next(); System.out.println(// " group " + props.get("group.id") + // ", partition " + mm.partition() + ", " + // new String(mm.message())); Thread.sleep(100); } } }
SamplePartition
SamplePartition是自定义的partition,用来对消息进行分区:/** * project:kafka-study * file:SamplePartition.java * author:zxh * time:2015年9月28日 下午5:37:19 * description: */ package cn.com.dimensoft.kafka; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; /** * class: SamplePartition * package: cn.com.dimensoft.kafka * author:zxh * time: 2015年9月28日 下午5:37:19 * description: 设置自定义的partition,指明当topic有多个partition时如何对message进行分区 */ public class SamplePartition implements Partitioner { /** * constructor * author:zxh * @param verifiableProperties * description: 去除该构造方法后启动producer报错NoSuchMethodException */ public SamplePartition(VerifiableProperties verifiableProperties) { } @Override /** * 这里对message分区的依据只是简单的让key(这里的key就是Producer[K,V]中的K)对partition的数量取模 */ public int partition(Object obj, int partitions) { // 对partitions数量取模 return Integer.parseInt(obj.toString()) % partitions; } }
Constant
Constant是常量类:/** * project:kafka-study * file:Constant.java * author:zxh * time:2015年9月28日 上午10:29:50 * description: */ package cn.com.dimensoft.kafka.constant; /** * class: Constant * package: cn.com.dimensoft.kafka.constant * author:zxh * time: 2015年9月28日 上午10:29:50 * description: */ public class Constant { public static final String TOPIC = "topic-test"; }
相关文章推荐
- 用MySQL log调试程序
- Memcached java 简单实例
- 一种table超出高度自动出滚动条的解决方案
- 自定义HTML导航页
- Runnable接口与Thread类的区别
- SAN存储基本原理和配置、以及配置GFS(全局文件系统)
- activity进入出去动画
- LabelLayer
- BCD详细测试说明
- C# 读写Excel文件
- .NET 4.0里异常处理的新机制(转)
- oracle常用语法
- 《Python基础教程(第2版·修订版)》 第2章 列表和序列(学习笔记·二)
- Codeforces 576B
- [置顶] Mongodb性能调优
- windows下查找java应用占用CPU过高问题
- 妙用ES6解构和扩展运算符让你的代码更优雅
- dubbo源码分析4-基于netty的dubbo协议的server
- AlertDialog-----android.view.WindowManager$BadTokenException: Unable to add window
- [从零开始Unity入门视频教程]跟我一起来从零开始玩Unity3d