您的位置:首页 > 其它

创建Kafka0.8.2生产者与消费者

2016-06-17 10:36 295 查看
一、下载安装Kafka0.8.2



二、vi config/server.properties

三、修改为advertised.host.name=192.168.1.76

四、rm -rf /tmp/kafka-logs /tmp/zookeeper 移除临时目录下Kafka和ZooKeeper的旧数据(默认配置下的数据目录;注意不要写成 rm -rf /tmp *,那样会把当前目录下的所有文件一并删除)

五、vi /etc/hosts,将其中的127.0.0.1修改为192.168.1.76



六、开启zookeeper 

bin/zookeeper-server-start.sh config/zookeeper.properties


七、开启kafka
bin/kafka-server-start.sh config/server.properties



八、创建主题

bin/kafka-topics.sh --create --zookeeper 192.168.1.76:2181 --replication-factor 1 --partitions 1 --topic mytesttopic

九、开启消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.1.76:2181 --topic mytesttopic --from-beginning 回车

十、生产者代码(0.8.2.1的jar包)
import java.util.*;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal example of the new Java producer client
 * ({@code org.apache.kafka.clients.producer.KafkaProducer}).
 *
 * <p>Sends 100 text messages without a key to the topic {@code mytesttopic} on the broker at
 * {@code 192.168.1.76:9092}.
 */
public class SimpleProducer {

    /**
     * Configures the producer, sends the messages and closes the producer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.76:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "acks" is the new-producer name for the acknowledgement setting. The old Scala-producer
        // keys (metadata.broker.list, serializer.class, request.required.acks) are not read by
        // KafkaProducer, so they are not set here.
        properties.put("acks", "1");

        // The key type matches the configured key serializer (StringSerializer).
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        try {
            for (int index = 0; index < 100; index++) {
                String message = "My Test Message No " + index;
                // Topic + value only: the record is sent without a key.
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>("mytesttopic", message);
                producer.send(record);
            }
        } finally {
            // Close the producer even if sending throws.
            producer.close();
        }
    }
}
十一、查看结果



My Test Message No 0
My Test Message No 1
My Test Message No 2
My Test Message No 3
My Test Message No 4
My Test Message No 5
My Test Message No 6
My Test Message No 7
My Test Message No 8
My Test Message No 9
My Test Message No 10
...................
..................

十二、消费者代码(0.8.2.1的jar包)

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.*;
/**
 * Minimal example of the ZooKeeper-based high-level consumer.
 *
 * <p>Reads string messages from the topic {@code mytesttopic} through a single stream and prints
 * each message to standard output.
 */
public class SimpleConsumerExample {

    /** Connector created by {@link #consume()}; shut down when consumption stops. */
    private static kafka.javaapi.consumer.ConsumerConnector consumer;

    /**
     * Connects to ZooKeeper at {@code 192.168.1.76:2181} as group {@code jd-group}, opens one
     * stream for {@code mytesttopic} and prints every message it yields.
     *
     * <p>No {@code consumer.timeout.ms} is set, so the iterator is expected to block while waiting
     * for new messages; the connector is shut down if the loop ends or throws.
     */
    public static void consume() {
        Properties props = new Properties();
        // ZooKeeper connection string.
        props.put("zookeeper.connect", "192.168.1.76:2181");
        // Consumer group this consumer belongs to.
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout and sync time, in milliseconds.
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        // Interval between automatic offset commits, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // Start from the smallest available offset when the group has no committed offset.
        props.put("auto.offset.reset", "smallest");
        // "serializer.class" is a producer setting and is not set here; decoding is done by the
        // StringDecoder instances passed to createMessageStreams below.

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        try {
            // Request exactly one stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("mytesttopic", Integer.valueOf(1));

            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get("mytesttopic").get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println(it.next().message());
            }
        } finally {
            // Release the connector's resources instead of leaking them on failure.
            consumer.shutdown();
        }
    }

    /**
     * Runs the consumer example.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        consume();
    }
}
十三、提供下C#版的代码
// Entry point of the kafka-net sample: consumes "mytesttopic" from the broker at
// 192.168.1.76:9092 and prints each message's partition, offset and text.
// The producer variant is kept below as commented-out reference code.
static void Main(string[] args)
{
    // Client library: https://github.com/Jroland/kafka-net

    // Producer
    //var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));
    //var router = new BrokerRouter(options);
    //var client = new Producer(router);

    //client.SendMessageAsync("mytesttopic", new[] { new Message("hello world") }).Wait();

    //using (client) { }

    // Consumer
    var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));
    var router = new BrokerRouter(options);
    var consumer = new Consumer(new ConsumerOptions("mytesttopic", router));

    //Consume returns a blocking IEnumerable (ie: never ending stream)
    foreach (var message in consumer.Consume())
    {
        // Decode as UTF-8: the Java producer's StringSerializer writes UTF-8, and ASCII
        // decoding would replace every non-ASCII character with '?'.
        Console.WriteLine("Response: P{0},O{1} : {2}",
            message.Meta.PartitionId, message.Meta.Offset, System.Text.Encoding.UTF8.GetString(message.Value));
    }
    Console.ReadLine();
}

注意版本,以及主机名。否则会出现莫名其妙的问题!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: