Creating a Kafka 0.8.2 Producer and Consumer
2016-06-17 10:36
1. Download and install Kafka 0.8.2
2. Edit the broker config: vi config/server.properties
3. Set advertised.host.name=192.168.1.76
4. Remove stale data under the temp directory: rm -rf /tmp/* (note the glob — a space before the * would also delete everything in the current directory)
5. Edit /etc/hosts (vi /etc/hosts) and change the 127.0.0.1 mapping to 192.168.1.76
6. Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
7. Start Kafka:
bin/kafka-server-start.sh config/server.properties
8. Create a topic:
bin/kafka-topics.sh --create --zookeeper 192.168.1.76:2181 --replication-factor 1 --partitions 1 --topic mytesttopic
9. Start a console consumer and press Enter:
bin/kafka-console-consumer.sh --zookeeper 192.168.1.76:2181 --topic mytesttopic --from-beginning
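Before running the client code below, it is worth checking that the advertised host resolves and that the broker port accepts connections. This is a minimal stdlib-only sketch, assuming the 192.168.1.76 address and port 9092 from the steps above; the throwaway local ServerSocket exists only to demonstrate the probe without a running broker:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class PreflightCheck {
    // True if something accepts a TCP connection on host:port within timeoutMs.
    static boolean portOpen(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Resolve the advertised host; a hostname (rather than a literal IP)
        // goes through /etc/hosts, which is exactly what step 5 configures.
        System.out.println(InetAddress.getByName("192.168.1.76").getHostAddress());

        // Demonstrate the probe against a throwaway local listener.
        try (ServerSocket listener = new ServerSocket(0)) {
            System.out.println(portOpen("127.0.0.1", listener.getLocalPort(), 500));
        }

        // Against the real broker you would check:
        // portOpen("192.168.1.76", 9092, 500)
    }
}
```

If the probe against 192.168.1.76:9092 fails, fix the broker/hosts configuration before debugging the client code.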
10. Producer code (built against the 0.8.2.1 client jars)
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // New-producer (org.apache.kafka.clients) settings only. The legacy
        // keys metadata.broker.list / serializer.class / request.required.acks
        // belong to the old Scala producer and are ignored by KafkaProducer.
        properties.put("bootstrap.servers", "192.168.1.76:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "1");

        // String key type to match the StringSerializer configured above.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int iCount = 0; iCount < 100; iCount++) {
            String message = "My Test Message No " + iCount;
            producer.send(new ProducerRecord<String, String>("mytesttopic", message));
        }
        producer.close();
    }
}

11. Check the results
My Test Message No 0
My Test Message No 1
My Test Message No 2
My Test Message No 3
My Test Message No 4
My Test Message No 5
My Test Message No 6
My Test Message No 7
My Test Message No 8
My Test Message No 9
My Test Message No 10
...
12. Consumer code (built against the 0.8.2.1 client jars)
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class SimpleConsumerExample {
    private static kafka.javaapi.consumer.ConsumerConnector consumer;

    public static void consume() {
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "192.168.1.76:2181");
        // Consumer group id
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout and sync settings
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // Start from the earliest offset when the group has none committed
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("mytesttopic", new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("mytesttopic").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        consume();
    }
}

13. The C# version (using kafka-net)
static void Main(string[] args)
{
    // https://github.com/Jroland/kafka-net
    // Producer:
    //var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));
    //var router = new BrokerRouter(options);
    //var client = new Producer(router);
    //client.SendMessageAsync("mytesttopic", new[] { new Message("hello world") }).Wait();
    //using (client) { }

    // Consumer:
    var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));
    var router = new BrokerRouter(options);
    var consumer = new Consumer(new ConsumerOptions("mytesttopic", router));
    // Consume() returns a blocking IEnumerable (i.e. a never-ending stream)
    foreach (var message in consumer.Consume())
    {
        Console.WriteLine("Response: P{0},O{1} : {2}",
            message.Meta.PartitionId, message.Meta.Offset,
            System.Text.Encoding.ASCII.GetString(message.Value));
    }
    Console.ReadLine();
}
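Both consumer loops above block forever: it.hasNext() and consumer.Consume() never return on their own. In practice you run the loop on a worker thread and stop it when the process exits; on the Java side that is where consumer.shutdown() goes, which makes it.hasNext() return false. The following is a stdlib-only sketch of that pattern, with a blocking queue standing in for the Kafka stream (no broker required):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConsumerRunner {
    public static void main(String[] args) throws Exception {
        // Stand-in for the Kafka stream: a queue of incoming messages.
        final BlockingQueue<String> stream = new LinkedBlockingQueue<String>();
        stream.add("My Test Message No 0");

        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> loop = pool.submit(new Runnable() {
            public void run() {
                try {
                    // Equivalent of the while (it.hasNext()) loop.
                    while (!Thread.currentThread().isInterrupted()) {
                        String message = stream.poll(100, TimeUnit.MILLISECONDS);
                        if (message != null) System.out.println(message);
                    }
                } catch (InterruptedException e) {
                    // Interrupted during shutdown: just let the loop end.
                }
            }
        });

        Thread.sleep(300);   // let the worker drain the queue
        // With the real client, consumer.shutdown() would go here.
        loop.cancel(true);   // interrupt the blocking poll
        pool.shutdown();
        System.out.println(pool.awaitTermination(2, TimeUnit.SECONDS));
    }
}
```

The same shape works for the C# loop: move Consume() onto a background task and dispose the consumer to stop it.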
Mind the client/broker versions and the hostname configuration, or you will run into baffling problems!