
Kafka -> Spark Streaming -> MongoDB

2017-11-15 10:28
This project is mainly real-time: it pulls data from Kafka, cleans it, and saves it to MongoDB (note: MongoDB's save method replaces the entire document that matches the _id), and then runs geospatial queries on the geo field.
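Because save replaces the whole matching document, any fields not carried by the new document are lost. Below is a minimal sketch of a field-level alternative, an update with $set and upsert = true, using the same Casbah/Java-driver API as the code later in this post (the db handle, collection name, and values are illustrative assumptions, not part of the original job):

import com.mongodb.casbah.commons.MongoDBObject

// `db` is assumed to be a com.mongodb.DB obtained as in getMongo() below
val coll = db.getCollection("oc")                      // illustrative collection name
val query = MongoDBObject("_id" -> "some-id")          // match on _id
val change = MongoDBObject("$set" -> MongoDBObject(    // only these fields are touched
  "name" -> "some-name",
  "geo" -> Array(30.5f, 114.3f)))
// upsert = true inserts a new document when nothing matches; multi = false updates a single document
coll.update(query, change, true, false)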

Environment (managed with Maven):

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.6</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.4</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.1</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.1</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>

<dependency>
<groupId>org.mongodb.mongo-hadoop</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.10</artifactId>
<version>3.1.1</version>
</dependency>


The code is as follows:

import com.mongodb.MongoClientOptions.Builder
import com.mongodb.casbah.commons.{Imports, MongoDBObject}
import com.mongodb._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.bson.{BSONObject, BasicBSONObject}

import scala.language.postfixOps

/**
* Created by Administrator on 2016/9/6.
*/
object KafkaAndMongodb {

  // Update function for updateStateByKey (defined here but not used in this example)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    // Input arguments: zkQurnum, group, topic, numThreads, cachepath, host, dbname, collection
    // e.g. shizhanmini:2181,mini02:2181,mini03:2181 rto db100 2 c://ck2 192.168.199.7 spark oc
    val Array(zkQurnum, group, topic, numThreads, cachepath, host, dbname, collection) = args
    // Create the SparkConf; local mode is used here for testing
    val sparkConf = new SparkConf().setAppName("KafkaAndMongodb")
      .setMaster("local[2]")
    // Create the streaming context with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint(cachepath)
    // Turn the topic list into a Map of topic -> number of receiver threads
    val topicMap = topic.split(",").map((_, numThreads.toInt)).toMap

    // Create the Kafka receiver stream from the arguments
    val lines = KafkaUtils.createStream(ssc, zkQurnum, group, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    val users = lines.map(x => (x.split(",")(1), x.split(",")(3), x.split(",")(6), x.split(",")(7)))
    // Initialize Mongo
    // val db = getMongo(host, dbname)

    users.print()

    users.foreachRDD(rdd => {
      rdd.foreachPartition(par => {
        // Open one Mongo connection per partition
        // val db = MongodbUtils.getMongo()
        val db = getMongo(host, dbname)
        if (!par.isEmpty) {
          par.foreach(pair => {
            val id = pair._1
            val vname = pair._2
            val lat = pair._3.toFloat
            val lon = pair._4.toFloat
            // println(lat + "-----------geo-----------" + lon)

            // save() replaces the whole document matching _id (or inserts if none exists)
            val dBObject: Imports.DBObject = MongoDBObject("_id" -> id, "name" -> vname, "geo" -> Array(lat, lon))
            db.getCollection(collection).save(dBObject)
          })
        }
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def getMongo(host: String, dbname: String): DB = {
    // Configure the connection pool
    val builder: Builder = MongoClientOptions.builder()
    builder.connectionsPerHost(500)        // maximum number of connections to the target database
    builder.connectTimeout(1000 * 60 * 20) // timeout for establishing a connection to the database
    builder.maxWaitTime(100 * 60 * 5)      // maximum time a thread waits for an available connection
    builder.threadsAllowedToBlockForConnectionMultiplier(100)
    builder.maxConnectionIdleTime(0)
    builder.maxConnectionLifeTime(0)
    builder.socketTimeout(0)
    builder.socketKeepAlive(true)
    val options: MongoClientOptions = builder.build()
    var db: DB = null
    var mongo: MongoClient = null
    // Use the default host if the argument is empty
    if (host.isEmpty) {
      mongo = new MongoClient("192.168.30.170", options)
    } else {
      mongo = new MongoClient(host, options)
    }
    // Use the default database name if the argument is empty
    if (dbname.isEmpty) {
      db = mongo.getDB("spark")
    } else {
      db = mongo.getDB(dbname)
    }

    db
  }
}
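
The intro mentions running geospatial queries on the saved points, which the streaming job itself does not show. Here is a minimal sketch of building a legacy 2d index on the geo array and querying it with $near, again with the Casbah/Java-driver API used above (the db and collection handles, the sample point, and the distance are illustrative assumptions; the job stores Array(lat, lon), so the query point keeps the same order, although MongoDB's usual convention for coordinate pairs is [longitude, latitude]):

import com.mongodb.casbah.commons.MongoDBObject

// `db` and `collection` are assumed to be the same values used in the streaming job
val coll = db.getCollection(collection)
// Legacy 2d index on the "geo" array written by the job
coll.createIndex(MongoDBObject("geo" -> "2d"))
// Find documents near the given point, within 0.5 degrees (illustrative values)
val near = MongoDBObject("geo" -> MongoDBObject("$near" -> Array(30.5f, 114.3f), "$maxDistance" -> 0.5))
val cursor = coll.find(near)
while (cursor.hasNext) {
  println(cursor.next())
}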