您的位置:首页 > 大数据

高并发大数据采集分析系统框架设计

2015-11-21 22:57 971 查看
这是2011年做的一个项目,数据由专业的仪器采集,单台仪器最高可能达到256K的采样频率和32通道同时采集和存储,支持多台同时采集。数据过来之后,或实时显示波形,或进算法计算模块,算法的输出也会同时实时显示。因此,需要极快速的数据处理功能。

子系统关系图如下:



使用一般的轮询是肯定达不到速度的要求了,于是决定使用事件触发的方式传递和处理数据。

1. 数据类EntityData.cs

namespace DHapp.Entities
{
/**
* Data Changed Event
*/
public class DataChangedEventArgs : EventArgs
{
public int index;
public double currentValue;
public double[] series;

public DataChangedEventArgs(int index,
double currentValue, double [] series)
{
this.index = index;
this.currentValue = currentValue;
this.series = series;
}
}   //End of DataChangedEventArgs

/**
* Data Entity Data Structure
*/
public class EntityData
{
//lock-free status
public static int IDEL = 0;
public static int BUSY = 1;

//data source type
public static int TimeStamps = 0;
public static int TimeSeries = 1;
public static int Spectrums  = 2;

//data-change event dispatching
public event EventHandler<DataChangedEventArgs> EventDataChanged;

//data source information
public int DataType = TimeSeries;

public int    Index = 0;
public string Name  = "A";

//data
public double    current = 0;
public double [] series  = null;

//clean data series
public void Clear() { series = null; }

//fire data changed event
public void FireDataChangedEvent()
{
if (EventDataChanged != null)
EventDataChanged(this,
new DataChangedEventArgs(Index, current, series));
}

//receive a copy of signal data
public void Set(double value, double [] signals)
{
current = value;
series  = signals;
//          FireDataChangedEvent();
}

//data copy
public        void Copy(double[] source) { Copy(source, ref series); }
public static void Copy(double[] source, ref double[] destination)
{
int size = source.Length;
if (destination == null ||
destination.Length != size) destination = new double[size];

Array.Copy(source, destination, size);
}
}   //End of EntityData
}


2. 信号输入模块基类EntitySensor.cs

namespace DHapp.Entities
{
public abstract class EntitySensor
{
public List<EntityData> dataSource = new List<EntityData>();
public void Clear() { dataSource.Clear(); }

//interfaces
public abstract void Start();
public abstract void Stop ();
}   //End of EntitySensor
}


3. 计算处理类基类EntityAlgorithm.cs

namespace DHapp.Entities
{
/**
* Abstract Signal Processing Algorithm
*/
public abstract class EntityAlgorithm
{
public List<EntityData> dataSource = new List<EntityData>();
public List<EntityData> outputData = new List<EntityData>();

//data source index and dataSource list position reference
Dictionary<int, int> sourceIndex = new Dictionary<int, int>();
int lastSourceIndex = 0;

//clear data sources and indexes
public void Clear()
{
dataSource.Clear ();
sourceIndex.Clear();
}

//link data source
public void LinkDataSource(EntityData data)
{
int index = data.Index;
if (sourceIndex.ContainsKey(index))
{
//replace existing data source link
int k = sourceIndex[index];
dataSource[k] = data;
}
else
{
//new data source
lastSourceIndex = index;
sourceIndex.Add(index, dataSource.Count);
dataSource.Add(data);
}
}

//data source updated - refresh display
public void UpdatedDataSource(int index)
{
if (lastSourceIndex == index)
Refresh();
}

//interfaces
public abstract bool VerifyDataSourceType(ref string message);
public abstract void Initialization();
public abstract void Refresh();
public abstract void Start();
public abstract void Stop();
}
}


4. 显示类基类EntityDisplay.cs

namespace DHapp.Entities
{
/**
* Abstract Display Device
*/
public abstract class EntityDisplay
{
public List<EntityData> dataSource = new List<EntityData>();

//data source index and dataSource list position reference
Dictionary<int, int> sourceIndex = new Dictionary<int, int>();
int lastSourceIndex = 0;

//clear data sources and indexes
public void Clear()
{
dataSource.Clear ();
sourceIndex.Clear();
}

//link data source
public void LinkDataSource(EntityData data)
{
int index = data.Index;

if (sourceIndex.ContainsKey(index))
{
//replace existing data source link
int k = sourceIndex[index];
dataSource[k] = data;
}
else
{
//new data source
lastSourceIndex = index;
sourceIndex.Add(index, dataSource.Count);
dataSource.Add(data);
}
}

//data source updated - refresh display
public void UpdatedDataSource(int index)
{
if (lastSourceIndex == index)
Refresh();
}

//interfaces
public abstract bool VerifyDataSourceType(ref string message);
public abstract void Initialization();
public abstract void Refresh();
public abstract void Start();
public abstract void Stop();
}
}


5. 核心任务类 TaskCenter.cs

各个模块在这里被连接起来

namespace DHapp.Tasks
{
public class TaskCenter
{
public List<EntityData>      dataSources;
public List<EntityAlgorithm> algorithms;
public List<EntityDisplay>   charts;

bool started = false;

/**
* Constructor
*/
public TaskCenter()
{
dataSources = new List<EntityData>();
algorithms  = new List<EntityAlgorithm>();
charts      = new List<EntityDisplay>();
}

/**
* Start | Stop
*/
public void Start() { started = true; }
public void Stop () { started = false; }

/**
* Link Data Sources
*/
public int LinkDataSource(EntityData source)
{
//set data source reference index
int index = dataSources.Count;
source.Index = index;
dataSources.Add(source);

//link data change event processor
source.EventDataChanged += new EventHandler<DataChangedEventArgs>(processDataChangedEvent);
return dataSources.Count;
}

/**
* Link Algorithm with Output Data as Data Source
*/
public int LinkAlgorithm(List<int>        sourceIndex,
List<EntityData> outputData,
EntityAlgorithm  algorithm)
{
//link data source to algorithm
foreach (int index in sourceIndex)
algorithm.LinkDataSource(dataSources[index]);

//set output data from algorithm to data source
for(int i=0; i<outputData.Count; i++) LinkDataSource(outputData[i]);

//add algorithm
algorithms.Add(algorithm);

return dataSources.Count;
}

/**
* Link Display Device
*/
public void LinkDisplayDevice(List<int> sourceIndex, EntityDisplay chart)
{
//link data source to display device
foreach (int index in sourceIndex)
chart.LinkDataSource(dataSources[index]);

//add chart
charts.Add(chart);
}

/**
* Data Source Data Change Event Process
*/
private void processDataChangedEvent(object sender, DataChangedEventArgs arg)
{
int i;
if (!started) return;

//flush algorithms
int index = arg.index;
for (i = 0; i < algorithms.Count; i++)
algorithms[i].UpdatedDataSource(index);

//flush charts
for (i = 0; i < charts.Count; i++)
charts[i].UpdatedDataSource(index);
}
}   //End of TaskCenter
}


在此框架上,我们还创建了:

SensorRandom类,继承自EntitySensor,按照128通道,100ms的采样间隔产生随机数据,数据buffersize设定为64。

AlgoKalman类,继承自EntityAlgorithm,对数据进行卡尔曼算法运算。

ChartSeries类,继承自EntityDisplay, 用于显示波形。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: