您的位置:首页 > 数据库

SODBASE CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-低延迟规则管理

2015-12-01 11:24 549 查看
许多大数据平台项目采用流式计算来处理实时数据,会涉及到一个环节:处理规则管理。因为用户经常有自己配置数据处理规则或策略的需求。同时,维护人员来也有也有将规则提取出来的需求,方便变更和维护的需求。我们知道Spark streaming作为数据归档备份时吞吐量高,与Hadoop集成相对方便。但是Spark streaming也存在高延时,框架过重带来策略规则修改复杂的问题。本文介绍Spark streaming加SODBASE SQL来实现规则管理的示例。

1.示例

1.1 示例简介

本示例的数据源是Kafka,从采集设备到Kafka的过程没有画出来。许多时候这种数据是做了二进制压缩的,本例中就是这样。数据解析规则采用了SODBASE SQL语句来表达规则,比如数据的第0位到第7位转化为整形,作为设备转速值。规则处理过的数据可以通过socket传给Spark streaming,也可以再通过Kafka传给Spark streaming,从而解决Spark streaming的实时性较弱和定制化能力弱的问题。通过socket的方式前面文章有介绍,本文示例是再通过Kafka传给Spark
streaming。



1.2 操作步骤

1.2.1 安装Kafka,建立两个topic(test,test2)

找一台linux机器,从官方网站下载Kafka,解压,启动

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

建立test topic, bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

1.2.2 下载SODBASE规则示例模型和软件

点击此处下载,

(1)SODBASE CEP Server 2.0.23(sp3)以上版本,在LInux下解压,启动

$chmod 777 catalina.sh
$./catalina.sh run
初始化状态服务器

$curl http://localhost:16111/sodbase-cep-server-webservice-1.0.1/install[/code] (2)SODBASE CEP Admin用于安装规则模型,在Linux下解压

(3)SODBASE Studio示例模型中的britork-actuator.soddata2规则模型文件到本地

1.2.3 使用SODBASE CEP Admin安装启动britork-actuator.soddata2

$ cd SODBASE-CEP-Server-Admin-2.0-u24/bin

$ ./installmodel.sh -h localhost -P 16111 -f "../example/britork-actuator.soddata2" -u admin -p cep
$ ./startmodel.sh -h localhost -P 16111 -m britork-actuator -v 1.0 -u admin -p cep


1.2.4运行Spark Streaming

编译运行一下Spark代码,就可以接收数据,接收到的数据这里作了屏幕打印。Spark streaming本身不需要经常修改,通过配置上文SODSQL语句就可以改变数据处理的逻辑。在实际项目中也可以落地到Hdfs,或者Hbase等存储中。

package example.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object Kafka {
def main(args:Array[String])
{

val sparkConf = new SparkConf().setAppName("KafkaTest")
val ssc =  new StreamingContext(sparkConf, Seconds(60))
ssc.checkpoint("checkpoint")
val lines =KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, Map[String, String]("group.id" -> "archive","metadata.broker.list" -> "localhost:9092"), Set("test"))

lines.foreachRDD { (rdd, time) =>{
// println(time)
rdd.foreach(x=>println(x._1+"----------"+x._2))
}}
ssc.start()
ssc.awaitTermination()
}
}


1.3 工作原理

1.3 模型文件

使用SODBASE Studio可以将.soddata2文件转为xml文件,其中的规则为

SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque
FROM T1:DCSstream
完整XML规则文件如下

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<GraphModelData>
<CEPSoftwareVersion>2</CEPSoftwareVersion>
<inputAdaptors>
<inputAdaptorClassName>com.sodbase.inputadaptor.kafka.KafkaInputAdaptor</inputAdaptorClassName>
<adaptorParams>DCSstream</adaptorParams>
<adaptorParams>test</adaptorParams>
<adaptorParams>localhost:2181</adaptorParams>
<adaptorParams>message</adaptorParams>
<isExternal>false</isExternal>
</inputAdaptors>
<SODSQLs>CREATE QUERY britork-actuator SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque FROM T1:DCSstream PATTERN T1 WHERE   WITHIN 0 </SODSQLs>
<outputAdaptors>
<isOutputAsSelection>true</isOutputAsSelection>
<outputAdaptorClassName>com.sodbase.outputadaptor.PrintEventOutputAdaptor</outputAdaptorClassName>
<adaptorParams>false</adaptorParams>
<adaptorParams>true</adaptorParams>
<isExternal>false</isExternal>
<queryName>britork-actuator</queryName>
</outputAdaptors>
<outputAdaptors>
<isOutputAsSelection>true</isOutputAsSelection>
<outputAdaptorClassName>com.sodbase.outputadaptor.kafka.KafkaOutputAdaptor</outputAdaptorClassName>
<adaptorParams>test2</adaptorParams>
<adaptorParams>localhost:9092</adaptorParams>
<isExternal>false</isExternal>
<queryName>britork-actuator</queryName>
</outputAdaptors>
<modelName>britork-actuator</modelName>
<modelVersion>1.0</modelVersion>
<modelDescription></modelDescription>
</GraphModelData>


参考:
SODBASE
CEP学习进阶篇(七):SODBASE CEP与Spark streaming集成
SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目

。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。缓存扩展参见与分布式缓存集成

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息