kafka生产者、消费者代码示例
2018-02-12 16:55
441 查看
1、生产者
2、消费者
import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyProducer { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("metadata.broker.list","localhost:9092"); props.setProperty("serializer.class","kafka.serializer.StringEncoder"); props.put("request.required.acks","1"); ProducerConfig config = new ProducerConfig(props); //创建生产这对象 Producer<String, String> producer = new Producer<String, String>(config); //生成消息 KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka"); try { int i =1; while(i < 100){ //发送消息 producer.send(data); } } catch (Exception e) { e.printStackTrace(); } producer.close(); } } |
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class MyConsumer extends Thread{ //消费者连接 private final ConsumerConnector consumer; //要消费的话题 private final String topic; public MyConsumer(String topic) { consumer =kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig()); this.topic =topic; } 4000 //配置相关信息 private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); // props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181"); //配置要连接的zookeeper地址与端口 //The ‘zookeeper.connect’ string identifies where to find once instance of Zookeeper in your cluster. //Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Group props.put("zookeeper.connect","localhost:2181"); //配置zookeeper的组id (The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of.) props.put("group.id", "0"); //配置zookeeper连接超时间隔 //The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for //ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages. props.put("zookeeper.session.timeout.ms","10000"); //The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs. props.put("zookeeper.sync.time.ms", "200"); //The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. //Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages. props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run(){ Map<String,Integer> topickMap = new HashMap<String, Integer>(); topickMap.put(topic, 1); Map<String, List<KafkaStream<byte[],byte[]>>> streamMap =consumer.createMessageStreams(topickMap); KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0); ConsumerIterator<byte[],byte[]> it =stream.iterator(); System.out.println("*********Results********"); while(true){ if(it.hasNext()){ //打印得到的消息 System.err.println(Thread.currentThread()+" get data:" +new String(it.next().message())); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyConsumer consumerThread = new MyConsumer("mykafka"); consumerThread.start(); } } |
相关文章推荐
- kafka生产者消费者示例代码
- kafka生产者与消费者java代码示例
- kafka生产者和消费者的javaAPI的示例代码
- JAVA生产者消费者(线程同步)代码学习示例
- Kafka java api-消费者代码与消费分析、生产者消费者配置文件详解
- kafka 0.10.1.0 生产者消费者代码
- Java线程同步:生产者-消费者 模型(代码示例)
- kafka生产者、消费者java示例
- Kafka常用操作命令及生产者与消费者的代码实现
- Kafka生产者消费者java示例(包含Avro序列化)
- kafka 官方示例代码--消费者
- kafka 消费者代码示例
- Kafka的生产者和消费者代码解析
- Kafka的生产者和消费者代码解析
- 多生产者多消费者简单代码示例
- Java线程同步:生产者-消费者 模型(代码示例)
- Java 多线程生产者和消费者代码示例
- JAVA,生产者消费者代码示例
- kafka本地java示例生产者与消费者,非ZK版
- 生产者消费者问题代码示例