
Integrating Spark Streaming with Spark SQL for Real-Time Stock Rankings, Fed by Data from a Kafka Queue

This post integrates Spark Streaming with Spark SQL to compute a real-time stock ranking. The front-end data is delivered through a Kafka queue, and further upstream Flume collects it in real time.

1. Build the project with Maven and declare the required dependencies; Spark 1.4.1 is used here.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hexun</groupId>
  <artifactId>spark-streaming-java</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark-study-java</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.6</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
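With this POM in place, a standard package build (for example, mvn clean package) triggers the make-assembly execution bound to the package phase above and produces a jar-with-dependencies under target/ in addition to the plain project jar; a jar built this way is presumably what the submission script in section 3 points spark-submit at.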

2. Implementation:

1) Read the data from the Kafka queue, using createDirectStream.
2) Map each record onto a POJO (a case class) and register the result as a Spark SQL table; a field-mapping sketch follows this list.
3) Implement the functions used in the SQL; most of them have to be written as UDFs here, even one as simple as length.
4) Write the SQL statement.
5) Save the results into MySQL for the front end to display.
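To make step 2 concrete, here is a minimal sketch of how one space-separated log line maps onto the Tracklog case class used in the full code below. The sample line and its field values are hypothetical, for illustration only; the 12-field layout is the part taken from the code.

// Same 12-field shape as the Tracklog case class defined in the job below
case class Tracklog(dateday: String, datetime: String, ip: String, cookieid: String,
                    userid: String, logserverip: String, referer: String, requesturl: String,
                    remark1: String, remark2: String, alexaflag: String, ua: String)

// Hypothetical log line; the field values are made up for illustration
val line = "2015-11-26 201511261030 10.0.0.1 ck123 u456 10.0.0.9 - http://stockdata.stock.hexun.com/600036.shtml - - 0 Mozilla/5.0"
val x = line.split(" ")
// 12 fields, indices 0..11 -- which is why the job filters out shorter lines
val t = Tracklog(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10), x(11))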

The full code follows (Scala version):

package com.hexun.streaming

import java.sql.{Connection, DriverManager}
import java.util.Date
import java.util.regex.Pattern

import kafka.serializer.StringDecoder
import org.apache.commons.lang.time.DateFormatUtils
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable.ListMap
import scala.collection.mutable

/**
 * Created by Administrator on 2015/11/26.
 */
object StockCntSumKafkaLPcnt {

  case class Tracklog(dateday: String, datetime: String, ip: String, cookieid: String,
                      userid: String, logserverip: String, referer: String, requesturl: String,
                      remark1: String, remark2: String, alexaflag: String, ua: String)

  def main(args: Array[String]) {
    // Running click totals per stock code, accumulated across batches
    val smap = new mutable.HashMap[String, Integer]()

    // MySQL connection settings for the result table
    val url = "jdbc:mysql://10.130.3.211:3306/charts"
    val user = "dbcharts"
    val password = "Abcd1234"

    val conf = new SparkConf().setAppName("stocker") //.setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(15))

    // Kafka configurations
    val topics = Set("teststreaming")
    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

    // Create a direct stream; each Kafka record arrives as a (key, value) pair,
    // and its string form is parsed downstream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val events = kafkaStream.flatMap(line => {
      Some(line.toString())
    })

    try {
      // Parse each space-separated log line; a full record has 12 fields (indices 0..11),
      // so shorter lines are dropped before building the Tracklog
      val tmpdf = events.map(_.split(" ")).filter(_.length >= 12)
        .map(x => Tracklog(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10), x(11)))

      tmpdf.foreachRDD { rdd =>
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._
        rdd.toDF().registerTempTable("tracklog")

        // UDFs used by the SQL below
        sqlContext.udf.register("strLen", (str: String) => str.length())
        sqlContext.udf.register("concat", (str1: String, str2: String, str3: String) => str1 + str2 + str3)
        // Returns the last match of pattern in str, or "" when there is no match
        sqlContext.udf.register("regexp_extract", (str: String, pattern: String) => {
          val matcher = Pattern.compile(pattern, 1).matcher(str)
          var res = ""
          while (matcher.find()) {
            res = matcher.group()
          }
          res
        })

        // Pull the 6-digit stock code out of each stock-detail URL, count clicks per code
        // in this batch, and keep the 150 most clicked codes
        val rcount = sqlContext.sql("SELECT substring(t.requesturl,strLen(regexp_extract(t.requesturl,'(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?'))-5,6) stock_code," +
          "concat('http://stockdata.stock.hexun.com/', substring(t.requesturl,strLen(regexp_extract(t.requesturl,'(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?'))-5,6),'.shtml') url," +
          "count(*) clickcnt " +
          "FROM " +
          "(select distinct dateday,datetime,ip,cookieid,userid,logserverip,referer,requesturl,remark1,remark2,alexaflag,ua from tracklog where strLen(datetime)=12) t " +
          "WHERE " +
          "regexp_extract(t.requesturl,'(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?') <>'' " +
          "and t.requesturl like 'http://stockdata.stock.hexun.com/%shtml' " +
          "and t.requesturl not like '%index%' " +
          "and t.requesturl not like '%fund%' " +
          "group by substring(t.requesturl,strLen(regexp_extract(t.requesturl,'(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?'))-5,6) " +
          "order by clickcnt desc " +
          "limit 150")

        // Merge this batch's counts into the running totals
        var flag: Int = 0
        rcount.collect().foreach(data => {
          flag = 1
          val stockerId = data.get(0).toString
          val batchCnt = Integer.parseInt(data.get(2).toString)
          println("stockerId: " + stockerId + ", cnt: " + smap.get(stockerId))
          smap.get(stockerId) match {
            case None       => smap += (stockerId -> batchCnt)
            case Some(cntI) => smap += (stockerId -> (batchCnt + cntI))
          }
        })

        if (flag == 1) {
          // Sort the accumulated totals by click count, descending
          var idx: Int = 1
          val sortMap = ListMap(smap.toSeq.sortWith(_._2 > _._2): _*)
          val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss")

          // Write the top 150 stocks into MySQL for the front end
          val conn: Connection = DriverManager.getConnection(url, user, password)
          val pstat = conn.prepareStatement("INSERT INTO stock_realtime_analysis_spark (stockId,url,clickcnt,type,recordtime) VALUES (?,?,?,?,?)")
          sortMap foreach {
            case (key, value) =>
              if (idx <= 150) {
                println(key + ",http://stockdata.stock.hexun.com/" + key + ".shtml," + value + "," + stattime)
                pstat.setString(1, key)
                pstat.setString(2, "http://stockdata.stock.hexun.com/" + key + ".shtml")
                pstat.setInt(3, value)
                pstat.setString(4, "01")
                pstat.setString(5, stattime)
                pstat.executeUpdate()
              }
              idx = idx + 1
          }
          pstat.close()
          conn.close()
          flag = 0
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
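For reference, the INSERT statement in the job implies a result table with the columns stockId, url, clickcnt, type and recordtime. The sketch below creates such a table over JDBC; the column types are assumptions, not taken from the original environment.

import java.sql.DriverManager

object CreateStockRealtimeTable {
  def main(args: Array[String]): Unit = {
    // Connection settings copied from the streaming job above
    val conn = DriverManager.getConnection("jdbc:mysql://10.130.3.211:3306/charts", "dbcharts", "Abcd1234")
    val stmt = conn.createStatement()
    // Column names come from the job's INSERT; the types below are assumed
    stmt.executeUpdate(
      """CREATE TABLE IF NOT EXISTS stock_realtime_analysis_spark (
        |  stockId    VARCHAR(10),
        |  url        VARCHAR(255),
        |  clickcnt   INT,
        |  type       VARCHAR(4),
        |  recordtime VARCHAR(32)
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}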

3. The job is submitted with the following script:

#!/bin/bash
source /etc/profile

# Kill any previously running instance of the streaming job
stocker=`ps -ef | grep spark | grep SparkStreaming.jar | awk '{print $2}'`
echo $stocker
kill -9 $stocker

# Resubmit the job to the Spark standalone cluster
nohup /opt/modules/spark/bin/spark-submit \
--master spark://10.130.2.20:7077 \
--driver-memory 3g \
--executor-memory 3g \
--total-executor-cores 24 \
--conf spark.ui.port=56689 \
--jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar \
--class com.hexun.streaming.StockCntSumKafkaLPcnt \
/opt/bin/UDF/SparkStreaming.jar \
>/opt/bin/initservice/stock.log 2>&1 &
Tags: spark, spark streaming