您的位置:首页 > 编程语言 > Java开发

Kafka学习笔记-Java简单操作

2017-06-26 18:28 597 查看
转自:http://www.cnblogs.com/edison2012/p/5759223.html

Maven依赖包:

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>0.8.2.1</version>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.11</artifactId>

<version>0.8.2.1</version>

</dependency>

代码如下:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class KafkaProducerTest {

private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);

private static Properties properties = null;

static {

properties = new Properties();

properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");

properties.put("producer.type", "sync");

properties.put("request.required.acks", "1");

properties.put("serializer.class", "kafka.serializer.DefaultEncoder");

properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");

properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

}

public void produce() {

KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);

ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(

"test", "kkk".getBytes(), "vvv".getBytes());

kafkaProducer.send(kafkaRecord, new Callback() {

public void onCompletion(RecordMetadata metadata, Exception e) {

if(null != e) {

LOG.info("the offset of the send record is {}", metadata.offset());

LOG.error(e.getMessage(), e);

}

LOG.info("complete!");

}

});

kafkaProducer.close();

}

public static void main(String[] args) {

KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();

for (int i = 0; i < 10; i++) {

kafkaProducerTest.produce();

}

}

}

import java.util.List;

import java.util.Map;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class KafkaConsumerTest {

private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);

public static void main(String[] args) {

Properties properties = new Properties();

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

"centos.master:9092,centos.slave1:9092,centos.slave2:9092");

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");

properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

// properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.ByteArrayDeserializer");

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);

kafkaConsumer.subscribe("test");

// kafkaConsumer.subscribe("*");

boolean isRunning = true;

while(isRunning) {

Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);

if (null != results) {

for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {

LOG.info("topic {}", entry.getKey());

ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();

List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();

for (int i = 0, len = records.size(); i < len; i++) {

ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);

LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());

try {

LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));

} catch (Exception e) {

LOG.error(e.getMessage(), e);

}

}

}

}

}

kafkaConsumer.close();

}

}

发现KafkaConsumer的poll方法未实现

@Override

public Map<String, ConsumerRecords<K,V>> poll(long timeout) {

// TODO Auto-generated method stub

return null;

}

后改为kafka.javaapi.consumer.SimpleConsumer实现,正常运行

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.Collections;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import kafka.api.FetchRequest;

import kafka.api.FetchRequestBuilder;

import kafka.api.PartitionOffsetRequestInfo;

import kafka.cluster.Broker;

import kafka.common.ErrorMapping;

import kafka.common.TopicAndPartition;

import kafka.javaapi.FetchResponse;

import kafka.javaapi.OffsetRequest;

import kafka.javaapi.OffsetResponse;

import kafka.javaapi.PartitionMetadata;

import kafka.javaapi.TopicMetadata;

import kafka.javaapi.TopicMetadataRequest;

import kafka.javaapi.TopicMetadataResponse;

import kafka.javaapi.consumer.SimpleConsumer;

import kafka.message.MessageAndOffset;

public class KafkaSimpleConsumerTest {

private List<String> borkerList = new ArrayList<String>();

public KafkaSimpleConsumerTest() {

borkerList = new ArrayList<String>();

}

public static void main(String args[]) {

KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();

// 最大读取消息数量

long maxReadNum = Long.parseLong("3");

// 订阅的topic

String topic = "test";

// 查找的分区

int partition = Integer.parseInt("0");

// broker节点

List<String> seeds = new ArrayList<String>();

seeds.add("centos.master");

seeds.add("centos.slave1");

seeds.add("centos.slave2");

// 端口

int port = Integer.parseInt("9092");

try {

kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);

} catch (Exception e) {

System.out.println("Oops:" + e);

e.printStackTrace();

}

}

public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {

// 获取指定topic partition的元数据

PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);

if (metadata == null) {

System.out.println("can't find metadata for topic and partition. exit");

return;

}

if (metadata.leader() == null) {

System.out.println("can't find leader for topic and partition. exit");

return;

}

String leadBroker = metadata.leader().host();

String clientName = "client_" + topic + "_" + partition;

SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);

long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

int numErrors = 0;

while (maxReadNum > 0) {

if (consumer == null) {

consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);

}

FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();

FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {

numErrors++;

short code = fetchResponse.errorCode(topic, partition);

System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);

if (numErrors > 5)

break;

if (code == ErrorMapping.OffsetOutOfRangeCode()) {

readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);

continue;

}

consumer.close();

consumer = null;

leadBroker = findNewLeader(leadBroker, topic, partition, port);

continue;

}

numErrors = 0;

long numRead = 0;

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {

long currentOffset = messageAndOffset.offset();

if (currentOffset < readOffset) {

System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);

continue;

}

readOffset = messageAndOffset.nextOffset();

ByteBuffer payload = messageAndOffset.message().payload();

byte[] bytes = new byte[payload.limit()];

payload.get(bytes);

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));

numRead++;

maxReadNum--;

}

if (numRead == 0) {

try {

Thread.sleep(1000);

} catch (InterruptedException ie) {

}

}

}

if (consumer != null)

consumer.close();

}

/**

* 从活跃的Broker列表中找出指定Topic、Partition中的Leader Broker

* @param seedBrokers

* @param port

* @param topic

* @param partition

* @return

*/

private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {

PartitionMetadata partitionMetadata = null;

loop: for (String seedBroker : seedBrokers) {

SimpleConsumer consumer = null;

try {

consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");

List<String> topics = Collections.singletonList(topic);

TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);

TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);

List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();

for (TopicMetadata topicMetadata : topicMetadatas) {

for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {

if (pMetadata.partitionId() == partition) {

partitionMetadata = pMetadata;

break loop;

}

}

}

} catch (Exception e) {

System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);

} finally {

if (consumer != null)

consumer.close();

}

}

if (partitionMetadata != null) {

borkerList.clear();

for (Broker replica : partitionMetadata.replicas()) {

borkerList.add(replica.host());

}

}

return partitionMetadata;

}

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {

TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {

System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));

return 0;

}

long[] offsets = response.offsets(topic, partition);

return offsets[0];

}

private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {

for (int i = 0; i < 3; i++) {

boolean goToSleep = false;

PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);

if (metadata == null) {

goToSleep = true;

} else if (metadata.leader() == null) {

goToSleep = true;

} else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {

goToSleep = true;

} else {

return metadata.leader().host();

}

if (goToSleep) {

try {

Thread.sleep(1000);

} catch (InterruptedException ie) {

}

}

}

System.out.println("unable to find new leader after broker failure. exit");

throw new Exception("unable to find new leader after broker failure. exit");

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: