Spark Integration with Kafka 0.10.0: New Features (Part 1)
2017-03-12 11:21
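Confucius said: "He who reviews the old and thereby learns the new is fit to be a teacher." What you learn fades gradually if you neither use nor review it, so I am taking notes here.
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher): new features of the Kafka 0.10.0 integration (the APIs are all still experimental).
The Spark Streaming integration for Kafka 0.10 is very similar to the Direct Stream approach for Kafka 0.8: parallelism is a 1:1 correspondence between Kafka partitions and Spark partitions, and you get access to Kafka offsets and metadata. However, the new integration uses the new Kafka consumer API rather than the simple API, so there are notable differences in usage. This version of the integration is currently marked experimental, so the API may change over time.
Linking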
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see the Linking section in the main programming guide for further information).
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.0
Whether you use sbt or Maven, you need to add the coordinates above.
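For sbt, the coordinates above might translate into something like the following build.sbt fragment. This is only a minimal sketch: the scalaVersion value is my assumption, chosen to match the _2.11 suffix of the artifact, and the %% operator appends that suffix automatically.

```scala
// build.sbt (sketch)
// Assumption: any Scala 2.11.x release; picked here only to match the _2.11 artifact suffix.
scalaVersion := "2.11.8"

// %% appends the Scala binary version, giving spark-streaming-kafka-0-10_2.11
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
```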
Creating a Direct Stream
Note that the package to import is org.apache.spark.streaming.kafka010; be careful not to import from the wrong package.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext, // the StreamingContext, the entry point for all streaming functionality
  PreferConsistent, // important: PreferConsistent is a method returning the location strategy used to schedule consumers for partitions
  Subscribe[String, String](topics, kafkaParams) // also important: Subscribe returns the consumer strategy
)

stream.map(record => (record.key, record.value))

The PreferConsistent method deserves a closer look. Let's start with how it is implemented; the source is as follows:
package org.apache.spark.streaming.kafka010

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.common.TopicPartition

import org.apache.spark.annotation.Experimental
import org.apache.spark.streaming.kafka010.{LocationStrategies, LocationStrategy, PreferFixed}

/**
 * :: Experimental ::
 * Choice of how to schedule consumers for a given TopicPartition on an executor.
 * See [[LocationStrategies]] to obtain instances.
 * Kafka 0.10 consumers prefetch messages, so it's important for performance
 * to keep cached consumers on appropriate executors, not recreate them for every partition.
 * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
 */
@Experimental
sealed abstract class LocationStrategy

/**
 * Use the PreferBrokers strategy only if your executors run on the same nodes as your Kafka brokers.
 */
private case object PreferBrokers extends LocationStrategy

/**
 * Use PreferConsistent in most cases; it distributes Kafka partitions consistently across all executors.
 */
private case object PreferConsistent extends LocationStrategy

/**
 * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
 * Any TopicPartition not specified in the map will use a consistent location
 * (that is, it is distributed across the executors as with PreferConsistent).
 *
 * @param hostMap mapping from TopicPartition to host name
 */
private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy

/**
 * :: Experimental :: object to obtain instances of [[LocationStrategy]]
 */
@Experimental
object LocationStrategies {
  /**
   * :: Experimental ::
   * Use this only if your executors are on the same nodes as your Kafka brokers.
   */
  @Experimental
  def PreferBrokers: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferBrokers

  /**
   * :: Experimental ::
   * Use this in most cases, it will consistently distribute partitions across all executors.
   */
  @Experimental
  def PreferConsistent: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferConsistent

  /**
   * :: Experimental ::
   * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   * Any TopicPartition not specified in the map will use a consistent location.
   * The two PreferFixed overloads differ only in the map type: one takes a Scala Map, the other a Java Map.
   */
  @Experimental
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

  /**
   * :: Experimental ::
   * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   * Any TopicPartition not specified in the map will use a consistent location.
   */
  @Experimental
  def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(hostMap)
}
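To make PreferConsistent and PreferFixed a little more concrete, here is a small self-contained model of how a preferred location could be picked for a partition. It is only a sketch under stated assumptions, not Spark's scheduling code: TopicPartition below is a hypothetical stand-in for the Kafka class, preferredHost is a made-up helper, and the "hash over the sorted executor list" rule is my simplification of what a consistent placement means.

```scala
object PlacementSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Picks a preferred host for one partition.
  // An explicit PreferFixed-style mapping wins; otherwise fall back to a
  // stable choice over the sorted executor list (the "consistent" case).
  // As in the source comments, this models a preference, not a guarantee.
  def preferredHost(
      tp: TopicPartition,
      executors: Seq[String],
      fixed: Map[TopicPartition, String] = Map.empty): Option[String] =
    fixed.get(tp).orElse {
      if (executors.isEmpty) None
      else {
        // Sorting makes the result independent of the order executors are listed in.
        val sorted = executors.sorted
        // floorMod keeps the index non-negative even for negative hash codes.
        Some(sorted(Math.floorMod(tp.hashCode, sorted.length)))
      }
    }
}
```

The point of the consistent fallback is that the same partition keeps landing on the same executor from batch to batch, so a cached consumer with prefetched messages can be reused.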
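Next, let's look at Subscribe. The Subscribe used above is a method on the ConsumerStrategies object (a Scala singleton object, so it is called much like a static method in Java); ConsumerStrategies is essentially a factory for consumer strategies.
Next, let's look at ConsumerStrategy. The key points are in the source comments; the source is as follows: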
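The factory idea can be illustrated with a toy version of the pattern: the concrete strategies are private, and callers can only obtain them through methods on a singleton object. Everything in this sketch (Strategy, ByTopics, ByPattern, Strategies, describe) is a hypothetical name of mine and has nothing to do with the real Spark classes beyond the shape of the design.

```scala
object StrategyFactorySketch {
  // Public abstract type: the only type callers ever see.
  sealed abstract class Strategy {
    def describe: String
  }

  // Concrete strategies are private, so they cannot be constructed from outside.
  private case class ByTopics(topics: List[String]) extends Strategy {
    def describe: String = "subscribe to topics " + topics.mkString(",")
  }

  private case class ByPattern(regex: String) extends Strategy {
    def describe: String = "subscribe to pattern " + regex
  }

  // Singleton factory: its methods are called like static methods.
  object Strategies {
    def Subscribe(topics: Iterable[String]): Strategy = ByTopics(topics.toList)
    def SubscribePattern(regex: String): Strategy = ByPattern(regex)
  }
}
```

A caller would write StrategyFactorySketch.Strategies.Subscribe(Seq("topicA", "topicB")) and receive a value typed only as Strategy, which leaves the implementation free to change.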
package org.apache.spark.streaming.kafka010

import java.{ lang => jl, util => ju }

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging

/**
 * :: Experimental ::
 * Choice of how to create and configure underlying Kafka Consumers on driver and executors.
 * See [[ConsumerStrategies]] to obtain instances.
 * Kafka 0.10 consumers can require additional, sometimes complex, setup after object
 * instantiation. This interface encapsulates that process, and allows it to be checkpointed.
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 */
@Experimental
abstract class ConsumerStrategy[K, V] {
  /**
   * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   */
  def executorKafkaParams: ju.Map[String, Object]

  /**
   * Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
   * See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Kafka docs</a>.
   * This consumer will be used on the driver to query for offsets only, not messages.
   * The consumer must be returned in a state that it is safe to call poll(0) on.
   * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
   * has successfully read. Will be empty on initial start, possibly non-empty on restart from
   * checkpoint.
   */
  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
}

/**
 * Subscribe to a collection of topics.
 * @param topics collection of topics to subscribe
 * @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
 * configuration parameters</a> to be used on driver. The same params will be used on executors,
 * with minor automatic modifications applied.
 * Requires "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets: offsets to begin at on initial startup. If no offset is given for a
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 */
private case class Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]
  ) extends ConsumerStrategy[K, V] with Logging {

  /** Inherited from ConsumerStrategy: returns the Kafka configuration parameters. */
  def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  /** Inherited from ConsumerStrategy: creates the Consumer. */
  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics)
    val toSeek = if (currentOffsets.isEmpty) {
      offsets
    } else {
      currentOffsets
    }
    if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no explicit position
      // but cant seek to a position before poll, because poll is what gets subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
        consumer.seek(topicPartition, offset)
      }
      // we've called poll, we must pause or next poll may consume messages and set position
      consumer.pause(consumer.assignment())
    }

    consumer
  }
}

/**
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topics existing at the time of check.
 * @param pattern pattern to subscribe to
 * @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
 * configuration parameters</a> to be used on driver. The same params will be used on executors,
 * with minor automatic modifications applied.
 * Requires "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets: offsets to begin at on initial startup. If no offset is given for a
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 */
private case class SubscribePattern[K, V](
    pattern: ju.regex.Pattern,
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]
  ) extends ConsumerStrategy[K, V] with Logging {

  def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
    val toSeek = if (currentOffsets.isEmpty) {
      offsets
    } else {
      currentOffsets
    }
    if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none, see explanation in Subscribe above
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
        consumer.seek(topicPartition, offset)
      }
      // we've called poll, we must pause or next poll may consume messages and set position
      consumer.pause(consumer.assignment())
    }

    consumer
  }
}

/**
 * Assign a fixed collection of TopicPartitions
 * @param topicPartitions collection of TopicPartitions to assign
 * @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
 * configuration parameters</a> to be used on driver. The same params will be used on executors,
 * with minor automatic modifications applied.
 * Requires "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets: offsets to begin at on initial startup. If no offset is given for a
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 */
private case class Assign[K, V](
    topicPartitions: ju.Collection[TopicPartition],
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]
  ) extends ConsumerStrategy[K, V] {

  def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.assign(topicPartitions)
    val toSeek = if (currentOffsets.isEmpty) {
      offsets
    } else {
      currentOffsets
    }
    if (!toSeek.isEmpty) {
      // this doesn't need a KAFKA-3370 workaround, because partitions are known, no poll needed
      toSeek.asScala.foreach { case (topicPartition, offset) =>
        consumer.seek(topicPartition, offset)
      }
    }

    consumer
  }
}

/**
 * :: Experimental ::
 * object for obtaining instances of [[ConsumerStrategy]]
 *
 * This object is the factory for consumer strategies; use it to instantiate them.
 */
@Experimental
object ConsumerStrategies {
  /**
   * :: Experimental ::
   * Subscribe to a collection of topics.
   * @param topics collection of topics to subscribe
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def Subscribe[K, V](
      topics: Iterable[jl.String],
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new Subscribe[K, V](
      new ju.ArrayList(topics.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }

  /**
   * :: Experimental ::
   * Subscribe to a collection of topics.
   * @param topics collection of topics to subscribe
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   */
  @Experimental
  def Subscribe[K, V](
      topics: Iterable[jl.String],
      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
    new Subscribe[K, V](
      new ju.ArrayList(topics.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      ju.Collections.emptyMap[TopicPartition, jl.Long]())
  }

  /**
   * :: Experimental ::
   * Subscribe to a collection of topics.
   * @param topics collection of topics to subscribe
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def Subscribe[K, V](
      topics: ju.Collection[jl.String],
      kafkaParams: ju.Map[String, Object],
      offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
    new Subscribe[K, V](topics, kafkaParams, offsets)
  }

  /**
   * :: Experimental ::
   * Subscribe to a collection of topics.
   * @param topics collection of topics to subscribe
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   */
  @Experimental
  def Subscribe[K, V](
      topics: ju.Collection[jl.String],
      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
    new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
  }

  /**
   * :: Experimental ::
   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
   * The pattern matching will be done periodically against topics existing at the time of check.
   * @param pattern pattern to subscribe to
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def SubscribePattern[K, V](
      pattern: ju.regex.Pattern,
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new SubscribePattern[K, V](
      pattern,
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }

  /**
   * :: Experimental ::
   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
   * The pattern matching will be done periodically against topics existing at the time of check.
   * @param pattern pattern to subscribe to
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   */
  @Experimental
  def SubscribePattern[K, V](
      pattern: ju.regex.Pattern,
      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
    new SubscribePattern[K, V](
      pattern,
      new ju.HashMap[String, Object](kafkaParams.asJava),
      ju.Collections.emptyMap[TopicPartition, jl.Long]())
  }

  /**
   * :: Experimental ::
   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
   * The pattern matching will be done periodically against topics existing at the time of check.
   * @param pattern pattern to subscribe to
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def SubscribePattern[K, V](
      pattern: ju.regex.Pattern,
      kafkaParams: ju.Map[String, Object],
      offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
    new SubscribePattern[K, V](pattern, kafkaParams, offsets)
  }

  /**
   * :: Experimental ::
   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
   * The pattern matching will be done periodically against topics existing at the time of check.
   * @param pattern pattern to subscribe to
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   */
  @Experimental
  def SubscribePattern[K, V](
      pattern: ju.regex.Pattern,
      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
    new SubscribePattern[K, V](
      pattern,
      kafkaParams,
      ju.Collections.emptyMap[TopicPartition, jl.Long]())
  }

  /**
   * :: Experimental ::
   * Assign a fixed collection of TopicPartitions
   * @param topicPartitions collection of TopicPartitions to assign
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def Assign[K, V](
      topicPartitions: Iterable[TopicPartition],
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new Assign[K, V](
      new ju.ArrayList(topicPartitions.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }

  /**
   * :: Experimental ::
   * Assign a fixed collection of TopicPartitions
   * @param topicPartitions collection of TopicPartitions to assign
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   */
  @Experimental
  def Assign[K, V](
      topicPartitions: Iterable[TopicPartition],
      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
    new Assign[K, V](
      new ju.ArrayList(topicPartitions.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      ju.Collections.emptyMap[TopicPartition, jl.Long]())
  }

  /**
   * :: Experimental ::
   * Assign a fixed collection of TopicPartitions
   * @param topicPartitions collection of TopicPartitions to assign
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def Assign[K, V](
      topicPartitions: ju.Collection[TopicPartition],
      kafkaParams: ju.Map[String, Object],
      offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
    new Assign[K, V](topicPartitions, kafkaParams, offsets)
  }

  /**
   * :: Experimental ::
   * Assign a fixed collection of TopicPartitions
   * @param topicPartitions collection of TopicPartitions to assign
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   */
  @Experimental
  def Assign[K, V](
      topicPartitions: ju.Collection[TopicPartition],
      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
    new Assign[K, V](
      topicPartitions,
      kafkaParams,
      ju.Collections.emptyMap[TopicPartition, jl.Long]())
  }
}
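All three strategies share the same rule in onStart for deciding where to start reading, and Subscribe and SubscribePattern share the same guard around the first poll. The sketch below isolates those two decisions as plain functions so they can be read without a Kafka cluster. The names StartOffsetsSketch, offsetsToSeek and shouldSuppress are mine, and plain Scala maps and an Option stand in for the Java maps and the nullable config value used in the real code.

```scala
object StartOffsetsSketch {
  // Mirrors the `toSeek` choice in onStart: offsets handed back by the driver
  // (non-empty on restart from a checkpoint) take precedence over the
  // user-supplied starting offsets, which only apply on initial startup.
  def offsetsToSeek[TP](
      currentOffsets: Map[TP, Long],
      offsets: Map[TP, Long]): Map[TP, Long] =
    if (currentOffsets.isEmpty) offsets else currentOffsets

  // Mirrors the KAFKA-3370 guard: the exception from the first poll is
  // only swallowed when auto.offset.reset is set to "none" (case-insensitive).
  def shouldSuppress(autoOffsetReset: Option[String]): Boolean =
    autoOffsetReset.exists(_.toUpperCase == "NONE")
}
```

If offsetsToSeek returns an empty map, nothing is sought at all, and the committed offset or auto.offset.reset decides the starting position, as the offsets parameter documentation above describes.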
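This concludes Part 1; the rest is covered in "Spark Integration with Kafka 0.10.0: New Features (Part 2)".
Note: the CSDN blog editor occasionally breaks the layout of Java code, so to avoid losing all the work on an overly long post, the material is split across several posts.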