
Learning Kafka (Part 1): Basics of Version 0.10.2.0 and a Simple Java Demo

2017-03-24 22:38
I've recently started learning Kafka; this is a brief record of the learning process.

First, install CentOS 7 in a local virtual machine

See a separate article for that. (To be added later.)

Install the JDK

Here I downloaded jdk-7u71-linux-x64.rpm.

After installation, java -version already works, but out of habit I still set up the environment variables:

JAVA_HOME=/usr/java/jdk1.7.0_71
JRE_HOME=/usr/java/jdk1.7.0_71/jre
PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH

Install Kafka

wget http://mirrors.hust.edu.cn/apache/kafka/0.10.2.0/kafka-0.10.2.0-src.tgz

tar -xzvf kafka-0.10.2.0-src.tgz

Follow the official Quickstart. (Note: the archive above is the source release; to run the bin/ scripts straight away you would normally download the binary release instead, or build the source first.)

Start the ZooKeeper instance bundled with Kafka

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Start Kafka

bin/kafka-server-start.sh -daemon config/server.properties

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

Start a producer and send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

Start a consumer and receive messages

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

That completes the official example.

Reading the source of the commonly used classes in the client jar

Producer:

Earlier versions all used kafka.javaapi.producer.Producer; in the new version it carries this deprecation annotation:

@deprecated("This class has been deprecated and will be removed in a future release. " +
  "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")

Looking at the KafkaProducer class, the constructor that is ultimately called is:

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
...
}


Both the key and the value need a serializer that implements the org.apache.kafka.common.serialization.Serializer interface:

public abstract interface Serializer<T> extends Closeable {
public abstract void configure(Map<String, ?> paramMap, boolean paramBoolean);

public abstract byte[] serialize(String paramString, T paramT);

public abstract void close();
}
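
As a side note, besides wiring serializers in through the key.serializer / value.serializer config keys, KafkaProducer also exposes a public constructor that accepts serializer instances directly. A minimal sketch (class name and broker address are placeholders for illustration only):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCtorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // Pass serializer instances directly instead of setting key.serializer / value.serializer
        KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer());
        producer.close();
    }
}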


Consumer:

Likewise, in the KafkaConsumer class the constructor is:

private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer){
...
}


Both the key and the value need a deserializer that implements the org.apache.kafka.common.serialization.Deserializer interface:

public abstract interface Deserializer<T> extends Closeable {
public abstract void configure(Map<String, ?> paramMap, boolean paramBoolean);

public abstract T deserialize(String paramString, byte[] paramArrayOfByte);

public abstract void close();
}
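
KafkaConsumer likewise has a public constructor that takes deserializer instances directly; the same pattern as above, sketched with placeholder names:

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerCtorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "demo-group");              // placeholder consumer group
        // Pass deserializer instances directly instead of setting key.deserializer / value.deserializer
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(props, new StringDeserializer(), new StringDeserializer());
        consumer.close();
    }
}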


BeanUtils, a helper class for serializing the message objects:

public class BeanUtils {
private BeanUtils(){}
/**
* Convert an object to a byte array
* @param obj
* @return
*/
public static byte[] ObjectToBytes(Object obj){
...
}
/**
* Convert a byte array back to an object
* @param bytes
* @return
*/
public static Object BytesToObject(byte[] bytes){
...
}
}
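
The method bodies are omitted above. One possible implementation, as a minimal sketch, assuming the message objects implement java.io.Serializable and using plain JDK serialization:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class BeanUtils {
    private BeanUtils() {}

    // Convert an object to a byte array via standard Java serialization
    public static byte[] ObjectToBytes(Object obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("serialization failed", e);
        }
    }

    // Convert a byte array back to an object
    public static Object BytesToObject(byte[] bytes) {
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("deserialization failed", e);
        }
    }
}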


First Java example

Add the Maven dependency:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
</dependency>


Define the message object:

Take a payment message object as an example:

public class PayInfo implements java.io.Serializable{

public PayInfo(String orderNo,Long amount,String outTradeDate){
this.orderNo = orderNo;
this.amount = amount;
this.outTradeDate = outTradeDate;
}

private static final long serialVersionUID = 6235710215970026320L;

private String orderNo; // order number

private Long amount; // payment amount

private String outTradeDate; // trade date

public String getOrderNo() {
return orderNo;
}

public void setOrderNo(String orderNo) {
this.orderNo = orderNo;
}

public Long getAmount() {
return amount;
}

public void setAmount(Long amount) {
this.amount = amount;
}

public String getOutTradeDate() {
return outTradeDate;
}

public void setOutTradeDate(String outTradeDate) {
this.outTradeDate = outTradeDate;
}

public String toString(){
return new StringBuilder("order=").append(orderNo).append("|amount=").append(amount).append("|outTradeDate=").append(outTradeDate).toString();
}
}


Define the serializer and deserializer classes:

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class PayInfoSerializer implements Serializer<PayInfo> {
    public PayInfoSerializer() {
    }
    public void configure(Map<String, ?> paramMap, boolean paramBoolean) {
    }
    public byte[] serialize(String paramString, PayInfo pay) {
        return BeanUtils.ObjectToBytes(pay);
    }
    public void close() {
    }
}


import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

public class PayInfoDeSerializer implements Deserializer<PayInfo> {

    public void configure(Map<String, ?> paramMap, boolean paramBoolean) {
    }

    public PayInfo deserialize(String paramString, byte[] paramArrayOfByte) {
        return (PayInfo) BeanUtils.BytesToObject(paramArrayOfByte);
    }

    public void close() {
    }
}


Producer demo:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


public class ProducerDemo {
private static String topic = "pay"; // the topic to operate on

public static void main(String[] args) {
try{
newKafka();
}catch(Exception e){
System.out.println(e);
}
}

/**
* Producer example using the new Kafka API
*/
private static void newKafka() throws Exception {
    Properties pro = new Properties();
    pro.setProperty("bootstrap.servers", "192.168.18.132:9092");
    pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    pro.setProperty("value.serializer", PayInfoSerializer.class.getName());
    KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(pro);
    Future<RecordMetadata> future = producer.send(
            new ProducerRecord<String, Object>(topic, new PayInfo("201703200001", 101L, "20170320")));
    // send() is asynchronous; block until the broker acknowledges, then close the producer
    RecordMetadata metadata = future.get();
    System.out.println("sent to " + metadata.topic() + "-" + metadata.partition() + " at offset " + metadata.offset());
    producer.close();
}
}


Consumer demo:

package kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ConsumerDemo {

private static Logger log = LoggerFactory.getLogger(ConsumerDemo.class);
private static String topic = "pay"; // the topic to operate on
public static void main(String[] args) {
newConsumer();
}

private static void newConsumer(){
Properties pro = new Properties();
pro.setProperty("bootstrap.servers", "192.168.18.132:9092");
pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("value.deserializer", PayInfoDeSerializer.class.getName());
pro.setProperty("group.id", "group1");
KafkaConsumer<String,Object> consumer = new KafkaConsumer<String,Object>(pro);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(100);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (ConsumerRecord<String, Object> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}


Start the consumer demo and leave it waiting for messages from the producer.

Run the producer demo.

The consumer console prints the following:

offset = 0, key = null, value = order=201703200001|amount=100|outTradeDate=20170320


And with that, a simple single-node Kafka example is complete.

To be continued...