kafka 0.8.x producer Example(scala)
2015-08-18 17:29
239 查看
Producer
最简配置metadata.broker.list参数指定broker地址,这里不需要填上所有的broker地址,但是如果只写一个,这个broker挂掉后就无法往topic中写入信息,一般写入2-3个broker地址。
serializer.class指定序列化的方式
props.put("metadata.broker.list","broker1:9092,broker2:9092,broker3:9092") props.put("serializer.class","kafka.serializer.StringEncoder")
producer
两个类型参数,第一个为partition key类型,第二个为消息类型
val producer = new Producer[String,String] (config)
发送消息
KeyedMessage的两个参数,第一个为要写入的topic名字,第二个为要写入的消息。
val date = new KeyedMessage[String, String] ("kafka-spark-test", "testInfo") producer.send (date)
完整代码
import java.util.Properties import kafka.javaapi.producer.Producer import kafka.producer.KeyedMessage import kafka.producer.ProducerConfig object kafka_producer { def main(args: Array[String]) { val props = new Properties() props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092") props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("request.required.acks", "1") val config = new ProducerConfig(props); val producer = new Producer[String, String](config) val date = new KeyedMessage[String, String]("kafka-spark-test", "testInfo") producer.send(date) producer.close } }
Tip
如果运行时发现如下错误:log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please initialize the log4j system properly.
将
log4j.properties加入到src下
相关文章推荐
- java,c#将秒转换为hh:MM:ss的实现
- Java EE_学习规划
- 使用最小堆来完成k路归并 6.5-8
- 多个动画合并使用
- 【华为OJ平台练习题】求最大公共子串的个数和元素
- Android 图片三级缓存之内存缓存(告别软引用(SoftRefrerence)和弱引用(WeakReference))
- 基于zabbix API添加监控主机
- HTTPS的证书未经权威机构认证的情况下,访问HTTPS站点的两种方法
- 二叉树的递归遍历
- 网络应用层——http协议
- ScheduledExecutorService定时周期执行指定的任务
- hdu 1867 A + B for you again
- Java之Exception
- 浅析python 中__name__ = '__main__' 的作用
- OC NSFileManager
- js框架
- Ceph:一种可扩展,高性能的分布式文件系统
- Unsupported major.minor version 51.0解决
- [Leetcode]Single Number III
- 2014阿里前端笔试题(关于弹性盒布局的实现)