您的位置:首页 > 其它

【原创】StreamInsight查询系列(二十四)——查询模式之模式匹配

2011-09-29 19:59 393 查看
上篇文章介绍了查询模式中如何实现指数平滑法,这篇博文将介绍StreamInsight中如何实现模式匹配。

测试数据准备

为了方便测试查询,我们首先准备一个静态的测试数据源:

var startTime = new DateTime(2011, 9, 28).ToLocalTime();
var source = new[]
{
PointEvent.CreateInsert(startTime.AddSeconds(0), 1),
PointEvent.CreateInsert(startTime.AddSeconds(20), 2),
PointEvent.CreateInsert(startTime.AddSeconds(21), 1),
PointEvent.CreateInsert(startTime.AddSeconds(40), 3),
PointEvent.CreateCti<int>(startTime.AddSeconds(51)),
};

接下去将上述数据源转变为点类型复杂事件流:

var xs = source.ToStream(Application);

模式匹配

问题:怎样在StreamInsight中实现找出如下模式“找出所有的1号事件,保证在其发生后的30秒内没有2号事件出现”?

阅读过《StreamInsight查询系列(十八)——查询模式之趋势发现》的读者相信可以很快的写出类似下面的查询:

var ret = from e in xs.Where(e => e == 1)
where xs.Where(e2 => e2 == 2)
.AlterEventLifetime(
e2 => e2.StartTime - TimeSpan.FromSeconds(30) + TimeSpan.FromTicks(1),
e2 => TimeSpan.FromSeconds(30))
.IsEmpty()
select e;


结果如下:



不错,这样的确能够解决上述问题。但是这里要介绍的是另外一种方法:使用用户自定义流运算符进行模式匹配,我们希望使用如下的语句就能得到结果:

xs.Scan(new SimplePatternMatcher());

下面是具体的实现过程:

/// <summary>
/// 查找值为1的事件(1号事件)并且该事件在30秒内不后随值为2的事件(2号事件)。
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
[DataMember]
DateTimeOffset? _nextCti;

[DataMember]
// 记录下一个Cti出现之前的所有1号事件的时间戳
readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

public override bool IsEmpty
{
// 当没有任何1号事件出现时,标识运算符为空。
get { return _active.Count == 0; }
}

public override DateTimeOffset? NextCti
{
get { return _nextCti; }
}

public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
{
// 响应输入的时间戳产生输出。
// 任何30秒内没有匹配2号事件的1号事件模式的事件都将输出
while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
{
yield return _active.Dequeue().UtcDateTime;
}

// 根据新的输入事件更新流运算符的状态。
if (inputEvent.EventKind == EventKind.Insert)
{
if (inputEvent.Payload == 1)
_active.Enqueue(inputEvent.StartTime);
else if (inputEvent.Payload == 2)
_active.Clear();
}

// 如果存在1号事件,设置其30秒后输出
// if needed.
if (_active.Count > 0)
{
_nextCti = _active.Peek().AddSeconds(30);
}
}
}

这里稍微介绍一下从CepPointStreamOperator覆盖的属性和函数,它们分别是:

ProcessEvent 方法。生成输出并更新运算符的内部状态,以便响应各输入事件。 ProcessEvent 接收一个输入事件并且可以返回零个或多个输出负载。

IsEmpty 属性。指示运算符的内部状态是否为空。在为 true 时,StreamInsight 查询引擎可能会放弃运算符实例以降低内存使用率。

NextCti 属性。指示 CTI 事件将发送到运算符的下一个时间点。通过覆盖此属性,可以让用户定义的运算符在将来的特定时间点生成输出,或者在经过某个应用程序时间间隔后指示其内部状态为空。

最终的结果如下:



好了,至此StreamInsight查询模式所有部分介绍完毕!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐