您的位置:首页 > 其它

Esper入门及示例(CEP引擎)

2016-07-14 11:47 169 查看

一,概述

        关系型数据用来查询相对静态的数据,如果执行一些复杂的查询,要降低查询的频度。传统关系型数据库普遍将数据存储在硬盘(也有内存数据库),它的检索性能受限于硬盘访问性能。受制于关系型数据库的设计,如果频繁查询数据来实现实时统计则需要在较短时间内构建多次查询语句(SQL),每次检索都耗时较长,关系型数据库会成为系统瓶颈。总而言之,传统关系型数据库并不适合每秒成百上千次的查询统计。
        Esper引擎可以使用类似于SQL的EPL语句来构建处理模型,处理每秒几万到几十万的实时数据的查询统计。它的工作有点像倒过来的关系型数据库,它不需要像数据库那样存储数据,而是先构建查询语句,引擎依据这些处理模型实时的输出符合的结果。而关系型数据要查询语句提交后才会输出结果。
Esper除核心jar包之外还有诸如io,jdbc,jmx等jar。本文只讲解核心的前几章基础知识。

二,例子

    先看一个Epser处理实时数据的例子:
    收集某网站的实时用户访问日志(accesslog),数据字段如下:
    ip(访客ip)、time(访问时间)、url(页面地址)、httpcode(状态码)、agent(浏览器头信息)、sizeinbytes(数据大小)、等等。

    场景1,分析1小时(周期)产生的状态码数量:
    select httpcode,count(*) as hz from accesslog.win:time_batch(1 hour) group by httpcode otder by hz desc;

    场景2,分析1小时(周期)访客对url的访问频率:
    select ip,url,count(*) as hz from accesslog.win:time_batch(1 hour) group by ip,url order by  hz desc;

    场景3,找到可能危险的请求(状态码403 404),分析1小时(周期)访客对url的访问频率:
    select ip,url,count(*) as hz from accesslog(httpcode in(403,404)).win:time_batch(1 hour) group by ip,url order by  hz desc;
    注:三个例子使用了时间批量窗口,它会收集此时间间隔数据一起统计,周期内只输出一次结果(这样更像是关系型数据库的输出形式),以便于理解。

三,入门

    1,Esper能做什么?优点有哪些?
        使用SQL的形式来处理实时数据,支持查询、聚合、连接、过滤等,非常容易上手。
        多种形式(时间/长度)窗口统计事件数据,事件之间可以关联分析。
        高吞吐(10万事件/秒)低延时(毫秒级别)。
        核心只有一个jar包,易于扩展和发布。经常和实时分布式框架Storm一起使用。
    2,基本概念
        事件(Event):就是要统计分析的源数据。 Esper支持多种结构(Java POJO、Map 、数组 、XML)。 POJO的性能最高,map和数组有利于扩展。
        处理模型:
        自定义类实现UpdateListener接口,用来接收epl产生的结果事件:



            public interface UpdateListener{
                public void update(EventBean[] newEvents, EventBean[] oldEvents);
            }

        EventBean类:
 



        EPL:
            首先我们看一个非常简单的EPL语句:select * from Withdrawal;这个语句选择所有的Withdrawal事件,每一次新的事件都会触发updatelistener的update方法。

 


            使用长度窗口EPL: select * from Withdrawal.win:length(5);这条语句选择最近5条Withdrawal数据。运行如下:



 
            使用过滤和where条件的EPL:
                1)select * from Withdrawal(amount>=200).win:length(5);
                2)select * from Withdrawal.win:length(5) where amount >= 200
                上面两条EPL语句表示选择符合amount>=200条件的最近5条数据,不同的是第一条是过滤,只有符合条件的事件才会进入引擎,第二条是where条件,不管是否               符合条件,事件都会进入引擎。仔细观察下图可看出区别。

   




                

                以上示例是长度窗口,类似的还有时间窗口,批量长度窗口和批量时间窗口。
                批量窗口收集数据,在给定的时间间隔或长度满足条件时再触发计算和输出。

            Aggregation and Grouping(聚合函数和分组统计):
            因为EPL与SQL很接近,所以语句语法基本和SQL一样。
            // 统计进入的1小时accesslog事件,独立ip(访客)数量;
            select count(distict(ip)) from accesslog.win:time_batch(1 hour)

            // 统计进入的1小时accesslog事件,按ip分组统计ip(访客)访问的url(页面)个数。 
            select ip, count(distict(url)) from accesslog.win: time_batch(1 hour) group by ip

            //统计进入的1小时accesslog事件,按ip分组统计访客产生的流量。
            select ip,sum(sizeinbytes) from accesslog.win: time_batch(1 hour) group ip

            当然,以上EPL只能解决部分简单场景。如果要深入使用还要了解Context、Patterns、Windows、View等知识。

四,实例

    这小节我们解决一个实际中(简化了分析场景)遇到的问题:
    为了更好的了解客户的流量情况,我们收集了客户主机的网络访问数据(netaccesslog);字段如下所示:
    sip(源主机地址)、dip(目的主机地址)、packets(包数量)、octets(流量)、dcust(目的客户)
        1)实时统计主机的访问数据包个数。并设置一个阈值(35000pps),将超出访问数据包频率的主机显示出来。
        2)实时统计客户主机的访问来源。以便统计访问情况和排查1)中问题产生的原因。

    场景1,
        select dcust ,dip,sum(packets)/30 as pps 
        from netaccesslog.win:time(30 sec) 
        group by dcust ,dip 
        having sum(packets) /30 >35000 
        output snapshot every 1 sec
    使用时间窗口,统计最近30秒,客户目的dip的被访问情况(pps),只输出pps大于35000的客户及dip信息,每1秒输出一次结果信息(保证时效性)。

    场景2,
        Select dcust,dip,sip,sum(packets)/30 as pps, sum(octets)/30 as bps
        From netaccesslog.win:time_batch (30 sec)
        Group by dcust,dip,sip
    使用时间批量窗口,统计30秒周期,客户的主机被其他主机访问情况。输出客户、目标地址、源地址、pps、bps信息。结果保存进数据库以便查询展现。

以下是源代码, 引入esper 的jar包就可以运行

<span style="font-family:Microsoft YaHei;">package udf.test;

import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
* 简化的场景分析示例.
*
*/
public class Test {
static EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
static EPAdministrator admin = epService.getEPAdministrator();

public static void main(String[] args) throws Exception {
// 为了更好的了解客户的流量情况,我们收集了客户主机的网络访问数据(netaccesslog);字段如下所示:
// Sip(源主机地址)、Dip(目的主机地址)、Packets(包数量)、Octets(流量)、DCust(目的客户)
// 1) 实时统计主机的访问数据包个数。并设置一个阈值(35000pps),将超出访问数据包频率的主机显示出来。
// 2) 实时统计客户主机的访问来源。以便统计访问情况和排查1)中问题产生的原因。

// 以下示例, 随机生成网络访问数据,定时发出一条有异常的数据.用来触发展示场景1和场景2的结果.
String netAccessLog = NetAccessEvent.class.getName();
// 1, 编写EPL / 注册监听
String epl1Str = "select dcust ,dip,sum(packets)/30 as pps from " + netAccessLog+ ".win:time(30 sec) group by dcust ,dip having sum(packets) /30 >35000 output snapshot every 1 sec";
String epl2Str = "select dcust,dip,sip,sum(packets)/30 as pps, sum(octets)/30 as bps,count(*) as num from " + netAccessLog+ ".win:time_batch (30 sec) group by dcust,dip,sip order by dcust,dip,sip";
EPStatement epl1 = admin.createEPL(epl1Str);
EPStatement epl2 = admin.createEPL(epl2Str);
// 2,编写updatelistener监听结果
epl1.addListener(new NetAccessListener_epl1());
epl2.addListener(new NetAccessListener_epl2());
// 3,发送随机的网络访问数据 ,定时制造一条有异常的数据, 每100毫秒发送一条事件,每512条数据发送一条异常数据(来测试场景1)
EPRuntime runtime = epService.getEPRuntime();
SendEvent(runtime);
// 等待引擎处理数据
TimeUnit.SECONDS.sleep(600);
}

public static void SendEvent(final EPRuntime runtime) {
Thread sendThread = new Thread(new Runnable() {
@Override
public void run() {
String custPrefix = "客户";
String dstIpPrefix = "200.200.200.";
String srcIpPrefix = "100.100.";
int count = 1;
while (true) {
try {
NetAccessEvent nae = new NetAccessEvent();
nae.setDcust(custPrefix + "_" + ((int) (Math.random() * 10)));
nae.setDip(dstIpPrefix + ((int) (Math.random() * 20)));
nae.setSip(srcIpPrefix + ((int) (Math.random() * 10)) + "." + ((int) (Math.random() * 10)));
nae.setOctets(((long) (Math.random() * 100)));
nae.setPackets(((long) (Math.random() * 100)));
runtime.sendEvent(nae);
count++;
if (count % 100 == 0) {
System.out.println("发送了" + count + "条事件");
}
if (count % 512 == 0) {
NetAccessEvent naeAnomaly = new NetAccessEvent();
naeAnomaly.setDcust(custPrefix + "_" + ((int) (Math.random() * 10)));
naeAnomaly.setDip(dstIpPrefix + ((int) (Math.random() * 20)));
naeAnomaly.setSip(srcIpPrefix + ((int) (Math.random() * 10)) + "." + ((int) (Math.random() * 10)));
naeAnomaly.setOctets(35000 * 30l);
naeAnomaly.setPackets(35000 * 30l);
runtime.sendEvent(naeAnomaly);
}
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {

}
}
}
});
sendThread.start();
}
}

class NetAccessListener_epl1 implements UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
if (newEvents != null) {
for (EventBean eventbean : newEvents) {
System.out.println("客户:" + eventbean.get("dcust") + "的主机:" + eventbean.get("dip") + "的访问超过了限制; " + "pps:" + eventbean.get("pps"));
}
}
}
}

class NetAccessListener_epl2 implements UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
if (newEvents != null) {
System.out.println("\t 客户\t 目标主机\t 源主机\t pps\t bps\t 条目数\n");
DecimalFormat df = new DecimalFormat("#0.0000");
for (EventBean eventbean : newEvents) {
// 没有数据的项目不输出
if (eventbean.get("pps") == null) {
continue;
}
System.out.println(eventbean.get("dcust") + "\t" + eventbean.get("dip") + "\t" + eventbean.get("sip") + "\t" + df.format(eventbean.get("pps")) + "\t" + df.format(eventbean.get("bps")) + "\t" + eventbean.get("num"));
}
}
}
}

/**
* 代表网络访问事件数据
*/
class NetAccessEvent {
private String sip;
private String dip;
private Long packets;
private Long octets;
private String dcust;

public String getSip() {
return sip;
}

public void setSip(String sip) {
this.sip = sip;
}

public String getDip() {
return dip;
}

public void setDip(String dip) {
this.dip = dip;
}

public Long getPackets() {
return packets;
}

public void setPackets(Long packets) {
this.packets = packets;
}

public Long getOctets() {
return octets;
}

public void setOctets(Long octets) {
this.octets = octets;
}

public String getDcust() {
return dcust;
}

public void setDcust(String dcust) {
this.dcust = dcust;
}

}
</span>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: