kafka学习
2015-06-23 22:12
274 查看
kafka是由LinkedIn开发,主要是用来处理Linkedin的大面积活跃数据流处理(activity stream). 此类的数据经常用来反
映网站的一些有用的信息,比如PV,页面展示给哪些用户访问,用户搜索什么关键字最多,这类信息经常被log到文件
里,然后线下且周期性的去分析这些数据。现在这种用户活跃数据已经成为互联网公司重要的一部分,所以必须构建
一个更轻量且更精炼的基础架构。
活跃数据 使用案列
分析一下用户行为(pageviews),以便我能设计出更好的广告位。
快速的统计用户投票,点击。
对用户的搜索关键词进行统计,分析出当前的流行趋势。
防止用户对网站进行无限制的抓取数据,以及超限制的使用API,辨别垃圾。
对网站进行全方位的实时监控,从而得到实时有效的性能数据,并且及时的发成警告。
批量的导入数据到数据仓库,对数据进行离线分析,从而得到有价值的商业信息。(0.6可以直接将数据导入Hadoop)
活跃数据的特点
高流量的活跃数据是无法确定其大小的,因为他可能随时的变化,比如商家可能促销,节假日打折,突然又冒出
一个跳楼价等等。所有的数据可能是数量级的往上递增。 传统日志分析方式都是需要离线,而且操作起来比较复杂,
根本无法满足实时的分析。另一方面,现有的消息队列系统只能达到近似实时的分析,因为无法消费大量的持久化在
队列系统上的信息。Kafka的目标就是能够成为一个高效的队列平台,无论是处理离线的信息还是在线的信息。
kafka是一个消息订阅和发布的系统
我们将message的发布(publish)者称为producer,将message的订阅(subscribe)者称为consumer,将中间的存
储阵列称作broker。
kafka集群
核心概念
1.topic (发布的消息都存在于某个主题中)
2.partition (topic中的分区,为了负载均衡)
3.offset (kafka的消费是依靠读取每个消息的偏移量,并且消费完不立刻销毁消息,可配置消息的生命周期)
4.consumer group
一个consumer group对于同一条消息只能消费一次
不同consumer group可以共享同一个消息,同样每个group只能消费一次
kafka伪分布式搭建
tar -zxvf kafka_2.10-0.8.1.1.tgz
#(启动自带的zookeeper)
启动ZK bin/zookeeper-server-start.sh config/zookeeper.properties
#启动kafka服务
启动服务 bin/kafka-server-start.sh config/server.properties
#在地址为localhost:2181的zookeeper上创建一个主题test,副本因子是1
创建主题 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查看地址为localhost:2181的zookeeper上所有主题
查看主题 bin/kafka-topics.sh --list --zookeeper localhost:2181
#查看test主题的详细信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
命令:
创建生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
创建消费者 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在创建生产者的命令行输入消息,即可在消费者命令行中显示。
可以在伪分布下启动多个kafka服务(broker)
创建多个server.properties文件分别命名,启动命令kafka-server-start.sh config/server.properties分别使用这几个
server.properties文件。需要修改每个server.properties中的内容:
1.broker.id
2.port
3.log.dirs
保证每个broker服务的配置项是唯一,分别启动。
kafka集群搭建
修改server.properties文件
1.指定唯一的broker.id
2.host.name指定主机ip或者主机名
3.log.dirs指定一个日志输出目录
将kafka配置信息复制到其他节点,分别修改broker.id和host.name
在各节点分别执行kafka-server-start.sh config/server.properties >/del/null 2>&1 &
以后台进程启动(&),并且将输出送到垃圾箱(>/del/null),将错误信息也输送到1的位置(垃圾箱)2>&1
映网站的一些有用的信息,比如PV,页面展示给哪些用户访问,用户搜索什么关键字最多,这类信息经常被log到文件
里,然后线下且周期性的去分析这些数据。现在这种用户活跃数据已经成为互联网公司重要的一部分,所以必须构建
一个更轻量且更精炼的基础架构。
活跃数据 使用案列
分析一下用户行为(pageviews),以便我能设计出更好的广告位。
快速的统计用户投票,点击。
对用户的搜索关键词进行统计,分析出当前的流行趋势。
防止用户对网站进行无限制的抓取数据,以及超限制的使用API,辨别垃圾。
对网站进行全方位的实时监控,从而得到实时有效的性能数据,并且及时的发成警告。
批量的导入数据到数据仓库,对数据进行离线分析,从而得到有价值的商业信息。(0.6可以直接将数据导入Hadoop)
活跃数据的特点
高流量的活跃数据是无法确定其大小的,因为他可能随时的变化,比如商家可能促销,节假日打折,突然又冒出
一个跳楼价等等。所有的数据可能是数量级的往上递增。 传统日志分析方式都是需要离线,而且操作起来比较复杂,
根本无法满足实时的分析。另一方面,现有的消息队列系统只能达到近似实时的分析,因为无法消费大量的持久化在
队列系统上的信息。Kafka的目标就是能够成为一个高效的队列平台,无论是处理离线的信息还是在线的信息。
kafka是一个消息订阅和发布的系统
我们将message的发布(publish)者称为producer,将message的订阅(subscribe)者称为consumer,将中间的存
储阵列称作broker。
kafka集群
核心概念
1.topic (发布的消息都存在于某个主题中)
2.partition (topic中的分区,为了负载均衡)
3.offset (kafka的消费是依靠读取每个消息的偏移量,并且消费完不立刻销毁消息,可配置消息的生命周期)
4.consumer group
一个consumer group对于同一条消息只能消费一次
不同consumer group可以共享同一个消息,同样每个group只能消费一次
kafka伪分布式搭建
tar -zxvf kafka_2.10-0.8.1.1.tgz
#(启动自带的zookeeper)
启动ZK bin/zookeeper-server-start.sh config/zookeeper.properties
#启动kafka服务
启动服务 bin/kafka-server-start.sh config/server.properties
#在地址为localhost:2181的zookeeper上创建一个主题test,副本因子是1
创建主题 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查看地址为localhost:2181的zookeeper上所有主题
查看主题 bin/kafka-topics.sh --list --zookeeper localhost:2181
#查看test主题的详细信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
命令:
创建生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
创建消费者 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在创建生产者的命令行输入消息,即可在消费者命令行中显示。
可以在伪分布下启动多个kafka服务(broker)
创建多个server.properties文件分别命名,启动命令kafka-server-start.sh config/server.properties分别使用这几个
server.properties文件。需要修改每个server.properties中的内容:
1.broker.id
2.port
3.log.dirs
保证每个broker服务的配置项是唯一,分别启动。
kafka集群搭建
修改server.properties文件
1.指定唯一的broker.id
2.host.name指定主机ip或者主机名
3.log.dirs指定一个日志输出目录
将kafka配置信息复制到其他节点,分别修改broker.id和host.name
在各节点分别执行kafka-server-start.sh config/server.properties >/del/null 2>&1 &
以后台进程启动(&),并且将输出送到垃圾箱(>/del/null),将错误信息也输送到1的位置(垃圾箱)2>&1
package com.bigdata.hadoop.kafka; import java.util.Properties; import java.util.concurrent.TimeUnit; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; //创建生产者,每秒给主题test12发送一个消息 public class KafkaProducer extends Thread{ String topic = "test12"; String message = "hello world"; @Override public void run() { Properties originalProps = new Properties(); originalProps.put("serializer.class", StringEncoder.class.getName()); originalProps.put("metadata.broker.list", "hadoop4:9092,hadoop5:9092,hadoop6:9092"); Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(originalProps)); int i = 0; while (true) { producer.send(new KeyedMessage<Integer, String>(topic, message + i++)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { new KafkaProducer().start(); } }
package com.bigdata.hadoop.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; //创建消费者,消费主题test12的消息 public class KafkaConsumer extends Thread{ String topic = "test12"; String message = "hello world"; @Override public void run() { Properties originalProps = new Properties(); originalProps.put("group.id", "group1"); originalProps.put("zookeeper.connect", "hadoop4:2181"); //创建消费者连接器 ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(originalProps)); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1);//消费topic 每次消费1条 //获取指定topic的消息流 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = javaConsumerConnector.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); //迭代消息流中的消息 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) { System.err.println(new String(iterator.next().message())); } } public static void main(String[] args) { new KafkaConsumer().start(); } }
相关文章推荐
- 10gR2 RAC Install issues on Oracle EL5 or RHEL5 or SLES10 (VIPCA / SRVCTL / OUI Failures) (Doc ID 41
- Activity生命周期
- 燕十八老师mysql高级篇代码(手敲)
- Android关于EditText和WindowManager的一些知识
- Makefile基础
- 【java设计模式】享元模式
- 结构体内部偏移的妙用
- 一道ctf pwn 的思路以及解法
- 小费经济学
- hdoj 2899 Strange fuction 【二分查找 注意精度】
- 程序集加载与反射(一):理论篇
- java 1.5和1.6中的新特性
- 打印完整的递归调用栈
- Swift介绍
- C++中多态性学习(上)
- 设计模式模式之设计原则
- POJ_2186 Popular Cows(强连通分量)
- 数据库第七次作业 王小芬 2013212954 理学院
- java 2015.6.23
- Android客户端与服务器端交互方式总结