SODBASE CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-低延迟规则管理
2015-12-01 11:24
549 查看
许多大数据平台项目采用流式计算来处理实时数据,会涉及到一个环节:处理规则管理。因为用户经常有自己配置数据处理规则或策略的需求。同时,维护人员来也有也有将规则提取出来的需求,方便变更和维护的需求。我们知道Spark streaming作为数据归档备份时吞吐量高,与Hadoop集成相对方便。但是Spark streaming也存在高延时,框架过重带来策略规则修改复杂的问题。本文介绍Spark streaming加SODBASE SQL来实现规则管理的示例。
1.示例
1.1 示例简介
本示例的数据源是Kafka,从采集设备到Kafka的过程没有画出来。许多时候这种数据是做了二进制压缩的,本例中就是这样。数据解析规则采用了SODBASE SQL语句来表达规则,比如数据的第0位到第7位转化为整形,作为设备转速值。规则处理过的数据可以通过socket传给Spark streaming,也可以再通过Kafka传给Spark streaming,从而解决Spark streaming的实时性较弱和定制化能力弱的问题。通过socket的方式前面文章有介绍,本文示例是再通过Kafka传给Spark
streaming。
1.2 操作步骤
1.2.1 安装Kafka,建立两个topic(test,test2)
找一台linux机器,从官方网站下载Kafka,解压,启动
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
建立test topic, bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1.2.2 下载SODBASE规则示例模型和软件
点击此处下载,
(1)SODBASE CEP Server 2.0.23(sp3)以上版本,在LInux下解压,启动
$chmod 777 catalina.sh
$./catalina.sh run
初始化状态服务器
1.示例
1.1 示例简介
本示例的数据源是Kafka,从采集设备到Kafka的过程没有画出来。许多时候这种数据是做了二进制压缩的,本例中就是这样。数据解析规则采用了SODBASE SQL语句来表达规则,比如数据的第0位到第7位转化为整形,作为设备转速值。规则处理过的数据可以通过socket传给Spark streaming,也可以再通过Kafka传给Spark streaming,从而解决Spark streaming的实时性较弱和定制化能力弱的问题。通过socket的方式前面文章有介绍,本文示例是再通过Kafka传给Spark
streaming。
1.2 操作步骤
1.2.1 安装Kafka,建立两个topic(test,test2)
找一台linux机器,从官方网站下载Kafka,解压,启动
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
建立test topic, bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1.2.2 下载SODBASE规则示例模型和软件
点击此处下载,
(1)SODBASE CEP Server 2.0.23(sp3)以上版本,在LInux下解压,启动
$chmod 777 catalina.sh
$./catalina.sh run
初始化状态服务器
$curl http://localhost:16111/sodbase-cep-server-webservice-1.0.1/install[/code] (2)SODBASE CEP Admin用于安装规则模型,在Linux下解压
(3)SODBASE Studio示例模型中的britork-actuator.soddata2规则模型文件到本地
1.2.3 使用SODBASE CEP Admin安装启动britork-actuator.soddata2$ cd SODBASE-CEP-Server-Admin-2.0-u24/bin $ ./installmodel.sh -h localhost -P 16111 -f "../example/britork-actuator.soddata2" -u admin -p cep $ ./startmodel.sh -h localhost -P 16111 -m britork-actuator -v 1.0 -u admin -p cep
1.2.4运行Spark Streaming
编译运行一下Spark代码,就可以接收数据,接收到的数据这里作了屏幕打印。Spark streaming本身不需要经常修改,通过配置上文SODSQL语句就可以改变数据处理的逻辑。在实际项目中也可以落地到Hdfs,或者Hbase等存储中。package example.streaming import org.apache.spark.streaming.StreamingContext import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka.KafkaUtils import kafka.serializer.StringDecoder object Kafka { def main(args:Array[String]) { val sparkConf = new SparkConf().setAppName("KafkaTest") val ssc = new StreamingContext(sparkConf, Seconds(60)) ssc.checkpoint("checkpoint") val lines =KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, Map[String, String]("group.id" -> "archive","metadata.broker.list" -> "localhost:9092"), Set("test")) lines.foreachRDD { (rdd, time) =>{ // println(time) rdd.foreach(x=>println(x._1+"----------"+x._2)) }} ssc.start() ssc.awaitTermination() } }
1.3 工作原理
1.3 模型文件
使用SODBASE Studio可以将.soddata2文件转为xml文件,其中的规则为SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque FROM T1:DCSstream完整XML规则文件如下<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <GraphModelData> <CEPSoftwareVersion>2</CEPSoftwareVersion> <inputAdaptors> <inputAdaptorClassName>com.sodbase.inputadaptor.kafka.KafkaInputAdaptor</inputAdaptorClassName> <adaptorParams>DCSstream</adaptorParams> <adaptorParams>test</adaptorParams> <adaptorParams>localhost:2181</adaptorParams> <adaptorParams>message</adaptorParams> <isExternal>false</isExternal> </inputAdaptors> <SODSQLs>CREATE QUERY britork-actuator SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque FROM T1:DCSstream PATTERN T1 WHERE WITHIN 0 </SODSQLs> <outputAdaptors> <isOutputAsSelection>true</isOutputAsSelection> <outputAdaptorClassName>com.sodbase.outputadaptor.PrintEventOutputAdaptor</outputAdaptorClassName> <adaptorParams>false</adaptorParams> <adaptorParams>true</adaptorParams> <isExternal>false</isExternal> <queryName>britork-actuator</queryName> </outputAdaptors> <outputAdaptors> <isOutputAsSelection>true</isOutputAsSelection> <outputAdaptorClassName>com.sodbase.outputadaptor.kafka.KafkaOutputAdaptor</outputAdaptorClassName> <adaptorParams>test2</adaptorParams> <adaptorParams>localhost:9092</adaptorParams> <isExternal>false</isExternal> <queryName>britork-actuator</queryName> </outputAdaptors> <modelName>britork-actuator</modelName> <modelVersion>1.0</modelVersion> <modelDescription></modelDescription> </GraphModelData>
参考:
SODBASE
CEP学习进阶篇(七):SODBASE CEP与Spark streaming集成
SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目
。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。缓存扩展参见与分布式缓存集成。
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- SQL中的三值逻辑
- SQL Server 作业批量停止
- 结束SQL阻塞的进程
- 动态生成SQL Server视图作业
- SQL Server 语句操纵数据库
- Spark随谈——开发指南(译)
- SQL(结构化查询语句)
- oracle sql日期比较
- Spark,一种快速数据分析替代方案
- linux快速部署mysql服务器
- sql 存储过程分页
- 在WINXP系统上安装SQL Server企业版的方法
- 通过批处理调用SQL的方法(osql)
- SQL Server 存储过程的分页
- ASP程序与SQL存储过程结合使用详解
- SQL SERVER编写存储过程小工具