您的位置:首页 > 编程语言 > Java开发

kafka java编程

2016-01-25 00:00 483 查看
摘要:kafkajava编程

自定义系列化方式Encoder

kafka自带的序列化方式

DefaultEncoder默认的这个Encoder事实上不做任何处理,接收到什么byte[]就返回什么byte[]:

classDefaultEncoder(props:VerifiableProperties=null)extendsEncoder[Array[Byte]]{overridedeftoBytes(value:Array[Byte]):Array[Byte]=value}

[b][b]NullEncoder[/b][/b]不管接收什么都返回null:

classNullEncoder[T](props:VerifiableProperties=null)extendsEncoder[T]{

overridedeftoBytes(value:T):Array[Byte]=null}

[b][b][b][b]String[/b]Encoder[/b][/b][/b]则返回字符串,默认是utf-8的格式:

classStringEncoder(props:VerifiableProperties=null)extendsEncoder[String]{

valencoding=

if(props==null)

"UTF8"

else

props.getString("serializer.encoding","UTF8")

overridedeftoBytes(s:String):Array[Byte]=

if(s==null)

null

else

s.getBytes(encoding)}

[b]自己编写[/b][b]Encoder[/b]来序列化消息,只需要实现下面接口:

interfaceEncoder<T>{

publicMessagetoMessage(Tdata);

}

例如,我们的消息是一个对象



用四个字段分别表示消息的ID、用户、查询关键词和查询时间。当然你如果要设计的更复杂,可以加入IP这些信息。这些用java写就是一个简单的pojo类,这是getter/setter方法即可。由于在封转成kafka的message时需要将数据转化成bytep[]类型,可以提供一个序列化的方法。我在这里直接重写toString了:

@Override

publicStringtoString(){

Stringkeyword="[infokafkaproducer:]";

keyword=keyword+this.getId()+"-"+this.getUser()+"-"

+this.getKeyword()+"-"+this.getCurrent();

returnkeyword;

}

这样还没有完成,这只是将数据格式用java对象表现出来,解析来要对其按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:

publicclassKeywordMessageimplementskafka.serializer.Encoder<Keyword>{

publicstaticfinalLoggerLOG=LoggerFactory.getLogger(Keyword.class);

@Override

publicMessagetoMessage(Keywordwords){

LOG.info("startinencoding...");

returnnewMessage(words.toString().getBytes());

}

}

自定义partition

kafka自带分区方式

[b][b]DefaultPartitioner[/b][/b]默认的分区函数,他根据key的hashcode与分区数取余,得到相应的分区。

classDefaultPartitioner(props:VerifiableProperties=null)extendsPartitioner{

privatevalrandom=newjava.util.Random

defpartition(key:Any,numPartitions:Int):Int={

Utils.abs(key.hashCode)%numPartitions

}

}

[b][b]如果key为null[/b][/b]会在一定时间内往一个特定的分区发送,超过一定时间又会随机选择一个,请参考key为null时Kafka会将消息发送给哪个分区?.所以推荐你发送Kafka消息时总是指定一个key,以便消息能均匀的分到每个分区上。

[b][b]自定义分区方式[/b]需要实现下面的接口:[/b]

interfacePartitioner<T>{

intpartition(Tkey,intnumPartitions);

}

分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数,例如:

publicclassProducerPartitionerimplementsPartitioner<String>{

publicstaticfinalLoggerLOG=LoggerFactory.getLogger(Keyword.class);

@Override

publicintpartition(Stringkey,intnumPartitions){

LOG.info("ProducerPartitionerkey:"+key+"partitions:"+numPartitions);

returnkey.length()%numPartitions;

}

}

key我们是在构造数据发送对象时设置的,这个key是区分存储的关键,比如我想将我的数据按照不同的用户类别存储。

[b]java编写producer[/b]

producerapi:

classProducer{

/*将消息发送到指定分区*/

publicvoidsend(kafka.javaapi.producer.ProducerData<K,V>producerData);

/*批量发送一批消息*/

publicvoidsend(java.util.List<kafka.javaapi.producer.ProducerData<K,V>>producerData);

/*关闭producer*/

publicvoidclose();

}

例子:

Propertiesprops=newProperties();
//指定kafka节点:注意这里无需指定集群中所有Boker,只要指定其中部分即可,它会自动取meta信息并连接到对应的Boker节点
props.put("metadata.broker.list","172.17.1.163:9093");
//指定采用哪种序列化方式将消息传输给Boker,你也可以在发送消息的时候指定序列化类型,不指定则以此为默认序列化类型
props.put("serializer.class","kafka.serializer.StringEncoder");
//指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也可以在发送消息的时候指定分区类型。
props.put("partitioner.class","example.producer.DefaultPartitioner");
//该属性表示你需要在消息被接收到的时候发送ack给发送者。以保证数据不丢失
props.put("request.required.acks","1");
ProducerConfigconfig=newProducerConfig(props);
//申明生产者:泛型1为分区key类型,泛型2为消息类型
Producer<String,String>producer=newProducer<String,String>(config);
//创建KeyedMessage发送消息,参数1为topic名,参数2为分区名(若为null则随机发到一个分区),参数3为消息
producer.send(newProducerData<String,String>("topic","partitionKey1","msg1"));
//另一种写法producer.send(newProducerRecord<String,String>("topic","partitionKey1","msg1"));
producer.close();

[b]java编写consumer[/b]

[b]ConsumerAPI有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。[/b]

[b]classSimpleConsumer{
/*向一个broker发送读取请求并得到消息集*/
publicByteBufferMessageSetfetch(FetchRequestrequest);
/*向一个broker发送读取请求并得到一个相应集*/
publicMultiFetchResponsemultifetch(List<FetchRequest>fetches);
/**
*得到指定时间之前的offsets
*返回值是offsets列表,以倒序排序
*@paramtime:时间,毫秒,
*如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(),得到最新的offset.
*如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
*/
publiclong[]getOffsetsBefore(Stringtopic,intpartition,longtime,intmaxNumOffsets);
}
[/b]

注意:

1.你必须自己实现当停止消费时如何持久化offset

2.你必须自己找到哪个broker是leader以便处理topic和分区

3.你必须自己处理leader变更

使用阶段:

1.找到那些broker是leader以便读取topic和partition

2.自己决定哪个副本作为你的topic和分区

3.建立自己需要请求并自定义获取你感兴趣的数据

4.获取数据

5.当leader变更时自己识别和恢复。

例子:


Stringtopic="test2";

intpartition=1;

Stringbrokers="172.17.1.163:9093";

intmaxReads=100;//读多少条数据

//1.找leader

PartitionMetadatametadata=null;

for(StringipPort:brokers.split(",")){

//我们无需要把所有的brokers列表加进去,目的只是为了获得metedata信息,故只要有broker可连接即可

SimpleConsumerconsumer=null;

try{

String[]ipPortArray=ipPort.split(":");

consumer=newSimpleConsumer(ipPortArray[0],

Integer.parseInt(ipPortArray[1]),100000,64*1024,

"leaderLookup");

List<String>topics=newArrayList<String>();

topics.add(topic);

TopicMetadataRequestreq=newTopicMetadataRequest(topics);

//取meta信息

TopicMetadataResponseresp=consumer.send(req)

//获取topic的所有metedate信息(目测只有一个metedata信息,何来多个?)

List<TopicMetadata>metaData=resp.topicsMetadata();

for(TopicMetadataitem:metaData){

for(PartitionMetadatapart:item.partitionsMetadata()){

//获取每个meta信息的分区信息,这里我们只取我们关心的partition的metedata

System.out.println("----"+part.partitionId());

if(part.partitionId()==partition){

metadata=part;

break;

}

}

}

}catch(Exceptione){

System.out.println("ErrorcommunicatingwithBroker["+ipPort

+"]tofindLeaderfor["+topic+","+partition

+"]Reason:"+e);

}finally{

if(consumer!=null)

consumer.close();

}

}

if(metadata==null||metadata.leader()==null){

System.out.println("metadataorleadernotfound,exit.");

return;

}

//拿到leader

BrokerleadBroker=metadata.leader();

//获取所有副本

System.out.println(metadata.replicas());

//2.获取lastOffset(这里提供了两种方式:从头取或从最后拿到的开始取,下面这个是从头取)

longwhichTime=kafka.api.OffsetRequest.EarliestTime();

//这个是从最后拿到的开始取

//longwhichTime=kafka.api.OffsetRequest.LatestTime();

System.out.println("lastTime:"+whichTime);

StringclientName="Client_"+topic+"_"+partition;

SimpleConsumerconsumer=newSimpleConsumer(leadBroker.host(),

leadBroker.port(),100000,64*1024,clientName);

TopicAndPartitiontopicAndPartition=newTopicAndPartition(topic,

partition);

Map<TopicAndPartition,PartitionOffsetRequestInfo>requestInfo=newHashMap<TopicAndPartition,PartitionOffsetRequestInfo>();

requestInfo.put(topicAndPartition,newPartitionOffsetRequestInfo(

whichTime,1));

OffsetRequestrequest=newOffsetRequest(requestInfo,

kafka.api.OffsetRequest.CurrentVersion(),clientName);

//获取指定时间前有效的offset列表

OffsetResponseresponse=consumer.getOffsetsBefore(request);

if(response.hasError()){

System.out

.println("ErrorfetchingdataOffsetDatatheBroker.Reason:"

+response.errorCode(topic,partition));

return;

}

//千万不要认为offset一定是从0开始的

long[]offsets=response.offsets(topic,partition);

System.out.println("offsetlist:"+Arrays.toString(offsets));

longoffset=offsets[0];

//读数据

while(maxReads>0){

//注意不要调用里面的replicaId()方法,这是内部使用的。

FetchRequestreq=newFetchRequestBuilder().clientId(clientName)

.addFetch(topic,partition,offset,100000).build();

FetchResponsefetchResponse=consumer.fetch(req);

if(fetchResponse.hasError()){

//出错处理。这里只直接返回了。实际上可以根据出错的类型进行判断,如code==ErrorMapping.OffsetOutOfRangeCode()表示拿到的offset错误

//一般出错处理可以再次拿offset,或重新找leader,重新建立consumer。可以将上面的操作都封装成方法。再在该循环来进行消费

//当然,在取所有leader的同时可以用metadata.replicas()更新最新的节点信息。另外zookeeper可能不会立即检测到有节点挂掉,故如果发现老的leader和新的leader一样,可能是leader根本没挂,也可能是zookeeper还没检测到,总之需要等等。

shortcode=fetchResponse.errorCode(topic,partition);

System.out.println("ErrorfetchingdatafromtheBroker:"

+leadBroker+"Reason:"+code);

return;

}

//取一批消息

booleanempty=true;

for(MessageAndOffsetmessageAndOffset:fetchResponse.messageSet(

topic,partition)){

empty=false;

longcurOffset=messageAndOffset.offset();

//下面这个检测有必要,因为当消息是压缩的时候,通过fetch获取到的是一个整块数据。块中解压后不一定第一个消息就是offset所指定的。就是说存在再次取到已读过的消息。

if(curOffset<offset){

System.out.println("Foundanoldoffset:"+curOffset

+"Expecting:"+offset);

continue;

}

//可以通过当前消息知道下一条消息的offset是多少

offset=messageAndOffset.nextOffset();

ByteBufferpayload=messageAndOffset.message().payload();

byte[]bytes=newbyte[payload.limit()];

payload.get(bytes);

System.out.println(String.valueOf(messageAndOffset.offset())

+":"+newString(bytes,"UTF-8"));

maxReads++;

}

//进入循环中,等待一会后获取下一批数据

if(empty){

Thread.sleep(1000);

}

}

//退出(这里象征性的写一下)

if(consumer!=null)

consumer.close();

[b]高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。[/b]

[b]/*创建连接*/
ConsumerConnectorconnector=Consumer.create(consumerConfig);
interfaceConsumerConnector{
/**
*这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic)
*Input:amapof<topic,#streams>
*Output:amapof<topic,listofmessagestreams>
*/
publicMap<String,List<KafkaStream>>createMessageStreams(Map<String,Int>topicCountMap);
/**
*你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代,
*一个TopicFilter是一个封装了白名单或黑名单的正则表达式。
*/
publicList<KafkaStream>createMessageStreamsByFilter(
TopicFiltertopicFilter,intnumStreams);
/*提交目前消费到的offset*/
publiccommitOffsets()
/*关闭连接*/
publicshutdown()
}
[/b]

[b]这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。[/b]

注意:

1.上层api将会内部实现持久化每个分区最后读到的消息的offset,数据保存在zookeeper中的消费组名中(如/consumers/id1/offsets/test2/2。其中id1是消费组,test2是topic,最后一个2表示第3个分区),每间隔一个很短的时间更新一次offset,那么可能在重启消费者时拿到重复的消息。此外,当分区leader发生变更时也可能拿到重复的消息。因此在关闭消费者时最好等待一定时间(10s)然后再shutdown()

2.消费组名是一个全局的信息,要注意在新的消费者启动之前旧的消费者要关闭。如果新的进程启动并且消费组名相同,kafka会添加这个进程到可用消费线程组中用来消费topic和触发重新分配负载均衡,那么同一个分区的消息就有可能发送到不同的进程中。

3.如果消费的线程多于分区数,一些线程可能永远无法看到一些消息。

4.如果分区数多于线程数,一些线程会收到多个分区的消息

5.如果一个线程对应了多个分区,那么接收到的消息是不能保证顺序的。

备注:可用zk的命令查询:get/consumers/id1/owners/test3/2其中id1为消费组,test3为topic,2为分区3.查看里面的内容如:id1_163-PC-1382409386474-1091aef2-1表示该分区被该标示的线程所执行。

[b]例子:[/b]


Propertiesprops=newProperties();

//指定zookeeper服务器地址

props.put("zookeeper.connect","172.17.1.163:2181");

//指定消费组(没有它会自动添加)

props.put("group.id","id1");

//指定kafka等待多久zookeeper回复(ms)以便放弃并继续消费。

props.put("zookeeper.session.timeout.ms","4000");

//指定zookeeper同步最长延迟多久再产生异常

props.put("zookeeper.sync.time.ms","2000");

//指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息

props.put("auto.commit.interval.ms","1000");

ConsumerConnectorconsumer=Consumer

.createJavaConsumerConnector(newConsumerConfig(props));

//我们要告诉kafka该进程会有多少个线程来处理对应的topic

Map<String,Integer>topicCountMap=newHashMap<String,Integer>();

inta_numThreads=3;

//用3个线程来处理topic:test2

topicCountMap.put("test2",a_numThreads);

//拿到每个stream对应的topic

Map<String,List<KafkaStream<byte[],byte[]>>>consumerMap=consumer

.createMessageStreams(topicCountMap);

List<KafkaStream<byte[],byte[]>>streams=consumerMap.get("test2");

//调用threadpool来处理topic

ExecutorServiceexecutor=Executors.newFixedThreadPool(a_numThreads);

for(finalKafkaStreamstream:streams){

executor.submit(newRunnable(){

publicvoidrun(){

ConsumerIterator<byte[],byte[]>it=stream.iterator();

while(it.hasNext()){

System.out.println(Thread.currentThread()+":"

+newString(it.next().message()));

}

}

});

}

System.in.read();

//关闭

if(consumer!=null)consumer.shutdown();

if(executor!=null)executor.shutdown();
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka java编程