高并发大数据采集分析系统框架设计
2015-11-21 22:57
971 查看
这是2011年做的一个项目,数据由专业的仪器采集,单台仪器最高可达256K的采样频率,支持32通道同时采集和存储,并支持多台仪器同时采集。数据到达之后,或实时显示波形,或进入算法计算模块,算法的输出也会同时实时显示。因此,系统需要极快的数据处理能力。
子系统关系图如下:
使用一般的轮询方式肯定达不到速度要求,于是决定使用事件触发的方式传递和处理数据。
1. 数据类EntityData.cs
2. 信号输入模块基类EntitySensor.cs
3. 计算处理类基类EntityAlgorithm.cs
4. 显示类基类EntityDisplay.cs
5. 核心任务类 TaskCenter.cs
各个模块在这里被连接起来
在此框架上,我们还创建了:
SensorRandom类,继承自EntitySensor,按照128通道、100ms的采样间隔产生随机数据,数据缓冲区大小(buffer size)设定为64。
AlgoKalman类,继承自EntityAlgorithm,对数据进行卡尔曼算法运算。
ChartSeries类,继承自EntityDisplay, 用于显示波形。
子系统关系图如下:
使用一般的轮询方式肯定达不到速度要求,于是决定使用事件触发的方式传递和处理数据。
1. 数据类EntityData.cs
namespace DHapp.Entities
{
    /**
     * Data Changed Event
     *
     * Payload handed to EntityData.EventDataChanged subscribers. It carries
     * the publishing entity's Index, its current value and its series array
     * (the array reference is passed through, not copied).
     */
    public class DataChangedEventArgs : EventArgs
    {
        public int index;
        public double currentValue;
        public double[] series;

        public DataChangedEventArgs(int index, double currentValue, double[] series)
        {
            this.index = index;
            this.currentValue = currentValue;
            this.series = series;
        }
    } //End of DataChangedEventArgs

    /**
     * Data Entity Data Structure
     *
     * Holds one data channel (a scalar plus an optional series) and publishes
     * changes through EventDataChanged.
     */
    public class EntityData
    {
        //lock-free status flags; not referenced by the code in this listing.
        //NOTE(review): "IDEL" is presumably a misspelling of "IDLE"; the name
        //is kept because it is public and callers may already use it.
        public static int IDEL = 0;
        public static int BUSY = 1;

        //data source type
        public static int TimeStamps = 0;
        public static int TimeSeries = 1;
        public static int Spectrums = 2;

        //data-change event dispatching
        public event EventHandler<DataChangedEventArgs> EventDataChanged;

        //data source information
        public int DataType = TimeSeries;
        public int Index = 0;
        public string Name = "A";

        //data
        public double current = 0;
        public double[] series = null;

        //drop the reference to the current series
        public void Clear()
        {
            series = null;
        }

        /**
         * Raise EventDataChanged with the current Index, value and series.
         * Does nothing when there are no subscribers.
         */
        public void FireDataChangedEvent()
        {
            //Read the delegate once: a subscriber removed between the null
            //check and the invocation would otherwise cause a
            //NullReferenceException.
            EventHandler<DataChangedEventArgs> handler = EventDataChanged;
            if (handler != null)
            {
                handler(this, new DataChangedEventArgs(Index, current, series));
            }
        }

        /**
         * Store the latest value and series.
         * The array is stored by reference, not copied; use Copy() when the
         * caller intends to keep reusing its own buffer.
         */
        public void Set(double value, double[] signals)
        {
            current = value;
            series = signals;
            //NOTE(review): the published listing shows the call below behind
            //a comment marker; it is read here as disabled, so callers must
            //invoke FireDataChangedEvent() themselves - confirm against the
            //original file.
            // FireDataChangedEvent();
        }

        /**
         * Copy the contents of source into this entity's series buffer.
         * Throws ArgumentNullException when source is null.
         */
        public void Copy(double[] source)
        {
            Copy(source, ref series);
        }

        /**
         * Copy source into destination. destination is (re)allocated only
         * when it is null or its length differs from source; otherwise the
         * existing array is overwritten in place.
         * Throws ArgumentNullException when source is null.
         */
        public static void Copy(double[] source, ref double[] destination)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            int size = source.Length;
            if (destination == null || destination.Length != size)
            {
                destination = new double[size];
            }
            Array.Copy(source, destination, size);
        }
    } //End of EntityData
}
2. 信号输入模块基类EntitySensor.cs
namespace DHapp.Entities
{
    /**
     * Abstract Signal Input Device
     *
     * Base class for signal input modules. It owns the list of data
     * entities the sensor exposes; concrete sensors supply Start/Stop.
     */
    public abstract class EntitySensor
    {
        //data entities exposed by this sensor
        public List<EntityData> dataSource = new List<EntityData>();

        //interfaces - implemented by each concrete sensor
        public abstract void Start();
        public abstract void Stop();

        //remove every data entity from the list
        public void Clear()
        {
            dataSource.Clear();
        }
    } //End of EntitySensor
}
3. 计算处理类基类EntityAlgorithm.cs
namespace DHapp.Entities
{
    /**
     * Abstract Signal Processing Algorithm
     *
     * Keeps the list of input data entities linked to the algorithm and
     * triggers Refresh() when the most recently linked input reports an
     * update.
     */
    public abstract class EntityAlgorithm
    {
        public List<EntityData> dataSource = new List<EntityData>();
        public List<EntityData> outputData = new List<EntityData>();

        //maps a data source's Index to its position inside dataSource
        Dictionary<int, int> sourceIndex = new Dictionary<int, int>();

        //Index of the most recently added (not replaced) data source
        int lastSourceIndex = 0;

        /**
         * Clear data sources and indexes.
         * outputData is left untouched, as in the original listing.
         */
        public void Clear()
        {
            dataSource.Clear();
            sourceIndex.Clear();
            //forget the previous trigger so a stale index cannot match later
            lastSourceIndex = 0;
        }

        /**
         * Link a data source. A source whose Index is already linked
         * replaces the existing entry in place; otherwise it is appended and
         * becomes the source whose updates trigger Refresh().
         * Throws ArgumentNullException when data is null.
         */
        public void LinkDataSource(EntityData data)
        {
            if (data == null)
            {
                throw new System.ArgumentNullException("data");
            }

            int index = data.Index;
            int position;
            if (sourceIndex.TryGetValue(index, out position))
            {
                //replace existing data source link
                dataSource[position] = data;
            }
            else
            {
                //new data source
                lastSourceIndex = index;
                sourceIndex.Add(index, dataSource.Count);
                dataSource.Add(data);
            }
        }

        /**
         * Data source updated - re-run the algorithm when the update comes
         * from the most recently linked source.
         */
        public void UpdatedDataSource(int index)
        {
            //With nothing linked, lastSourceIndex only holds its default
            //value; matching on it would refresh an algorithm with no input.
            if (dataSource.Count == 0)
            {
                return;
            }

            if (lastSourceIndex == index)
            {
                Refresh();
            }
        }

        //interfaces - implemented by each concrete algorithm
        public abstract bool VerifyDataSourceType(ref string message);
        public abstract void Initialization();
        public abstract void Refresh();
        public abstract void Start();
        public abstract void Stop();
    }
}
4. 显示类基类EntityDisplay.cs
namespace DHapp.Entities
{
    /**
     * Abstract Display Device
     *
     * Keeps the list of data entities linked to the display and triggers
     * Refresh() when the most recently linked source reports an update.
     */
    public abstract class EntityDisplay
    {
        public List<EntityData> dataSource = new List<EntityData>();

        //maps a data source's Index to its position inside dataSource
        Dictionary<int, int> sourceIndex = new Dictionary<int, int>();

        //Index of the most recently added (not replaced) data source
        int lastSourceIndex = 0;

        /**
         * Clear data sources and indexes.
         */
        public void Clear()
        {
            dataSource.Clear();
            sourceIndex.Clear();
            //forget the previous trigger so a stale index cannot match later
            lastSourceIndex = 0;
        }

        /**
         * Link a data source. A source whose Index is already linked
         * replaces the existing entry in place; otherwise it is appended and
         * becomes the source whose updates trigger Refresh().
         * Throws ArgumentNullException when data is null.
         */
        public void LinkDataSource(EntityData data)
        {
            if (data == null)
            {
                throw new System.ArgumentNullException("data");
            }

            int index = data.Index;
            int position;
            if (sourceIndex.TryGetValue(index, out position))
            {
                //replace existing data source link
                dataSource[position] = data;
            }
            else
            {
                //new data source
                lastSourceIndex = index;
                sourceIndex.Add(index, dataSource.Count);
                dataSource.Add(data);
            }
        }

        /**
         * Data source updated - refresh the display when the update comes
         * from the most recently linked source.
         */
        public void UpdatedDataSource(int index)
        {
            //With nothing linked, lastSourceIndex only holds its default
            //value; matching on it would refresh a display with no data.
            if (dataSource.Count == 0)
            {
                return;
            }

            if (lastSourceIndex == index)
            {
                Refresh();
            }
        }

        //interfaces - implemented by each concrete display
        public abstract bool VerifyDataSourceType(ref string message);
        public abstract void Initialization();
        public abstract void Refresh();
        public abstract void Start();
        public abstract void Stop();
    }
}
5. 核心任务类 TaskCenter.cs
各个模块在这里被连接起来
namespace DHapp.Tasks
{
    /**
     * Central hub that wires data sources, algorithms and display devices
     * together and forwards data-changed events to them.
     */
    public class TaskCenter
    {
        public List<EntityData> dataSources;
        public List<EntityAlgorithm> algorithms;
        public List<EntityDisplay> charts;

        //events are ignored until Start() is called
        bool started = false;

        /**
         * Constructor
         */
        public TaskCenter()
        {
            dataSources = new List<EntityData>();
            algorithms = new List<EntityAlgorithm>();
            charts = new List<EntityDisplay>();
        }

        /**
         * Start | Stop
         * Only toggles whether incoming data-changed events are dispatched.
         */
        public void Start()
        {
            started = true;
        }

        public void Stop()
        {
            started = false;
        }

        /**
         * Link Data Sources
         *
         * Registers a data source: its Index is overwritten with its
         * position in dataSources and its EventDataChanged is subscribed.
         * Returns the number of registered data sources after the add
         * (i.e. the new source's Index + 1).
         * Throws ArgumentNullException when source is null.
         */
        public int LinkDataSource(EntityData source)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            //set data source reference index
            source.Index = dataSources.Count;
            dataSources.Add(source);

            //link data change event processor
            source.EventDataChanged += processDataChangedEvent;
            return dataSources.Count;
        }

        /**
         * Link Algorithm with Output Data as Data Source
         *
         * Links the data sources named by sourceIndex to the algorithm,
         * registers every entry of outputData as a new data source, then
         * adds the algorithm. Returns the number of registered data sources.
         * All arguments are validated before anything is modified, so a bad
         * index can no longer leave the algorithm half linked.
         * Throws ArgumentNullException for a null argument,
         * ArgumentOutOfRangeException for an index outside dataSources and
         * ArgumentException when outputData contains a null entry.
         */
        public int LinkAlgorithm(List<int> sourceIndex, List<EntityData> outputData, EntityAlgorithm algorithm)
        {
            if (sourceIndex == null)
            {
                throw new ArgumentNullException("sourceIndex");
            }
            if (outputData == null)
            {
                throw new ArgumentNullException("outputData");
            }
            if (algorithm == null)
            {
                throw new ArgumentNullException("algorithm");
            }
            if (outputData.Contains(null))
            {
                throw new ArgumentException("outputData must not contain null entries.", "outputData");
            }
            ensureValidSourceIndexes(sourceIndex);

            //link data source to algorithm
            foreach (int index in sourceIndex)
            {
                algorithm.LinkDataSource(dataSources[index]);
            }

            //set output data from algorithm to data source
            for (int i = 0; i < outputData.Count; i++)
            {
                LinkDataSource(outputData[i]);
            }

            //add algorithm
            algorithms.Add(algorithm);
            return dataSources.Count;
        }

        /**
         * Link Display Device
         *
         * Links the data sources named by sourceIndex to the chart and adds
         * the chart. Arguments are validated before anything is modified.
         * Throws ArgumentNullException for a null argument and
         * ArgumentOutOfRangeException for an index outside dataSources.
         */
        public void LinkDisplayDevice(List<int> sourceIndex, EntityDisplay chart)
        {
            if (sourceIndex == null)
            {
                throw new ArgumentNullException("sourceIndex");
            }
            if (chart == null)
            {
                throw new ArgumentNullException("chart");
            }
            ensureValidSourceIndexes(sourceIndex);

            //link data source to display device
            foreach (int index in sourceIndex)
            {
                chart.LinkDataSource(dataSources[index]);
            }

            //add chart
            charts.Add(chart);
        }

        /**
         * Data Source Data Change Event Process
         *
         * Forwards the index of the changed data source to every algorithm
         * and then to every chart. Ignored while the task center is stopped.
         */
        private void processDataChangedEvent(object sender, DataChangedEventArgs arg)
        {
            if (!started)
            {
                return;
            }

            int index = arg.index;

            //flush algorithms
            for (int i = 0; i < algorithms.Count; i++)
            {
                algorithms[i].UpdatedDataSource(index);
            }

            //flush charts
            for (int i = 0; i < charts.Count; i++)
            {
                charts[i].UpdatedDataSource(index);
            }
        }

        //Rejects any index that does not address an entry of dataSources,
        //using the same exception type the list indexer would throw.
        private void ensureValidSourceIndexes(List<int> sourceIndex)
        {
            foreach (int index in sourceIndex)
            {
                if (index < 0 || index >= dataSources.Count)
                {
                    throw new ArgumentOutOfRangeException(
                        "sourceIndex", index, "Data source index is not registered.");
                }
            }
        }
    } //End of TaskCenter
}
在此框架上,我们还创建了:
SensorRandom类,继承自EntitySensor,按照128通道、100ms的采样间隔产生随机数据,数据缓冲区大小(buffer size)设定为64。
AlgoKalman类,继承自EntityAlgorithm,对数据进行卡尔曼算法运算。
ChartSeries类,继承自EntityDisplay, 用于显示波形。
相关文章推荐
- CLSRSC-184 CLSRSC-258: Failed to configure and start ASM
- 怎样通过MSG_WAITALL设置阻塞时间
- 如何提高大数据业务系统的性能
- 云计算三种服务模式SaaS、PaaS和IaaS及其之间关系
- [AlwaysOn Availability Groups]DMV和系统目录视图
- Codeforces 272C Dima and Staircase
- Postfix之mail.cf
- OC中内存管理 assign, retain, copy 的 setter 和 getter 方法
- POJ 1804 Brainman
- [AlwaysOn Availability Groups]AlwaysOn健康诊断日志
- [AlwaysOn Availability Groups]CLUSTER.LOG(AG)
- HMC创建AIX
- UVA - 253 Cube painting(骰子涂色)
- 统计HDFS 上字节数据统计
- Paint及Canvas的简单应用
- 大数据技术学习之路,从今天开始
- 人工智能与数据挖掘顶级会议
- LINK1123:failure during conversion to COFF:file invalid or corrupt
- SQL大数据量查询的优化
- 2014_acmicpc_shanghai_google