【原创】StreamInsight查询系列(二十四)——查询模式之模式匹配
2011-09-29 19:59
393 查看
上篇文章介绍了查询模式中如何实现指数平滑法,这篇博文将介绍StreamInsight中如何实现模式匹配。
接下去将上述数据源转变为点类型复杂事件流:
阅读过《StreamInsight查询系列(十八)——查询模式之趋势发现》的读者相信可以很快的写出类似下面的查询:
结果如下:
不错,这样的确能够解决上述问题。但是这里要介绍的是另外一种方法:使用用户自定义流运算符进行模式匹配,我们希望使用如下的语句就能得到结果:
下面是具体的实现过程:
这里稍微介绍一下从CepPointStreamOperator覆盖的属性和函数,它们分别是:
ProcessEvent 方法。生成输出并更新运算符的内部状态,以便响应各输入事件。 ProcessEvent 接收一个输入事件并且可以返回零个或多个输出负载。
IsEmpty 属性。指示运算符的内部状态是否为空。在为 true 时,StreamInsight 查询引擎可能会放弃运算符实例以降低内存使用率。
NextCti 属性。指示 CTI 事件将发送到运算符的下一个时间点。通过覆盖此属性,可以让用户定义的运算符在将来的特定时间点生成输出,或者在经过某个应用程序时间间隔后指示其内部状态为空。
最终的结果如下:
好了,至此StreamInsight查询模式所有部分介绍完毕!
测试数据准备
为了方便测试查询,我们首先准备一个静态的测试数据源:var startTime = new DateTime(2011, 9, 28).ToLocalTime(); var source = new[] { PointEvent.CreateInsert(startTime.AddSeconds(0), 1), PointEvent.CreateInsert(startTime.AddSeconds(20), 2), PointEvent.CreateInsert(startTime.AddSeconds(21), 1), PointEvent.CreateInsert(startTime.AddSeconds(40), 3), PointEvent.CreateCti<int>(startTime.AddSeconds(51)), };
接下去将上述数据源转变为点类型复杂事件流:
var xs = source.ToStream(Application);
模式匹配
问题:怎样在StreamInsight中实现找出如下模式“找出所有的1号事件,保证在其发生后的30秒内没有2号事件出现”?阅读过《StreamInsight查询系列(十八)——查询模式之趋势发现》的读者相信可以很快的写出类似下面的查询:
var ret = from e in xs.Where(e => e == 1) where xs.Where(e2 => e2 == 2) .AlterEventLifetime( e2 => e2.StartTime - TimeSpan.FromSeconds(30) + TimeSpan.FromTicks(1), e2 => TimeSpan.FromSeconds(30)) .IsEmpty() select e;
结果如下:
不错,这样的确能够解决上述问题。但是这里要介绍的是另外一种方法:使用用户自定义流运算符进行模式匹配,我们希望使用如下的语句就能得到结果:
xs.Scan(new SimplePatternMatcher());
下面是具体的实现过程:
/// <summary> /// 查找值为1的事件(1号事件)并且该事件在30秒内不后随值为2的事件(2号事件)。 /// </summary> [DataContract] public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime> { [DataMember] DateTimeOffset? _nextCti; [DataMember] // 记录下一个Cti出现之前的所有1号事件的时间戳 readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>(); public override bool IsEmpty { // 当没有任何1号事件出现时,标识运算符为空。 get { return _active.Count == 0; } } public override DateTimeOffset? NextCti { get { return _nextCti; } } public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent) { // 响应输入的时间戳产生输出。 // 任何30秒内没有匹配2号事件的1号事件模式的事件都将输出 while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime) { yield return _active.Dequeue().UtcDateTime; } // 根据新的输入事件更新流运算符的状态。 if (inputEvent.EventKind == EventKind.Insert) { if (inputEvent.Payload == 1) _active.Enqueue(inputEvent.StartTime); else if (inputEvent.Payload == 2) _active.Clear(); } // 如果存在1号事件,设置其30秒后输出 // if needed. if (_active.Count > 0) { _nextCti = _active.Peek().AddSeconds(30); } } }
这里稍微介绍一下从CepPointStreamOperator覆盖的属性和函数,它们分别是:
ProcessEvent 方法。生成输出并更新运算符的内部状态,以便响应各输入事件。 ProcessEvent 接收一个输入事件并且可以返回零个或多个输出负载。
IsEmpty 属性。指示运算符的内部状态是否为空。在为 true 时,StreamInsight 查询引擎可能会放弃运算符实例以降低内存使用率。
NextCti 属性。指示 CTI 事件将发送到运算符的下一个时间点。通过覆盖此属性,可以让用户定义的运算符在将来的特定时间点生成输出,或者在经过某个应用程序时间间隔后指示其内部状态为空。
最终的结果如下:
好了,至此StreamInsight查询模式所有部分介绍完毕!
相关文章推荐
- 【原创】StreamInsight查询系列(十八)——查询模式之趋势发现
- 【原创】StreamInsight查询系列(二十)——查询模式之检测间隙事件
- 【原创】StreamInsight查询系列(十三)——查询模式之基本模式
- 【原创】StreamInsight查询系列(十七)——查询模式之应对瞬变及报警泛滥
- 【原创】StreamInsight查询系列(十一)——查询模式之窗口对齐
- 【原创】StreamInsight查询系列(二十一)——查询模式之使用地理数据
- 【原创】StreamInsight查询系列(二十二)——查询模式之持续更新
- 【原创】StreamInsight查询系列(十六)——查询模式之左外联接
- 【原创】StreamInsight查询系列(十五)——查询模式之窗口比率
- 【原创】StreamInsight查询系列(二十三)——查询模式之指数平滑法
- 【原创】StreamInsight查询系列(十四)——查询模式之相异计数
- 【原创】StreamInsight查询系列(十九)——查询模式之检测异常
- 【原创】StreamInsight查询系列(十二)——查询模式之事件对齐
- 【原创】StreamInsight查询系列(二)——在LINQPad中输出查询结果
- 【原创】StreamInsight查询系列(一)——准备工作
- 【原创】StreamInsight查询系列(九)——基本查询操作之决胜排序
- 【原创】StreamInsight查询系列(三)——基本查询操作之过滤
- 【原创】StreamInsight查询系列(四)——基本查询操作之聚合
- 【原创】StreamInsight查询系列(七)——基本查询操作之基础排序
- 【原创】StreamInsight查询系列(十)——基本查询操作之联接