
Kafka + Structured Streaming + S3 + DynamoDB

This post shows how to consume data from Kafka with Structured Streaming, compute two business metrics (PV and UV), and write the results to DynamoDB and S3 respectively. It is a demo of the logic only and is not meant to be used as-is.
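Each Kafka record's value is assumed here to be a space-separated log line of the form domain ip timestamp (epoch milliseconds); the original only shows a split on spaces, so treat the exact format as an illustration:

example.com 10.0.0.1 1504082183000
example.org 10.0.0.2 1504082184000

PV is then the total number of events per domain, and UV the number of distinct IPs per domain. Running the demo also requires the spark-sql-kafka-0-10 connector, an S3 filesystem implementation (hadoop-aws/s3a, or EMRFS on EMR), and the AWS Java SDK DynamoDB module on the classpath.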

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}

object KafkaDemoToS3AndDynamoDB {

  def main(args: Array[String]): Unit = {
    // fill these in for your environment
    val bucket: String = ???          // target S3 bucket
    val pvCheckLocation: String = ??? // checkpoint directory for the PV query
    val uvCheckLocation: String = ??? // checkpoint directory for the UV query
    val uvPath: String = ???          // key prefix for the UV output on S3
    // create the spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .config("spark.sql.shuffle.partitions", "4")
      .appName("kafkaDemoToS3AndDynamoDB")
      .getOrCreate()
    import spark.implicits._
    // read the stream from kafka
    val kafkaStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1,topic2")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", false)
      .option("maxOffsetsPerTrigger", 10L)
      .load()
      .selectExpr("CAST(value AS STRING)",
        "CAST(topic AS STRING)",
        "CAST(partition AS INTEGER)")
      .as[(String, String, Int)]
    // parse each record value ("domain ip timestamp") into typed columns
    val cols = List("domain", "ip", "timestamp")
    val df = kafkaStream.map { line =>
      val columns = line._1.split(" ")
      (columns(0), columns(1), columns(2).toLong)
    }.toDF(cols: _*)

    // all logs, de-duplicated
    val ds = df.distinct().select($"domain", $"ip", $"timestamp").as[ClickStream]

    // compute PV: total number of events per domain
    val dsPV = ds.groupBy($"domain").count().withColumnRenamed("count", "pv")

    // compute UV: distinct IPs per domain; exact count-distinct is not
    // supported in streaming aggregations, so approximate it, and add an
    // event-time window + watermark so the result can later be appended
    // to S3 (assumes timestamp is epoch milliseconds)
    val dsUV = ds
      .withColumn("eventTime", ($"timestamp" / 1000).cast("timestamp"))
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window($"eventTime", "10 minutes"), $"domain")
      .agg(approx_count_distinct($"ip").as("uv"))

    // save PV to DynamoDB via a ForeachWriter; update mode emits one upsert
    // per changed domain per trigger
    val queryPVCount = dsPV.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", pvCheckLocation)
      .foreach(new ForeachWriter[Row] {
        var ddbClient: AmazonDynamoDB = _

        override def open(partitionId: Long, version: Long): Boolean = {
          // one client per partition/epoch; region and credentials come from
          // the default provider chain
          ddbClient = AmazonDynamoDBClientBuilder.standard().build()
          true
        }

        override def process(value: Row): Unit = {
          // row schema: (domain: String, pv: Long)
          val put = new PutItemRequest()
          put.setTableName("table") // target DynamoDB table
          var attr = new AttributeValue()
          attr.setS(value.getAs[String](0))
          put.addItemEntry("domain", attr)
          attr = new AttributeValue()
          attr.setN(value.getAs[Long](1).toString)
          put.addItemEntry("pv", attr)
          ddbClient.putItem(put)
        }

        override def close(errorOrNull: Throwable): Unit = {
          if (ddbClient != null) ddbClient.shutdown()
        }
      }).start()

    // save UV to S3; the file sink only supports append mode, hence the
    // watermark + window on the UV aggregation above
    val queryUVCount = dsUV.writeStream
      .outputMode("append")
      .format("parquet")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", uvCheckLocation)
      .start("s3://" + bucket + "/" + uvPath)

    queryPVCount.awaitTermination()
    queryUVCount.awaitTermination()
  }
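  // One-time setup sketch (not in the original demo): the ForeachWriter above
  // assumes a DynamoDB table named "table" with a string hash key "domain"
  // already exists. Something along these lines creates it:
  def createPvTable(): Unit = {
    import com.amazonaws.services.dynamodbv2.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, KeyType, ProvisionedThroughput, ScalarAttributeType}
    val client = AmazonDynamoDBClientBuilder.standard().build()
    client.createTable(new CreateTableRequest()
      .withTableName("table")
      .withAttributeDefinitions(new AttributeDefinition("domain", ScalarAttributeType.S))
      .withKeySchema(new KeySchemaElement("domain", KeyType.HASH))
      .withProvisionedThroughput(new ProvisionedThroughput(5L, 5L)))
    client.shutdown()
  }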
  // entity: one click event parsed from a Kafka record
  case class ClickStream(domain: String, ip: String, timestamp: Long)
}
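Once the UV query has produced a few micro-batches, the parquet output on S3 can be spot-checked with a plain batch read. A minimal sketch, with hypothetical bucket and path values standing in for the placeholders above:

import org.apache.spark.sql.SparkSession

object UvSpotCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("uvSpotCheck").getOrCreate()
    // read the streaming parquet output back as a static DataFrame
    val uv = spark.read.parquet("s3://my-bucket/my-uv-path") // substitute your bucket/uvPath
    uv.orderBy("domain").show(100, truncate = false)
    spark.stop()
  }
}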