
[Original] StreamInsight Query Series (6): Basic Query Operations, Grouped Aggregation

2011-08-22 23:11
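The previous post covered user-defined aggregates among StreamInsight's basic query operations. This post shows how to use grouped aggregation in StreamInsight queries.

Preparing the test data

To make the queries easy to test, we first prepare a static test data source: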

var weatherData = new[]
{
    new { Timestamp = new DateTime(2010, 1, 1, 0, 00, 00, DateTimeKind.Utc), Temperature = -9.0, StationCode = 71395, WindSpeed = 4},
    new { Timestamp = new DateTime(2010, 1, 1, 0, 30, 00, DateTimeKind.Utc), Temperature = -4.5, StationCode = 71801, WindSpeed = 41},
    new { Timestamp = new DateTime(2010, 1, 1, 1, 00, 00, DateTimeKind.Utc), Temperature = -8.8, StationCode = 71395, WindSpeed = 6},
    new { Timestamp = new DateTime(2010, 1, 1, 1, 30, 00, DateTimeKind.Utc), Temperature = -4.4, StationCode = 71801, WindSpeed = 39},
    new { Timestamp = new DateTime(2010, 1, 1, 2, 00, 00, DateTimeKind.Utc), Temperature = -9.7, StationCode = 71395, WindSpeed = 9},
    new { Timestamp = new DateTime(2010, 1, 1, 2, 30, 00, DateTimeKind.Utc), Temperature = -4.6, StationCode = 71801, WindSpeed = 59},
    new { Timestamp = new DateTime(2010, 1, 1, 3, 00, 00, DateTimeKind.Utc), Temperature = -9.6, StationCode = 71395, WindSpeed = 9},
};

weatherData represents a series of weather readings (timestamp, temperature, station code, and wind speed).

Next, convert weatherData into a stream of point events:

var weatherStream = weatherData.ToPointStream(Application,
    t => PointEvent.CreateInsert(t.Timestamp, t),
    AdvanceTimeSettings.IncreasingStartTime);

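Grouped aggregation

Question 1: How do we compute the average over the events in each group for the past 2 hours?

Applied to the example above, the question becomes: "How do we compute the average temperature and average wind speed of all weather events at each station over the past 2 hours?" Readers familiar with LINQ will remember the group..by clause. Combining group..by with a tumbling window (TumblingWindow) answers the question: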

var averageGroupQuery = from e in weatherStream
                        group e by e.StationCode into stationGroups
                        from win in stationGroups.TumblingWindow(
                            TimeSpan.FromHours(2),
                            HoppingWindowOutputPolicy.ClipToWindowEnd)
                        select new
                        {
                            StationCode = stationGroups.Key,
                            AverageTemperature = win.Avg(e => e.Temperature),
                            AverageWindspeed = win.Avg(e => e.WindSpeed)
                        };

Result in LINQPad: [screenshot of the averageGroupQuery output]
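To cross-check the windowing logic without a StreamInsight server, the grouping can be approximated with plain LINQ to Objects. This is only a sketch: the names `TumblingAverageSketch`, `Floor`, and `AveragePerStation` are made up for illustration, windows are assumed to be aligned to tick 0 of the time scale, and CTIs and event lifetimes are ignored entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TumblingAverageSketch
{
    // The same seven readings as weatherData: timestamp, temperature, station code, wind speed.
    private static readonly (DateTime Timestamp, double Temperature, int StationCode, int WindSpeed)[] Readings =
    {
        (Utc(0, 0), -9.0, 71395, 4),
        (Utc(0, 30), -4.5, 71801, 41),
        (Utc(1, 0), -8.8, 71395, 6),
        (Utc(1, 30), -4.4, 71801, 39),
        (Utc(2, 0), -9.7, 71395, 9),
        (Utc(2, 30), -4.6, 71801, 59),
        (Utc(3, 0), -9.6, 71395, 9),
    };

    private static DateTime Utc(int hour, int minute) =>
        new DateTime(2010, 1, 1, hour, minute, 0, DateTimeKind.Utc);

    // Start of the fixed-size window containing t (assumption: windows are aligned to tick 0).
    private static DateTime Floor(DateTime t, TimeSpan size) =>
        new DateTime(t.Ticks - t.Ticks % size.Ticks, DateTimeKind.Utc);

    // One row per (window, station) pair, ordered by window start and then by station code.
    public static List<(int StationCode, DateTime WindowStart, double AvgTemperature, double AvgWindSpeed)>
        AveragePerStation(TimeSpan size)
    {
        return Readings
            .GroupBy(r => (r.StationCode, Start: Floor(r.Timestamp, size)))
            .OrderBy(g => g.Key.Start)
            .ThenBy(g => g.Key.StationCode)
            .Select(g => (StationCode: g.Key.StationCode,
                          WindowStart: g.Key.Start,
                          AvgTemperature: g.Average(r => r.Temperature),
                          AvgWindSpeed: g.Average(r => r.WindSpeed)))
            .ToList();
    }

    public static void Main()
    {
        foreach (var row in AveragePerStation(TimeSpan.FromHours(2)))
            Console.WriteLine($"{row.WindowStart:HH:mm} station {row.StationCode}: " +
                              $"temperature {row.AvgTemperature:F2}, wind {row.AvgWindSpeed:F1}");
    }
}
```

For the window starting at 00:00, station 71395 averages about -8.9 degrees with wind speed 5, and station 71801 about -4.45 degrees with wind speed 40.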



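Question 2: How do we compute, once every hour, the average over the events in each group for the past 2 hours?

This is much like Question 1, except that the group..by clause is combined with a hopping window (HoppingWindow).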

var averageGroupQuery2 = from e in weatherStream
                         group e by e.StationCode into stationGroups
                         from win in stationGroups.HoppingWindow(
                             TimeSpan.FromHours(2),
                             TimeSpan.FromHours(1),
                             HoppingWindowOutputPolicy.ClipToWindowEnd)
                         select new
                         {
                             StationCode = stationGroups.Key,
                             AverageTemperature = win.Avg(e => e.Temperature),
                             AverageWindspeed = win.Avg(e => e.WindSpeed)
                         };

Output in LINQPad: [screenshot of the averageGroupQuery2 output]
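Unlike tumbling windows, hopping windows overlap, so with a 2-hour size and a 1-hour hop a single reading contributes to two windows. The sketch below illustrates that membership rule in plain C#. `HoppingWindowSketch` and `WindowStartsContaining` are hypothetical names, and the alignment of window starts to multiples of the hop size counted from tick 0 is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HoppingWindowSketch
{
    // Start times of every window [start, start + size) that contains t,
    // assuming window starts are multiples of `hop` counted from tick 0.
    public static IEnumerable<DateTime> WindowStartsContaining(DateTime t, TimeSpan size, TimeSpan hop)
    {
        // Most recent window start at or before t.
        long latest = t.Ticks - t.Ticks % hop.Ticks;

        // Walk backwards while the window still reaches past t.
        for (long start = latest; start > t.Ticks - size.Ticks && start >= 0; start -= hop.Ticks)
            yield return new DateTime(start, DateTimeKind.Utc);
    }

    public static void Main()
    {
        var reading = new DateTime(2010, 1, 1, 1, 30, 0, DateTimeKind.Utc);
        var size = TimeSpan.FromHours(2);
        var hop = TimeSpan.FromHours(1);

        foreach (var start in WindowStartsContaining(reading, size, hop))
            Console.WriteLine($"{start:yyyy-MM-dd HH:mm} to {start + size:yyyy-MM-dd HH:mm}");
    }
}
```

The 01:30 reading lands in the windows starting at 01:00 and 00:00. Passing the same value for size and hop reduces this to the tumbling case, where each reading belongs to exactly one window.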



Question 3: How do we compute the average over the events in each group for the past 2 hours every time a new event arrives?

// Stretch each point event to a 2-hour lifetime, then take snapshots per station.
var averageGroupQuery3 = from e in weatherStream
                             .AlterEventDuration(e => TimeSpan.FromHours(2))
                         group e by e.StationCode into stationGroups
                         from win in stationGroups
                             .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                         select new
                         {
                             StationCode = stationGroups.Key,
                             AverageTemperature = win.Avg(e => e.Temperature),
                             AverageWindspeed = win.Avg(e => e.WindSpeed)
                         };

The result: [screenshot of the averageGroupQuery3 output]
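The query relies on two steps: AlterEventDuration stretches every point event into an interval lasting 2 hours, and the snapshot window then produces a new result each time an event in the group starts or ends. A rough plain-C# sketch of that idea for a single station follows. `SnapshotWindowSketch` and `Snapshots` are hypothetical names, and the code only mimics the interval arithmetic, not the engine's CTI handling or output policy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotWindowSketch
{
    // Returns (start, end, average) for each snapshot of one group's readings,
    // after every point reading has been stretched to last `lifetime`.
    public static List<(DateTime Start, DateTime End, double Average)> Snapshots(
        IEnumerable<(DateTime Timestamp, double Value)> readings, TimeSpan lifetime)
    {
        var intervals = readings
            .Select(r => (Start: r.Timestamp, End: r.Timestamp + lifetime, Value: r.Value))
            .ToList();

        // Every start or end time of an interval is a snapshot boundary.
        var boundaries = intervals
            .SelectMany(i => new[] { i.Start, i.End })
            .Distinct()
            .OrderBy(t => t)
            .ToList();

        var result = new List<(DateTime Start, DateTime End, double Average)>();
        for (int k = 0; k + 1 < boundaries.Count; k++)
        {
            DateTime lo = boundaries[k], hi = boundaries[k + 1];

            // An interval belongs to the snapshot when it covers [lo, hi) completely.
            var active = intervals.Where(i => i.Start <= lo && i.End >= hi).ToList();
            if (active.Count > 0)
                result.Add((lo, hi, active.Average(i => i.Value)));
        }
        return result;
    }

    public static void Main()
    {
        // Temperatures reported by station 71395 in weatherData.
        var station71395 = new[]
        {
            (new DateTime(2010, 1, 1, 0, 0, 0, DateTimeKind.Utc), -9.0),
            (new DateTime(2010, 1, 1, 1, 0, 0, DateTimeKind.Utc), -8.8),
            (new DateTime(2010, 1, 1, 2, 0, 0, DateTimeKind.Utc), -9.7),
            (new DateTime(2010, 1, 1, 3, 0, 0, DateTimeKind.Utc), -9.6),
        };

        foreach (var s in Snapshots(station71395, TimeSpan.FromHours(2)))
            Console.WriteLine($"{s.Start:HH:mm} to {s.End:HH:mm}: {s.Average:F2}");
    }
}
```

For station 71395 this yields five snapshots, one per hour from 00:00 to 05:00. The second one (01:00 to 02:00) averages the 00:00 and 01:00 readings, which is about -8.9.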



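Question 4: How do we count the number of groups in the past 2 hours?

This can be handled in two stages: stage 1 assigns all events within the 2 hours to their respective groups, and stage 2 counts the groups in that time span.

Stage 1 (assigning events to groups):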

var groupQuery = from e in weatherStream
                 group e by e.StationCode into stationGroups
                 from win in stationGroups.TumblingWindow(
                     TimeSpan.FromHours(2),
                     HoppingWindowOutputPolicy.ClipToWindowEnd)
                 select new
                 {
                     StationCode = stationGroups.Key,
                     EventCount = win.Count()
                 };

Stage 2 (counting the groups):

var groupCountQuery = from win in groupQuery.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                      select new
                      {
                          GroupCount = win.Count()
                      };

Output in LINQPad: [screenshot of the groupCountQuery output]
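Stage 1 emits one event per station per window, so counting those events in stage 2 amounts to counting the distinct stations seen in each window. A minimal plain-LINQ sketch of that equivalence is given here. `GroupCountSketch` and `GroupsPerWindow` are hypothetical names, and windows are again assumed to be aligned to tick 0.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupCountSketch
{
    // Timestamp and station code of each reading in weatherData.
    private static readonly (DateTime Timestamp, int StationCode)[] Readings =
    {
        (Utc(0, 0), 71395), (Utc(0, 30), 71801),
        (Utc(1, 0), 71395), (Utc(1, 30), 71801),
        (Utc(2, 0), 71395), (Utc(2, 30), 71801),
        (Utc(3, 0), 71395),
    };

    private static DateTime Utc(int hour, int minute) =>
        new DateTime(2010, 1, 1, hour, minute, 0, DateTimeKind.Utc);

    // Number of distinct stations that reported within each fixed-size window.
    public static List<(DateTime WindowStart, int GroupCount)> GroupsPerWindow(TimeSpan size)
    {
        return Readings
            .GroupBy(r => new DateTime(r.Timestamp.Ticks - r.Timestamp.Ticks % size.Ticks, DateTimeKind.Utc))
            .OrderBy(w => w.Key)
            .Select(w => (WindowStart: w.Key,
                          GroupCount: w.Select(r => r.StationCode).Distinct().Count()))
            .ToList();
    }

    public static void Main()
    {
        foreach (var w in GroupsPerWindow(TimeSpan.FromHours(2)))
            Console.WriteLine($"{w.WindowStart:HH:mm}: {w.GroupCount} group(s)");
    }
}
```

With 2-hour windows both windows contain readings from both stations, so each reports 2 groups.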



The next post will cover basic ranking (TopK) among StreamInsight's basic query operations.