Kafka + Structured Streaming + S3 + DynamoDB
2017-08-30 17:16
This post walks through a demo that consumes data from Kafka, computes two business metrics (PV and UV), and writes the results to DynamoDB and S3 respectively. The demo only illustrates the logic and cannot be used as-is.
```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}

object KafkaDemoToS3AndDynamoDB {

  // entity
  case class ClickStream(domain: String, ip: String, timestamp: Long)

  def main(args: Array[String]): Unit = {
    // placeholders: fill these in before running
    val bucket: String = ???
    val pvCheckLocation: String = ???
    val uvCheckLocation: String = ???
    val uvPath: String = ???

    // create spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .config("spark.sql.shuffle.partitions", "4")
      .appName("kafkaDemoToS3AndDynamoDB")
      .getOrCreate()
    import spark.implicits._

    // read stream from kafka
    val kafkaStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1,topic2")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", false)
      .option("maxOffsetsPerTrigger", 10L)
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INTEGER)")
      .as[(String, String, Integer)]

    val cols = List("domain", "ip", "timestamp")

    // parse each record value into (domain, ip, timestamp)
    val df = kafkaStream.map { line =>
      val columns = line._1.split(" ")
      (columns(0), columns(1), columns(2).toLong) // timestamp must be Long to match ClickStream
    }.toDF(cols: _*)

    // all logs, deduplicated
    val ds = df.distinct().select($"domain", $"ip", $"timestamp").as[ClickStream]

    // compute pv: number of events per domain
    val dsPV = ds.groupBy($"domain").count()

    // compute uv: number of distinct ips per domain
    val dsUV = ds.dropDuplicates("domain", "ip").groupBy($"domain").count()

    // save pv to dynamodb
    val queryPVCount = dsPV.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", pvCheckLocation)
      .foreach(new ForeachWriter[Row] {
        var ddbClient: AmazonDynamoDBClient = _

        override def open(partitionId: Long, version: Long): Boolean = {
          ddbClient = new AmazonDynamoDBClient()
          true
        }

        override def process(value: Row): Unit = {
          val put = new PutItemRequest()
          put.setTableName("table")
          var attr = new AttributeValue()
          attr.setS(value.getAs[String](0))
          put.addItemEntry("domain", attr)
          attr = new AttributeValue()
          attr.setN(value.getAs[Long](1).toString)
          put.addItemEntry("pv", attr)
          ddbClient.putItem(put)
        }

        override def close(errorOrNull: Throwable): Unit = {}
      })
      .start()

    // save uv to s3
    // note: the file sink only supports "append" output mode, and writing a streaming
    // aggregation in append mode additionally requires a watermark on an event-time column
    val queryUVCount = dsUV.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", uvCheckLocation)
      .start("s3://" + bucket + "/" + uvPath)

    queryPVCount.awaitTermination()
    queryUVCount.awaitTermination()
  }
}
```
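Since the streaming job cannot run without a Kafka cluster and AWS credentials, the PV/UV aggregation logic can be sanity-checked separately on a static Dataset in batch mode. A minimal sketch (the sample rows and the `PvUvBatchCheck` object name are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

object PvUvBatchCheck {
  case class ClickStream(domain: String, ip: String, timestamp: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("PvUvBatchCheck")
      .getOrCreate()
    import spark.implicits._

    // made-up sample: a.com has 3 events from 2 distinct ips
    val ds = Seq(
      ClickStream("a.com", "1.1.1.1", 1L),
      ClickStream("a.com", "1.1.1.1", 2L),
      ClickStream("a.com", "2.2.2.2", 3L),
      ClickStream("b.com", "3.3.3.3", 4L)
    ).toDS()

    // pv = events per domain; uv = distinct ips per domain
    val pv = ds.groupBy($"domain").count()
    val uv = ds.dropDuplicates("domain", "ip").groupBy($"domain").count()

    pv.show() // expect a.com -> 3, b.com -> 1
    uv.show() // expect a.com -> 2, b.com -> 1

    spark.stop()
  }
}
```

The same `groupBy(...).count()` expressions work unchanged on a streaming Dataset, which is what makes this kind of batch check useful before wiring up the sinks.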