kafka生产者、消费者java示例
2015-11-09 16:06
302 查看
1. 生产者
import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.concurrent.*; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerGroup { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new Consumer(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = "localhost:2181"; String groupId = "0"; String topic = "mykafka"; int threads = 2; //启动的线程数 ConsumerGroup group = new ConsumerGroup(zooKeeper, groupId, topic); group.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } group.shutdown(); } }
View Code
相关文章推荐
- Java将Unix时间戳转换成指定格式日期
- Spring、Spring依赖注入与编码剖析Spring依赖注入的原理
- Spring、Spring依赖注入与编码剖析Spring依赖注入的原理
- 集合_java集合框架
- spring AOP 注解
- 记录java的学习之路
- Java的堆栈
- Java中关于时间的处理 Date SimpleDateFormat Calendar GegorianCalendar
- Java中快如闪电的线程间通讯
- Java枚举
- eclipse按住ctrl点布局文件只能跳转到R文件
- Ajax访问后台500 (Internal Server Error)问题
- Ajax访问后台500 (Internal Server Error)问题
- java定时器 schedule和scheduleAtFixedRate区别
- JAVA 大数值
- 修改窗体颜色,是的eclipse的背景色改变,保护开发者视力
- Eclipse快捷键10个最有用的快捷键
- eclipse spring4 ehache2.10 整合
- java常用设计模式
- java原理—反射机制