SODBASE CEP学习进阶篇(七):SODBASE CEP与Spark streaming集成
2015-11-07 17:19
465 查看
基于内存RDD的Spark框架相比Hadoop MapReduce框架有许多独特的优点,在越来越多项目中得到应用。Spark计算框架包括其Streaming组件,是批处理(Lamda架构中Batch Layer)的思路。若要在使用Spark的同时,
1)不修改Spark streaming代码和重启应用,实现多场景流式计算、规则管理
2)实现低延时关联模式实时分析
可将SODBASE CEP和Spark结合来使用。这样,可以方便地使用SODBASE EPL管理规则,也实现低延时,实现许多完全滑动窗口(非批滑动窗口)规则和复杂规则监测。
规则为Google的股票报价5000毫秒内出现了3次
通过SODBASE CEP运行此模型
也可以在Spark环境中运行此程序,将localhost改为SODBASE CEP运行的服务器IP地址
即可以接收SODBASE CEP传过来的流数据了
注:完全滑动窗口(非批量滑动窗口),例如监测一个人5分钟内登录系统三次则触发事件,是指他任意三次登录在5分钟完成内即满足规则,如三次登录的时间为10:50分30秒、10:53分25秒、10:55分05秒。而像批量滑动窗口如10:50~10:55,10:50~:11:00,或1分钟滑动量的10:50~10:55,10:51~10:57,10:52~10:57分批滑动窗口,Spark streaming都难以监测速度层(Speed Layer)这样的规则事件,需要和SODBASE CEP配合使用。
实际规则管理示例和Kafka运用 参考:
SODBASE
CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-规则管理
SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目
。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成。缓存扩展参见与分布式缓存集成。
1)不修改Spark streaming代码和重启应用,实现多场景流式计算、规则管理
2)实现低延时关联模式实时分析
可将SODBASE CEP和Spark结合来使用。这样,可以方便地使用SODBASE EPL管理规则,也实现低延时,实现许多完全滑动窗口(非批滑动窗口)规则和复杂规则监测。
示例操作步骤
本文通过实例介绍如何将SODBASE CEP的输出通过Spark streaming保存为HDFS文件。(1)使用SODBASE CEP的PubSub适配器输出数据
对应的适配器类为com.sodbase.outputadaptor.socket.pubsub.SocketPubSubStringOutputAdaptor,示例CEP模型如下规则为Google的股票报价5000毫秒内出现了3次
SELECT T1.name AS name,T1.price+T2.price+T3.price AS sumprice FROM T1:模拟股票,T2:模拟股票,T3:模拟股票 PATTERN T1;T2;T3 WHERE T1.name='Google' AND T2.name='Google'AND T3.name='Google' WITHIN 5000
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <GraphModelData> <CEPSoftwareVersion>2</CEPSoftwareVersion> <inputAdaptors> <inputAdaptorClassName>com.sodbase.inputadaptor.EventGeneratorInputAdaptor</inputAdaptorClassName> <adaptorParams>模拟股票</adaptorParams> <adaptorParams>1000</adaptorParams> <isExternal>false</isExternal> </inputAdaptors> <SODSQLs>CREATE QUERY socketouput SELECT T1.name AS name,T1.price+T2.price+T3.price AS sumprice FROM T1:模拟股票,T2:模拟股票,T3:模拟股票 PATTERN T1;T2;T3 WHERE T1.name='Google' AND T2.name='Google'AND T3.name='Google' WITHIN 5000 </SODSQLs> <outputAdaptors> <isOutputAsSelection>true</isOutputAsSelection> <outputAdaptorClassName>com.sodbase.outputadaptor.socket.pubsub.SocketPubSubStringOutputAdaptor</outputAdaptorClassName> <adaptorParams>19999</adaptorParams> <adaptorParams>-1</adaptorParams> <isExternal>false</isExternal> <queryName>socketouput</queryName> </outputAdaptors> <modelName>socketouput</modelName> <modelVersion>1.0</modelVersion> <modelDescription></modelDescription> </GraphModelData>
通过SODBASE CEP运行此模型
(2)Spark程序访问socket 19999端口,接收SODBASE CEP的数据
package example.streaming import org.apache.spark.streaming.StreamingContext import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel object Main { def main(args:Array[String]) { val sparkConf = new SparkConf().setAppName("Main") val ssc = new StreamingContext(sparkConf, Seconds(60)) val lines = ssc.socketTextStream("localhost", 19999, StorageLevel.MEMORY_AND_DISK_SER) //lines.print lines.saveAsTextFiles("streamfile", "txt") ssc.start() ssc.awaitTermination() } }
也可以在Spark环境中运行此程序,将localhost改为SODBASE CEP运行的服务器IP地址
val lines = ssc.socketTextStream("ip", 19999, StorageLevel.MEMORY_AND_DISK_SER)
即可以接收SODBASE CEP传过来的流数据了
注:完全滑动窗口(非批量滑动窗口),例如监测一个人5分钟内登录系统三次则触发事件,是指他任意三次登录在5分钟完成内即满足规则,如三次登录的时间为10:50分30秒、10:53分25秒、10:55分05秒。而像批量滑动窗口如10:50~10:55,10:50~:11:00,或1分钟滑动量的10:50~10:55,10:51~10:57,10:52~10:57分批滑动窗口,Spark streaming都难以监测速度层(Speed Layer)这样的规则事件,需要和SODBASE CEP配合使用。
实际规则管理示例和Kafka运用 参考:
SODBASE
CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-规则管理
SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目
。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成。缓存扩展参见与分布式缓存集成。
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- 分布式版本管理git入门指南使用资料汇总及文章推荐
- Spark,一种快速数据分析替代方案
- Lua编程示例(一):select、debug、可变参数、table操作、error
- SQL学习笔记三 select语句的各种形式小结
- 一条select语句引起的瓶颈问题思考
- SQL Select语句完整的执行顺序
- mysql SELECT语句去除某个字段的重复信息
- 点击按钮后 文本框变为Select下拉列表框
- C#分布式事务的超时处理实例分析
- javascript 模拟select下拉列表特效
- javascript select options 排序(保持option 对象完整性)
- 用javascript和css模拟select的脚本
- js select常用操作控制代码
- mysql中insert与select的嵌套使用方法
- jquery的clone方法应用于textarea和select的bug修复
- Erlang分布式节点中的注册进程使用实例
- SQLServer中SELECT语句的执行顺序