您的位置:首页 > 其它

kafka学习

2015-06-23 22:12 274 查看
kafka是由LinkedIn开发,主要是用来处理Linkedin的大面积活跃数据流处理(activity stream). 此类的数据经常用来反

映网站的一些有用的信息,比如PV,页面展示给哪些用户访问,用户搜索什么关键字最多,这类信息经常被log到文件

里,然后线下且周期性的去分析这些数据。现在这种用户活跃数据已经成为互联网公司重要的一部分,所以必须构建

一个更轻量且更精炼的基础架构。

活跃数据 使用案列

分析一下用户行为(pageviews),以便我能设计出更好的广告位。

快速的统计用户投票,点击。

对用户的搜索关键词进行统计,分析出当前的流行趋势。

防止用户对网站进行无限制的抓取数据,以及超限制的使用API,辨别垃圾。

对网站进行全方位的实时监控,从而得到实时有效的性能数据,并且及时的发成警告。

批量的导入数据到数据仓库,对数据进行离线分析,从而得到有价值的商业信息。(0.6可以直接将数据导入Hadoop)

活跃数据的特点

高流量的活跃数据是无法确定其大小的,因为他可能随时的变化,比如商家可能促销,节假日打折,突然又冒出

一个跳楼价等等。所有的数据可能是数量级的往上递增。 传统日志分析方式都是需要离线,而且操作起来比较复杂,

根本无法满足实时的分析。另一方面,现有的消息队列系统只能达到近似实时的分析,因为无法消费大量的持久化在

队列系统上的信息。Kafka的目标就是能够成为一个高效的队列平台,无论是处理离线的信息还是在线的信息。



kafka是一个消息订阅和发布的系统

我们将message的发布(publish)者称为producer,将message的订阅(subscribe)者称为consumer,将中间的存

储阵列称作broker。



kafka集群



核心概念

1.topic (发布的消息都存在于某个主题中)

2.partition (topic中的分区,为了负载均衡)

3.offset (kafka的消费是依靠读取每个消息的偏移量,并且消费完不立刻销毁消息,可配置消息的生命周期)

4.consumer group

一个consumer group对于同一条消息只能消费一次

不同consumer group可以共享同一个消息,同样每个group只能消费一次



kafka伪分布式搭建

tar -zxvf kafka_2.10-0.8.1.1.tgz

#(启动自带的zookeeper)

启动ZK bin/zookeeper-server-start.sh config/zookeeper.properties

#启动kafka服务

启动服务 bin/kafka-server-start.sh config/server.properties

#在地址为localhost:2181的zookeeper上创建一个主题test,副本因子是1

创建主题 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

#查看地址为localhost:2181的zookeeper上所有主题

查看主题 bin/kafka-topics.sh --list --zookeeper localhost:2181

#查看test主题的详细信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

命令:

创建生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

创建消费者 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

在创建生产者的命令行输入消息,即可在消费者命令行中显示。

可以在伪分布下启动多个kafka服务(broker)

创建多个server.properties文件分别命名,启动命令kafka-server-start.sh config/server.properties分别使用这几个

server.properties文件。需要修改每个server.properties中的内容:

1.broker.id

2.port

3.log.dirs

保证每个broker服务的配置项是唯一,分别启动。

kafka集群搭建

修改server.properties文件

1.指定唯一的broker.id

2.host.name指定主机ip或者主机名

3.log.dirs指定一个日志输出目录

将kafka配置信息复制到其他节点,分别修改broker.id和host.name

在各节点分别执行kafka-server-start.sh config/server.properties >/del/null 2>&1 &

以后台进程启动(&),并且将输出送到垃圾箱(>/del/null),将错误信息也输送到1的位置(垃圾箱)2>&1

package com.bigdata.hadoop.kafka;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

//创建生产者,每秒给主题test12发送一个消息
public class KafkaProducer extends Thread{

String topic = "test12";
String message = "hello world";

@Override
public void run() {
Properties originalProps = new Properties();
originalProps.put("serializer.class", StringEncoder.class.getName());
originalProps.put("metadata.broker.list", "hadoop4:9092,hadoop5:9092,hadoop6:9092");

Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(originalProps));

int i = 0;
while (true) {
producer.send(new KeyedMessage<Integer, String>(topic, message + i++));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

public static void main(String[] args) {
new KafkaProducer().start();
}
}


package com.bigdata.hadoop.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

//创建消费者,消费主题test12的消息
public class KafkaConsumer extends Thread{

String topic = "test12";
String message = "hello world";

@Override
public void run() {
Properties originalProps = new Properties();
originalProps.put("group.id", "group1");
originalProps.put("zookeeper.connect", "hadoop4:2181");
//创建消费者连接器
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(originalProps));

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);//消费topic 每次消费1条
//获取指定topic的消息流
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = javaConsumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
//迭代消息流中的消息
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()) {
System.err.println(new String(iterator.next().message()));
}
}

public static void main(String[] args) {
new KafkaConsumer().start();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: