如何在spark-streaming中获取通过kafka传递的flume信息header
2016-12-27 16:58
363 查看
标题好长。。。好复杂。。。
flume+kafka+spark-streaming,应该说这一套架构已经成为流式计算的标配了。
如何整合我这里就不赘述了。
上几个配置文件好了
总体的流程就是 flume会tail文件》将tail到的内容输出到Kafka中》spark订阅topic
默认模式下,Spark-streaming只能拿到flume tail到的文字,但是某些情况下我们希望spark也能处理一些header中的内容。
譬如说上述配置文件中在tail的同时,我们还像header中添加了timestamp以及host信息。
核心代码有两段:
第一点就是flume配置文件中
该配置文件使得flume将整个信息封装成一个flumeEvent发送给kafka,这个flumeEvent就包含了header信息,默认是不包含的.
第二点就是spark在接收数据的时候
接收的是一个byte数组,默认是string类型。
第三点就是讲byte数组转化成flumeEvent.
flume+kafka+spark-streaming,应该说这一套架构已经成为流式计算的标配了。
如何整合我这里就不赘述了。
上几个配置文件好了
a1.sources = r1 a1.channels = c1 a1.sinks=k1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /work/onedesk/bidlog/apache-flume-1.7.0-bin/taildir_position.json a1.sources.r1.filegroups = f1 f2 f3 a1.sources.r1.filegroups.f1 = /work/onedesk/bidlog/bid.tmp a1.sources.r1.headers.f1.topic = bid a1.sources.r1.filegroups.f2 = /work/onedesk/bidlog/sspbid.tmp a1.sources.r1.headers.f2.topic = sspbid a1.sources.r1.filegroups.f3 = /work/onedesk/bidlog/sspclick.tmp a1.sources.r1.headers.f3.topic = sspclick a1.sources.r1.fileHeader = true a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 1000 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = bidlog a1.sinks.k1.kafka.bootstrap.servers = 192.168.60.15:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.useFlumeEventFormat = true a1.sinks.k1.channel=c1 a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = host a1.sources.r1.interceptors.i1.hostHeader = hostname a1.sources.r1.interceptors.i2.type = timestamp
val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "192.168.60.15:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[BytesDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)) val topics = Array("sspclick") val sspclick = KafkaUtils.createDirectStream[String, Bytes]( ssc, PreferConsistent, Subscribe[String, Bytes](topics, kafkaParams))
总体的流程就是 flume会tail文件》将tail到的内容输出到Kafka中》spark订阅topic
默认模式下,Spark-streaming只能拿到flume tail到的文字,但是某些情况下我们希望spark也能处理一些header中的内容。
譬如说上述配置文件中在tail的同时,我们还像header中添加了timestamp以及host信息。
核心代码有两段:
第一点就是flume配置文件中
a1.sinks.k1.useFlumeEventFormat = true
该配置文件使得flume将整个信息封装成一个flumeEvent发送给kafka,这个flumeEvent就包含了header信息,默认是不包含的.
第二点就是spark在接收数据的时候
"value.deserializer" -> classOf[BytesDeserializer],
val sspclick = KafkaUtils.createDirectStream[String, Bytes](
接收的是一个byte数组,默认是string类型。
第三点就是讲byte数组转化成flumeEvent.
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.kafka.common.serialization.BytesDeserializer import org.apache.avro.specific.SpecificDatumReader import org.apache.flume.source.avro.AvroFlumeEvent import org.apache.avro.io.DecoderFactory import org.apache.kafka.common.utils.Bytes import org.apache.commons.lang3.StringUtils import org.apache.avro.util.Utf8 import java.text.SimpleDateFormat import java.sql.Timestamp import java.sql.Date import org.apache.avro.util.Utf8 import org.apache.kafka.common.serialization.BytesDeserializer import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import scala.reflect.runtime.universe import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SaveMode import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val reader = new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent])
val a = sspclick.map(f => { var body = f.value().get var decoder = DecoderFactory.get().binaryDecoder(body, null); var result = reader.read(null, decoder); var hostname = result.getHeaders.get(new Utf8("hostname")) var text = new String(result.getBody.array()) var array = text.split("\t")text就是tail中的内容。
相关文章推荐
- SparkStreaming通过Kafka获取数据(Receiver方式)
- sparkstreaming + kafka如何保证数据不丢失、不重复
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- 如何管理Spark Streaming消费Kafka的偏移量(一)
- spark streaming 通过log4j 统一输出日志到kafka
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- Sparkstreaming基于kafka以Receiver方式获取数据原理和案例实战
- 如何管理Spark Streaming消费Kafka的偏移量(三)
- Spark Streaming和Kafka整合是如何保证数据零丢失
- 如何管理Spark Streaming消费Kafka的偏移量(一)
- 第117课: Spark Streaming性能优化:如何最大程度的确保Spark Cluster和Kafka链接的稳
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- 如何管理Spark Streaming消费Kafka的偏移量(二)
- 通过url传递参数时,如何获取全部参数
- 第90讲,Spark streaming基于kafka 以Receiver方式获取数据 原理和案例实战
- Spark-Streaming获取kafka数据的两种方式:Receiver与Direct的方式
- 大数据IMF传奇行动绝密课程第117课:Spark Streaming性能优化:如何最大程度的确保Spark Cluster和Kafka连接的稳定性
- 如何管理Spark Streaming消费Kafka的偏移量(三)
- Spark视频第6期:无需等待的实时计算时代如何在90分钟内通过Spark Streaming掌握大数据实时计算和流处理?
- Spark Streaming获取kafka数据的两种方式