您的位置:首页 > 运维架构 > Apache

apache kafkac系列lient发展-java

2015-06-13 09:59 686 查看
apache kafka区QQ群:162272557

1.依赖包

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.8.1</version>

</dependency>

2.producer程序开发样例

2.1 producer參数说明

#指定kafka节点列表。用于获取metadata,不必所有指定

metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092

# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到相应分区

#partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner

# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。

压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。

compression.codec=none

# 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默觉得kafka.serializer.DefaultEncoder,即byte[]

serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder

# serializer.class=kafka.serializer.DefaultEncoder

# serializer.class=kafka.serializer.StringEncoder

# 假设要压缩消息。这里指定哪些topic要压缩消息,默认empty,表示不压缩。

#compressed.topics=

########### request ack ###############

# producer接收消息ack的时机.默觉得0.

# 0: producer不会等待broker发送ack

# 1: 当leader接收到消息之后发送ack

# 2: 当全部的follower都同步消息成功后发送ack.

request.required.acks=0

# 在向producer发送ack之前,broker同意等待的最大时间

# 假设超时,broker将会向producer发送一个error ACK.意味着上一次消息由于某种

# 原因未能成功(比方follower未能同步成功)

request.timeout.ms=10000

########## end #####################

# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步能够提高发送吞吐量,

# 也意味着消息将会在本地buffer中,并适时批量发送,可是也可能导致丢失未发送过去的消息

producer.type=sync

############## 异步发送 (下面四个异步參数可选) ####################

# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默觉得5000ms

# 此值和batch.num.messages协同工作.

queue.buffering.max.ms = 5000

# 在async模式下,producer端同意buffer的最大消息量

# 不管怎样,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积

# 此时,假设消息的条数达到阀值,将会导致producer端堵塞或者消息被抛弃,默觉得10000

queue.buffering.max.messages=20000

# 假设是异步,指定每次批量发送数据量。默觉得200

batch.num.messages=500

# 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后

# 堵塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出不论什么消息)

# 此时producer能够继续堵塞或者将消息抛弃,此timeout值用于控制"堵塞"的时间

# -1: 无堵塞超时限制,消息不会被抛弃

# 0:马上清空队列,消息被抛弃

queue.enqueue.timeout.ms=-1

################ end ###############

# 当producer接收到error ACK,或者没有接收到ACK时,同意消息重发的次数

# 由于broker并没有完整的机制来避免消息反复,所以当网络异常时(比方ACK丢失)

# 有可能导致broker接收到反复的消息,默认值为3.

message.send.max.retries=3

# producer刷新topic metada的时间间隔,producer须要知道partition leader的位置,以及当前topic的情况

# 因此producer须要一个机制来获取最新的metadata,当producer遇到特定错误时,将会马上刷新

# (比方topic失效,partition丢失,leader失效等),此外也能够通过此參数来配置额外的刷新机制,默认值600000

topic.metadata.refresh.interval.ms=60000

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();

Properties props = new Properties();
props.put("metadata.broker.list", "192.168.2.105:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder"); //默认字符串编码消息
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}


2.1 指定keywordkey。发送消息到指定partitions

zookeeper.sync.time.ms=2000

#指定消费组

group.id=xxx

# 当consumer消费一定量的消息之后,将会自己主动向zookeeper提交offset信息

# 注意offset信息并非每消费一次消息就向zk提交一次,而是如今本地保存(内存),并定期提交,默觉得true

auto.commit.enable=true

# 自己主动更新时间。默认60 * 1000

auto.commit.interval.ms=1000

# 当前consumer的标识,能够设定,也能够有系统生成,主要用来跟踪消息消费情况,便于观察

conusmer.id=xxx

# 消费者client编号。用于区分不同client,默认client程序自己主动产生

client.id=xxxx

# 最大取多少块缓存到消费者(默认10)

queued.max.message.chunks=50

# 当有新的consumer增加到group时,将会reblance,此后将会有partitions的消费端迁移到新

# 的consumer上,假设一个consumer获得了某个partition的消费权限,那么它将会向zk注冊

# "Partition Owner registry"节点信息,可是有可能此时旧的consumer尚没有释放此节点,

# 此值用于控制,注冊节点的重试次数.

rebalance.max.retries=5

# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk

# 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗很多其它的consumer端内存

fetch.min.bytes=6553600

# 当消息的尺寸不足时,server堵塞的时间,假设超时,消息将马上发送给consumer

fetch.wait.max.ms=5000

socket.receive.buffer.bytes=655360

# 假设zookeeper没有offset值或offset值超出范围。

那么就给个初始的offset。有smallest、largest、

# anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest

auto.offset.reset=smallest

# 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默觉得kafka.serializer.DefaultDecoder,即byte[]

derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder


3.2 多线程并行消费topic

总结:

kafka消费者api分为high api和low api,眼下上述demo是都是使用kafka high api,高级api不用关心维护消费状态信息和负载均衡。系统会依据配置參数,

定期flush offset到zk上,假设有多个consumer且每一个consumer创建了多个线程,高级api会依据zk上注冊consumer信息,进行自己主动负载均衡操作。



注意事项:

1.高级api将会内部实现持久化每一个分区最后读到的消息的offset,数据保存在zookeeper中的消费组名中(如/consumers/push-token-group/offsets/push-token/2。

当中push-token-group是消费组,push-token是topic,最后一个2表示第3个分区),每间隔一个(默认1000ms)时间更新一次offset。

那么可能在重新启动消费者时拿到反复的消息。此外。当分区leader发生变更时也可能拿到反复的消息。因此在关闭消费者时最好等待一定时间(10s)然后再shutdown()

2.消费组名是一个全局的信息,要注意在新的消费者启动之前旧的消费者要关闭。

假设新的进程启动而且消费组名同样。kafka会加入这个进程到可用消费线程组中用来消费

topic和触发又一次分配负载均衡,那么同一个分区的消息就有可能发送到不同的进程中。

3.假设消费者组中全部consumer的总线程数量大于分区数,一部分线程或某些consumer可能无法读取消息或处于空暇状态。

4.假设分区数多于线程数(假设消费组中执行者多个消费者,则线程数为消费者组内全部消费者线程总和)。一部分线程会读取到多个分区的消息

5.假设一个线程消费多个分区消息,那么接收到的消息是不能保证顺序的。

备注:可用zookeeper web ui工具管理查看zk文件夹树数据: xxx/consumers/push-token-group/owners/push-token/2当中

push-token-group为消费组,push-token为topic,2为分区3.查看里面的内容如:

push-token-group-mobile-platform03-1405157976163-7ab14bd1-0表示该分区被该标示的线程所运行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: