您的位置:首页 > 编程语言 > Java开发

分布式消息系统Kafka Java客户端代码

2014-08-12 00:00 591 查看
介绍

kafka是一种高吞吐量的分布式发布订阅消息系统。


kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)

当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。

高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理

测试环境

kafka_2.10-0.8.1.1 3个节点做的集群

zookeeper-3.4.5 一个实例节点

代码示例

消息生产者代码示例

import java.util.Collections;

import java.util.Date;

import java.util.Properties;

import java.util.Random;
import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;
/**

* 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

* @author Fung

*

*/

public class ProducerDemo {

public static void main(String[] args) {

Random rnd = new Random();

int events=100;
// 设置配置属性

Properties props = new Properties();

props.put(“metadata.broker.list”,”172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092″);

props.put(“serializer.class”, ”kafka.serializer.StringEncoder”);

// key.serializer.class默认为serializer.class

props.put(“key.serializer.class”, ”kafka.serializer.StringEncoder”);

// 可选配置,如果不配置,则使用默认的partitioner

props.put(“partitioner.class”, ”com.catt.kafka.demo.PartitionerDemo”);

// 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失

// 值为0,1,-1,可以参考

// http://kafka.apache.org/08/configuration.html

props.put(“request.required.acks”, ”1″);

ProducerConfig config = new ProducerConfig(props);
// 创建producer

Producer<String, String> producer = new Producer<String, String>(config);

// 产生并发送消息

long start=System.currentTimeMillis();

for (long i = 0; i < events; i++) {

long runtime = new Date().getTime();

String ip = ”192.168.2.” + i;//rnd.nextInt(255);

String msg = runtime + ”,www.example.com,” + ip;

//如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0

KeyedMessage<String, String> data = new KeyedMessage<String, String>(

“page_visits”, ip, msg);

producer.send(data);

}

System.out.println(“耗时:” + (System.currentTimeMillis() - start));

// 关闭producer

producer.close();

}

}

消息消费者代码示例

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;
import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;
/**

* 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

*

* @author Fung

*

*/

public class ConsumerDemo {

private final ConsumerConnector consumer;

private final String topic;

private ExecutorService executor;
public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {

consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));

this.topic = a_topic;

}
public void shutdown() {

if (consumer != null)

consumer.shutdown();

if (executor != null)

executor.shutdown();

}
public void run(int numThreads) {

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put(topic, new Integer(numThreads));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer

.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads

executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executor.submit(new ConsumerMsgTask(stream, threadNumber));

threadNumber++;

}

}
private static ConsumerConfig createConsumerConfig(String a_zookeeper,

String a_groupId) {

Properties props = new Properties();

props.put(“zookeeper.connect”, a_zookeeper);

props.put(“group.id”, a_groupId);

props.put(“zookeeper.session.timeout.ms”, ”400″);

props.put(“zookeeper.sync.time.ms”, ”200″);

props.put(“auto.commit.interval.ms”, ”1000″);
return new ConsumerConfig(props);

}
public static void main(String[] arg) {

String[] args = { ”172.168.63.221:2188″, ”group-1″, ”page_visits”, ”12″ };

String zooKeeper = args[0];

String groupId = args[1];

String topic = args[2];

int threads = Integer.parseInt(args[3]);
ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);

demo.run(threads);
try {

Thread.sleep(10000);

} catch (InterruptedException ie) {
}

demo.shutdown();

}

}

消息处理类

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;
public class ConsumerMsgTask implements Runnable {

private KafkaStream m_stream;

private int m_threadNumber;
public ConsumerMsgTask(KafkaStream stream, int threadNumber) {

m_threadNumber = threadNumber;

m_stream = stream;

}
public void run() {

ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

while (it.hasNext())

System.out.println(“Thread ” + m_threadNumber + ”: ”

+ new String(it.next().message()));

System.out.println(“Shutting down Thread: ” + m_threadNumber);

}

}

Partitioner类示例

import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;
public class PartitionerDemo implements Partitioner {

public PartitionerDemo(VerifiableProperties props) {
}

@Override

public int partition(Object obj, int numPartitions) {

int partition = 0;

if (obj instanceof String) {

String key=(String)obj;

int offset = key.lastIndexOf(‘.’);

if (offset > 0) {

partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;

}

}else{

partition = obj.toString().length() % numPartitions;

}
return partition;

}
}

参考

https://cwiki.apache.org/confluence/display/KAFKA/Index

https://kafka.apache.org/

来自:开源中国
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: