您的位置:首页 > 大数据 > Hadoop

SODBASE CEP学习进阶篇(七):SODBASE CEP与Spark streaming集成

2015-11-07 17:19 465 查看
基于内存RDD的Spark框架相比Hadoop MapReduce框架有许多独特的优点,在越来越多项目中得到应用。Spark计算框架包括其Streaming组件,是批处理(Lamda架构中Batch Layer)的思路。若要在使用Spark的同时,

1)不修改Spark streaming代码和重启应用,实现多场景流式计算、规则管理

2)实现低延时关联模式实时分析

可将SODBASE CEP和Spark结合来使用。这样,可以方便地使用SODBASE EPL管理规则,也实现低延时,实现许多完全滑动窗口(非批滑动窗口)规则和复杂规则监测。

示例操作步骤

本文通过实例介绍如何将SODBASE CEP的输出通过Spark streaming保存为HDFS文件。

(1)使用SODBASE CEP的PubSub适配器输出数据

对应的适配器类为com.sodbase.outputadaptor.socket.pubsub.SocketPubSubStringOutputAdaptor,示例CEP模型如下

规则为Google的股票报价5000毫秒内出现了3次

SELECT T1.name AS name,T1.price+T2.price+T3.price AS sumprice
FROM T1:模拟股票,T2:模拟股票,T3:模拟股票
PATTERN T1;T2;T3
WHERE T1.name='Google' AND T2.name='Google'AND T3.name='Google'
WITHIN 5000


<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<GraphModelData>
<CEPSoftwareVersion>2</CEPSoftwareVersion>
<inputAdaptors>
<inputAdaptorClassName>com.sodbase.inputadaptor.EventGeneratorInputAdaptor</inputAdaptorClassName>
<adaptorParams>模拟股票</adaptorParams>
<adaptorParams>1000</adaptorParams>
<isExternal>false</isExternal>
</inputAdaptors>
<SODSQLs>CREATE QUERY socketouput SELECT T1.name AS name,T1.price+T2.price+T3.price AS sumprice FROM T1:模拟股票,T2:模拟股票,T3:模拟股票 PATTERN T1;T2;T3  WHERE T1.name='Google' AND T2.name='Google'AND T3.name='Google'  WITHIN 5000 </SODSQLs>
<outputAdaptors>
<isOutputAsSelection>true</isOutputAsSelection>
<outputAdaptorClassName>com.sodbase.outputadaptor.socket.pubsub.SocketPubSubStringOutputAdaptor</outputAdaptorClassName>
<adaptorParams>19999</adaptorParams>
<adaptorParams>-1</adaptorParams>
<isExternal>false</isExternal>
<queryName>socketouput</queryName>
</outputAdaptors>
<modelName>socketouput</modelName>
<modelVersion>1.0</modelVersion>
<modelDescription></modelDescription>
</GraphModelData>


通过SODBASE CEP运行此模型

(2)Spark程序访问socket 19999端口,接收SODBASE CEP的数据

package example.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

object Main {
def main(args:Array[String])
{
val sparkConf = new SparkConf().setAppName("Main")
val ssc = new StreamingContext(sparkConf, Seconds(60))

val lines = ssc.socketTextStream("localhost", 19999, StorageLevel.MEMORY_AND_DISK_SER)
//lines.print
lines.saveAsTextFiles("streamfile", "txt")
ssc.start()
ssc.awaitTermination()

}

}


也可以在Spark环境中运行此程序,将localhost改为SODBASE CEP运行的服务器IP地址

val lines = ssc.socketTextStream("ip", 19999, StorageLevel.MEMORY_AND_DISK_SER)


即可以接收SODBASE CEP传过来的流数据了

注:完全滑动窗口(非批量滑动窗口),例如监测一个人5分钟内登录系统三次则触发事件,是指他任意三次登录在5分钟完成内即满足规则,如三次登录的时间为10:50分30秒、10:53分25秒、10:55分05秒。而像批量滑动窗口如10:50~10:55,10:50~:11:00,或1分钟滑动量的10:50~10:55,10:51~10:57,10:52~10:57分批滑动窗口,Spark streaming都难以监测速度层(Speed Layer)这样的规则事件,需要和SODBASE CEP配合使用。

实际规则管理示例和Kafka运用 参考:

SODBASE
CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-规则管理

SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目

。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成。缓存扩展参见与分布式缓存集成
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息