Apache Kafka
2015-11-30 15:09
706 查看
Apache kafka是分布式发布-订阅消息系统。
主要的组件:
主要的组件:
·话题(Topic)
话题是特定类型的消息流 消息是字节的有效负载(payload) 而话题就是消息的分类名 或者 种子(Feed)名 已经发布的消息保存在一组服务器上面,它们被成为代理(Broker)或者Kafka集群
·生产者(Producer)
生产者是能够发布消息到话题的对象,任意对象!
·消费者(Consumer)
消费者可以订阅一个或者多个消息,并且从Broker中拉取数据,进而消费这些已经发布的消息。
架构如图所示
生产者代码
/** * Instantiates a new Kafka producer. * * @param topic the topic * @param directoryPath the directory path */ public KafkaMailProducer(String topic, String directoryPath) { props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "localhost:9092"); producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); this.topic = topic; this.directoryPath = directoryPath; } public void run() { Path dir = Paths.get(directoryPath); try { new WatchDir(dir).start(); new ReadDir(dir).start(); } catch (IOException e) { e.printStackTrace(); } }
消费者代码
public KafkaMailConsumer(String topic) { consumer = Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); this.topic = topic; } /** * Creates the consumer config. * * @return the consumer config */ private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaMailProperties.zkConnect); props.put("group.id", KafkaMailProperties.groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) System.out.println(new String(it.next().message())); }
相关文章推荐
- Apache Isis 1.4.0 发布,领域驱动开发框架
- 分布式版本管理git入门指南使用资料汇总及文章推荐
- Linux快速构建apache web服务器
- Awstats处理多apache日志
- 安装perl模块小窍门
- PHP+Apache在Windows 9x下的安装和配置
- Apache服务器配置全攻略
- Apache Web让JSP“动”起来
- Linux Apache+MySQL+PHP
- 建立Apache+PHP+MySQL数据库驱动的动态网站
- Java 版的 Ruby 解释器 JRuby 1.7.14 发布
- 浅析Ruby中继承和消息的相关知识
- Fedora Linux 7 Test 4 发布 下载地址
- apache 环境下 php 的配置注意事项
- C#分布式事务的超时处理实例分析
- 使用npm发布Node.JS程序包教程
- Erlang分布式节点中的注册进程使用实例
- Visual C++中MFC消息的分类
- MFC自定义消息的实现方法
- 在RedHat下安装apache jserv 1.1.2方法