您的位置:首页 > 编程语言 > Java开发

Kafka 本地 Java 示例:生产者与消费者(非 ZK 版)

2017-09-03 15:49 441 查看
首先在 pom.xml 中引入以下依赖:

<!-- Kafka 0.10.2.1 built for Scala 2.12, with three transitive artifacts excluded. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>0.10.2.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
    </exclusions>
</dependency>

生产者代码:

import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Minimal Kafka producer demo.
 *
 * <p>{@link #start()} runs a background task that creates a producer and then
 * publishes one timestamped string message per second to {@link #TOPIC} until
 * the task is interrupted or sending fails. The producer is always closed when
 * the task ends.
 */
public class MessageProducer {

    /** Topic the demo publishes to; make sure this topic exists on the cluster. */
    private static final String TOPIC = "shuaige1";

    /** Pause between two messages, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 1000L;

    /** Created by {@link #init()}; {@code null} until then. */
    private Producer<String,String> producer;

    public static void main(String[] args) {
        new MessageProducer().start();
    }

    /**
     * Builds the producer configuration and creates the {@link KafkaProducer}.
     * Must be called before {@link #produceMsg()}.
     */
    public void init() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.15:9092,192.168.8.16:9092,192.168.8.17:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String,String>(props);
    }

    /**
     * Builds one message whose key and value both embed the current timestamp
     * and hands it to the producer.
     *
     * @throws IllegalStateException if {@link #init()} has not been called yet
     */
    public void produceMsg() {
        if (producer == null) {
            throw new IllegalStateException("init() must be called before produceMsg()");
        }

        // Build the message to send.
        long timestamp = System.currentTimeMillis();
        String msg = "Msg" + timestamp;
        String key = "Msg-Key" + timestamp;
        System.out.println("发送消息" + msg);

        ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, key, msg);
        // NOTE(review): the value returned by send() is not inspected, so a
        // delivery failure reported through it would go unnoticed here.
        producer.send(data);
    }

    /**
     * Starts the send loop on a single background thread and returns immediately.
     *
     * <p>The loop ends when the thread is interrupted or when sending throws;
     * in both cases the producer is closed instead of being reused after close.
     */
    public void start() {
        System.out.println("开始发送消息 ...");
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    init();
                    while (!Thread.currentThread().isInterrupted()) {
                        produceMsg();
                        Thread.sleep(SEND_INTERVAL_MS);
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    System.out.println("Kafka producer error, stopping: " + e);
                } finally {
                    closeProducer();
                }
            }
        });
        // The already submitted task keeps running; this only lets the worker
        // thread terminate once the task has finished.
        executor.shutdown();
    }

    /** Closes the producer if it was created, reporting (not propagating) close failures. */
    private void closeProducer() {
        if (producer == null) {
            return;
        }
        try {
            producer.close();
        } catch (RuntimeException closeError) {
            System.out.println("Turn off Kafka producer error! " + closeError);
        }
    }
}


消费者代码:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer demo.
 *
 * <p>Subscribes to the topic {@code shuaige1} as a member of consumer group
 * {@code test}, with auto-commit enabled at a 1000 ms interval, and prints the
 * offset, key and value of every record it receives.
 */
public class MessageConsumer {

    /**
     * Runs the poll loop until an exception ends it; the consumer is closed on
     * the way out so the client is not leaked when polling fails.
     */
    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.15:9092,192.168.8.16:9092,192.168.8.17:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            consumer.subscribe(Arrays.asList("shuaige1"));

            while (true) {
                // 100 is the poll timeout passed to the client, in milliseconds.
                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Reached only when subscribe() or poll() throws.
            consumer.close();
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: