
A Programmer's Road to Quantitative Trading (35): Lean's DataFeed, Part 3

2015-06-19 12:37
Please credit the source when reposting: http://blog.csdn.net/minimicall and http://cloudtrade.top/

The Lean engine is cleanly modularized. DataFeed is the data feed module: it is responsible for supplying market data to the rest of the engine.

1. The IDataFeed Interface

The module's interface is:

namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Datafeed interface for creating custom datafeed sources.
/// The data supply interface
/// </summary>
public interface IDataFeed
{
/********************************************************
* INTERFACE PROPERTIES
*********************************************************/
/// <summary>
/// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
/// The subscription list
/// </summary>
List<SubscriptionDataConfig> Subscriptions
{
get;
}

/// <summary>
/// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
/// Realtime prices
/// </summary>
/// <remarks>Indexed in order of the subscriptions</remarks>
List<decimal> RealtimePrices
{
get;
}

/// <summary>
/// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
/// Cross-thread queues: the datafeed thread enqueues data and the main algorithm thread dequeues it
/// </summary>
ConcurrentQueue<List<BaseData>>[] Bridge
{
get;
set;
}

/// <summary>
/// Boolean flag indicating there is no more data in any of our subscriptions.
/// </summary>
bool EndOfBridges
{
get;
}

/// <summary>
/// Array of boolean flags indicating the data status for each queue/subscription we're tracking.
/// </summary>
bool[] EndOfBridge
{
get;
}

/// <summary>
/// Set the source of the data we're requesting for the type-readers to know where to get data from.
/// </summary>
/// <remarks>Live or Backtesting Datafeed</remarks>
DataFeedEndpoint DataFeed
{
get;
set;
}

/// <summary>
/// Public flag indicator that the thread is still busy.
/// Indicates whether the feed thread is still running
/// </summary>
bool IsActive
{
get;
}

/// <summary>
/// The most advanced moment in time for which the data feed has completed loading data
/// </summary>
DateTime LoadedDataFrontier { get; }

/// <summary>
/// Data has completely loaded and we don't expect any more.
/// </summary>
bool LoadingComplete
{
get;
}

/********************************************************
* INTERFACE METHODS
*********************************************************/
/// <summary>
/// Primary entry point.
/// </summary>
void Run();

/// <summary>
/// External controller calls to signal a terminate of the thread.
/// </summary>
void Exit();

/// <summary>
/// Purge all remaining data in the thread.
/// </summary>
void PurgeData();
}
}
IDataFeed is the data feed interface; every concrete feed implementation must implement it.
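The core of this design is the Bridge: one queue per subscription, with the feed thread producing time-grouped lists of data and the algorithm thread consuming them. The handoff can be sketched in Python (a minimal illustration of the pattern, not Lean's actual API; all names here are made up):

```python
import queue
import threading

def run_feed(bridge, data_by_subscription):
    """Producer: push each subscription's rows as time-grouped lists."""
    for i, rows in enumerate(data_by_subscription):
        for row in rows:
            bridge[i].put([row])  # one time-group per list, as in Lean's Bridge

def drain(bridge):
    """Consumer: read every queue until all are empty (cf. EndOfBridges)."""
    out = []
    while any(not q.empty() for q in bridge):
        for q in bridge:
            try:
                out.append(q.get_nowait())
            except queue.Empty:
                pass
    return out

# Two subscriptions, like two securities:
bridge = [queue.Queue(), queue.Queue()]
feed = threading.Thread(target=run_feed, args=(bridge, [[1, 2], [10]]))
feed.start()
feed.join()
print(drain(bridge))  # [[1], [10], [2]] -- deterministic here because the producer has finished
```

In Lean the real queues are `ConcurrentQueue<List<BaseData>>`, which gives the same lock-free producer/consumer semantics across the two threads.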
2. BaseDataFeed: The Data Feed Base Class

It implements IDataFeed and serves as the base class for the concrete feed implementations.

namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Common components of a data feed allowing the extender to implement only the parts which matter.
/// Base class for data feeds; derived classes implement only the parts that differ
/// </summary>
public abstract class BaseDataFeed : IDataFeed
{
/********************************************************
* CLASS VARIABLES
*********************************************************/
private IAlgorithm _algorithm;
private BacktestNodePacket _job;
private bool _endOfStreams = false;
private int _subscriptions = 0;
private int _bridgeMax = 500000;
private bool _exitTriggered = false;

private DateTime[] _frontierTime;

/********************************************************
* CLASS PROPERTIES
*********************************************************/
/// <summary>
/// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
/// The subscription list
/// </summary>
public List<SubscriptionDataConfig> Subscriptions { get; private set; }

/// <summary>
/// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
/// Realtime prices
/// </summary>
/// <remarks>Indexed in order of the subscriptions</remarks>
public List<decimal> RealtimePrices { get; private set; }

/// <summary>
/// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
/// The cross-thread bridge
/// </summary>
public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

/// <summary>
/// Stream created from the configuration settings.
/// Readers created from the configuration settings
/// </summary>
public SubscriptionDataReader[] SubscriptionReaderManagers { get; set; }

/// <summary>
/// Set the source of the data we're requesting for the type-readers to know where to get data from.
/// </summary>
/// <remarks>Live or Backtesting Datafeed</remarks>
public DataFeedEndpoint DataFeed { get; set; }

/// <summary>
/// Flag indicating the handler thread is completely finished and ready to dispose.
/// </summary>
public bool IsActive { get; private set; }

/// <summary>
/// Flag indicating the file system has loaded all files.
/// </summary>
public bool LoadingComplete { get; private set; }

/// <summary>
/// Furthest point in time that the data has loaded into the bridges.
/// </summary>
public DateTime LoadedDataFrontier { get; private set; }

/// <summary>
/// Signifying no more data across all bridges
/// </summary>
public bool EndOfBridges
{
get
{
for (var i = 0; i < Bridge.Length; i++)
{
if (Bridge[i].Count != 0 || EndOfBridge[i] != true)
{
return false;
}
}
return true;
}
}

/// <summary>
/// End of Stream for Each Bridge:
/// </summary>
public bool[] EndOfBridge { get; set; }

/********************************************************
* CLASS CONSTRUCTOR
*********************************************************/
/// <summary>
/// Create an instance of the base datafeed.
/// </summary>
public BaseDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
{
//Save the data subscriptions
Subscriptions = algorithm.SubscriptionManager.Subscriptions; // one entry per subscribed security's data
_subscriptions = Subscriptions.Count; // number of subscribed securities

//Public Properties:
DataFeed = DataFeedEndpoint.FileSystem; // default to reading from the file system
IsActive = true; // the feed thread is active
Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions]; // one queue of time-grouped lists per subscription
EndOfBridge = new bool[_subscriptions];
SubscriptionReaderManagers = new SubscriptionDataReader[_subscriptions]; // initialize the reader array
RealtimePrices = new List<decimal>(_subscriptions); // initialize the realtime price list
_frontierTime = new DateTime[_subscriptions];

//Class Privates:
_job = job; // the job packet
_algorithm = algorithm; // the algorithm instance
_endOfStreams = false;
_bridgeMax = _bridgeMax / _subscriptions;

//Initialize arrays:
for (var i = 0; i < _subscriptions; i++)
{
_frontierTime[i] = job.PeriodStart;
EndOfBridge[i] = false;
Bridge[i] = new ConcurrentQueue<List<BaseData>>(); // allocate the queue for this subscription
//Create a reader for each subscription:
SubscriptionReaderManagers[i] = new SubscriptionDataReader(Subscriptions[i], algorithm.Securities[Subscriptions[i].Symbol], DataFeedEndpoint.Database, job.PeriodStart, job.PeriodFinish);

}
}

/// <summary>
/// Launch the primary data thread.
/// Main function of the data-reading thread
/// </summary>
public virtual void Run()
{
while (!_exitTriggered && IsActive && !EndOfBridges)
{
for (var i = 0; i < Subscriptions.Count; i++)
{
//With each subscription; fetch the next increment of data from the queues:
var subscription = Subscriptions[i]; // the i-th security subscription

if (Bridge[i].Count < 10000 && !EndOfBridge[i]) // make sure this subscription's queue has not exceeded its limit
{
var data = GetData(subscription); // fetch the next batch of data

//Comment out for live databases, where we should continue asking even if no data.
if (data.Count == 0) // no data left for this subscription; mark it finished and move to the next one
{
EndOfBridge[i] = true; // this subscription is finished
continue;
}

////Insert data into bridge, each list is time-grouped. Assume all different time-groups.
foreach (var obj in data)
{
Bridge[i].Enqueue(new List<BaseData>() { obj });
}

////Record the furthest moment in time.
_frontierTime[i] = data.Max(bar => bar.Time);
}
}
//Set the most backward moment in time we've loaded
LoadedDataFrontier = _frontierTime.Min();
}

IsActive = false;
}

/// <summary>
/// Get the next set of data for this subscription
/// </summary>
/// <param name="subscription"></param>
/// <returns></returns>
public abstract List<BaseData> GetData(SubscriptionDataConfig subscription);

/// <summary>
/// Send an exit signal to the thread.
/// </summary>
public virtual void Exit()
{
_exitTriggered = true;
PurgeData();
}

/// <summary>
/// Loop over all the queues and clear them to fast-quit this thread and return to main.
/// </summary>
public virtual void PurgeData()
{
foreach (var t in Bridge)
{
t.Clear();
}
}
}
}
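Note how Run() tracks time: each subscription records the newest timestamp it has loaded into `_frontierTime`, and `LoadedDataFrontier` is the minimum of those values, i.e. the latest moment for which *every* subscription is complete. That invariant is simple enough to sketch (illustrative Python, not Lean code):

```python
from datetime import datetime

def loaded_data_frontier(frontier_times):
    """The feed may only promise data up to the slowest subscription."""
    return min(frontier_times)

per_subscription = [
    datetime(2015, 6, 19, 10, 5),   # subscription 0 has loaded up to 10:05
    datetime(2015, 6, 19, 10, 1),   # subscription 1 lags at 10:01
]
print(loaded_data_frontier(per_subscription))  # 2015-06-19 10:01:00
```

Taking the minimum guarantees the algorithm thread never sees a moment in time for which some security's data has not yet arrived.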
3. FileSystemDataFeed: The File System Data Feed
namespace QuantConnect.Lean.Engine.DataFeeds
{
/********************************************************
* CLASS DEFINITIONS
*********************************************************/
/// <summary>
/// Historical datafeed stream reader for processing files on a local disk.
/// Loads historical data from the local disk
/// </summary>
/// <remarks>Filesystem datafeeds are incredibly fast</remarks>
public class FileSystemDataFeed : IDataFeed
{
/********************************************************
* CLASS VARIABLES
*********************************************************/
// Set types in public area to speed up:
private IAlgorithm _algorithm;
private BacktestNodePacket _job;
private bool _endOfStreams = false;
private int _subscriptions = 0;
private int _bridgeMax = 500000;
private bool _exitTriggered = false;

/********************************************************
* CLASS PROPERTIES
*********************************************************/
/// <summary>
/// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
/// </summary>
public List<SubscriptionDataConfig> Subscriptions { get; private set; }

/// <summary>
/// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
/// </summary>
/// <remarks>Indexed in order of the subscriptions</remarks>
public List<decimal> RealtimePrices { get; private set; }

/// <summary>
/// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
/// </summary>
public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

/// <summary>
/// Set the source of the data we're requesting for the type-readers to know where to get data from.
/// </summary>
/// <remarks>Live or Backtesting Datafeed</remarks>
public DataFeedEndpoint DataFeed { get; set; }

/// <summary>
/// Flag indicating the handler thread is completely finished and ready to dispose.
/// </summary>
public bool IsActive { get; private set; }

/// <summary>
/// Flag indicating the file system has loaded all files.
/// </summary>
public bool LoadingComplete { get; private set; }

/// <summary>
/// Furthest point in time that the data has loaded into the bridges.
/// </summary>
public DateTime LoadedDataFrontier { get; private set; }

/// <summary>
/// Stream created from the configuration settings.
/// </summary>
private SubscriptionDataReader[] SubscriptionReaders { get; set; }

/// <summary>
/// Signifying no more data across all bridges
/// </summary>
public bool EndOfBridges
{
get
{
for (var i = 0; i < Bridge.Length; i++)
{
if (Bridge[i].Count != 0 || EndOfBridge[i] != true || _endOfStreams != true)
{
return false;
}
}
return true;
}
}

/// <summary>
/// End of Stream for Each Bridge:
/// </summary>
public bool[] EndOfBridge { get; set; }

/// <summary>
/// Frontiers for each fill forward high water mark
/// </summary>
public DateTime[] FillForwardFrontiers;

/********************************************************
* CLASS CONSTRUCTOR
*********************************************************/
/// <summary>
/// Create a new backtesting data feed.
/// </summary>
/// <param name="algorithm">Instance of the algorithm</param>
/// <param name="job">Algorithm work task</param>
public FileSystemDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
{
Console.WriteLine("FileSystemDataFeed,algorithm:" + algorithm + ",job: " + job);
Subscriptions = algorithm.SubscriptionManager.Subscriptions;
Console.WriteLine("Subscriptions.count:" + Subscriptions.Count);
_subscriptions = Subscriptions.Count;

//Public Properties:
DataFeed = DataFeedEndpoint.FileSystem;
IsActive = true;
Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions];
EndOfBridge = new bool[_subscriptions];
SubscriptionReaders = new SubscriptionDataReader[_subscriptions];
FillForwardFrontiers = new DateTime[_subscriptions];
RealtimePrices = new List<decimal>(_subscriptions);

//Class Privates:
_job = job;
_algorithm = algorithm;
_endOfStreams = false;
_bridgeMax = _bridgeMax / _subscriptions; //Set the bridge maximum count:

for (var i = 0; i < _subscriptions; i++)
{
//Create a new instance in the dictionary:
Bridge[i] = new ConcurrentQueue<List<BaseData>>();
EndOfBridge[i] = false;

SubscriptionReaders[i] = new SubscriptionDataReader(Subscriptions[i], _algorithm.Securities[Subscriptions[i].Symbol], DataFeed, _job.PeriodStart, _job.PeriodFinish);
FillForwardFrontiers[i] = new DateTime();
}
}

/********************************************************
* CLASS METHODS
*********************************************************/
/// <summary>
/// Main routine for datafeed analysis.
/// </summary>
/// <remarks>This is a hot-thread and should be kept extremely lean. Modify with caution.</remarks>
public void Run()
{
Log.Trace("debug FileSystemDataFeed.run()");
Console.WriteLine("FileSystemDataFeed.run()");
//Calculate the increment based on the subscriptions:
var tradeBarIncrements = CalculateIncrement(includeTick: false);
var increment = CalculateIncrement(includeTick: true);

//Loop over each date in the job
foreach (var date in Time.EachTradeableDay(_algorithm.Securities, _job.PeriodStart, _job.PeriodFinish))
{
Log.Trace("in trading date:"+date+",PeriodStart:"+_job.PeriodStart+",PeriodFinish:"+_job.PeriodFinish);
//Update the source-URL from the BaseData, reset the frontier to today. Update the source URL once per day.
// this is really the next frontier in the future
var frontier = date.Add(increment);
var activeStreams = _subscriptions;
Log.Trace("subscription:" + _subscriptions);
//Initialize the feeds to this date:
for (var i = 0; i < _subscriptions; i++)
{
//Don't refresh source when we know the market is closed for this security:
Log.Trace("i:"+i+"subscription");
var success = SubscriptionReaders[i].RefreshSource(date);

//If we know the market is closed for security then can declare bridge closed.
if (success) {
EndOfBridge[i] = false;
}
else
{
ProcessMissingFileFillForward(SubscriptionReaders[i], i, tradeBarIncrements, date);
EndOfBridge[i] = true;
}
}

//Pause the DataFeed
var bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
var bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
var active = GetActiveStreams();

//Pause here while bridges are full, but allow missing files to pass
while (bridgeFullCount > 0 && ((_subscriptions - active) == bridgeZeroCount) && !_exitTriggered)
{
bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
Thread.Sleep(5);
}

// for each smallest resolution
var datePlusOneDay = date.Date.AddDays(1);
while ((frontier.Date == date.Date || frontier.Date == datePlusOneDay) && !_exitTriggered)
{
var cache = new List<BaseData>[_subscriptions];

//Reset Loop:
long earlyBirdTicks = 0;

//Go over all the subscriptions, one by one add a minute of data to the bridge.
for (var i = 0; i < _subscriptions; i++)
{
//Get the reader manager for the i-th subscription:
var manager = SubscriptionReaders[i];

//End of the manager stream set flag to end bridge: also if the EOB flag set, from the refresh source method above
if (manager.EndOfStream || EndOfBridge[i])
{
EndOfBridge[i] = true;
activeStreams = GetActiveStreams();
if (activeStreams == 0)
{
frontier = frontier.Date + TimeSpan.FromDays(1);
}
continue;
}

//Initialize data store:
cache[i] = new List<BaseData>(2);

//Add the last iteration to the new list: only if it falls into this time category
//Key step: every data point read so far that falls before the frontier goes into this subscription's cache list
var cacheAtIndex = cache[i];
while (manager.Current.EndTime < frontier)
{
Log.Trace("Current:symbol:" + manager.Current.Symbol + ",price" + manager.Current.Price);
cacheAtIndex.Add(manager.Current); // append Current to this subscription's cache
Log.Trace(string.Format("FileSystemDataFeed,Current: {0}", manager.Current));
if (!manager.MoveNext()) break; // advance to the next data point
}

//Save the next earliest time from the bridges: only if we're not filling forward.
if (manager.Current != null)
{
if (earlyBirdTicks == 0 || manager.Current.EndTime.Ticks < earlyBirdTicks)
{
earlyBirdTicks = manager.Current.EndTime.Ticks;
}
}
}

if (activeStreams == 0)
{
break;
}

//Add all the lists to the bridge, release the bridge
//we push all the data up to this frontier into the bridge at once
for (var i = 0; i < _subscriptions; i++)
{
if (cache[i] != null && cache[i].Count > 0)
{
FillForwardFrontiers[i] = cache[i][0].EndTime;
Bridge[i].Enqueue(cache[i]);
}
ProcessFillForward(SubscriptionReaders[i], i, tradeBarIncrements);
}

//This will let consumers know we have loaded data up to this date
//So that the data stream doesn't pull off data from the same time period in different events
LoadedDataFrontier = frontier;

if (earlyBirdTicks > 0 && earlyBirdTicks > frontier.Ticks)
{
//Jump increment to the nearest second, in the future: Round down, add increment
frontier = (new DateTime(earlyBirdTicks)).RoundDown(increment) + increment;
}
else
{
//Otherwise step one forward.
frontier += increment;
}

} // End of This Day.

if (_exitTriggered) break;

} // End of All Days:

Log.Trace(DataFeed + ".Run(): Data Feed Completed.");
LoadingComplete = true;

//Make sure all bridges empty before declaring "end of bridge":
while (!EndOfBridges && !_exitTriggered)
{
for (var i = 0; i < _subscriptions; i++)
{
//Nothing left in the bridge, mark it as finished
if (Bridge[i].Count == 0)
{
EndOfBridge[i] = true;
}
}
if (GetActiveStreams() == 0) _endOfStreams = true;
Thread.Sleep(100);
}

//Close up all streams:
for (var i = 0; i < Subscriptions.Count; i++)
{
SubscriptionReaders[i].Dispose();
}

Log.Trace(DataFeed + ".Run(): Ending Thread... ");
IsActive = false;
}

/// <summary>
/// Send an exit signal to the thread.
/// Signals the feed thread to exit
/// </summary>
public void Exit()
{
_exitTriggered = true;
PurgeData();
}

/// <summary>
/// Loop over all the queues and clear them to fast-quit this thread and return to main.
/// Clears the cached data
/// </summary>
public void PurgeData()
{
foreach (var t in Bridge)
{
t.Clear();
}
}

private void ProcessMissingFileFillForward(SubscriptionDataReader manager, int i, TimeSpan increment, DateTime dateToFill)
{
// we'll copy the current into the next day
var subscription = Subscriptions[i];
if (!subscription.FillDataForward || manager.Current == null) return;

var start = dateToFill.Date + manager.Exchange.MarketOpen;
if (subscription.ExtendedMarketHours)
{
start = dateToFill.Date + manager.Exchange.ExtendedMarketOpen;
}

// shift the 'start' time to the end of the bar by adding the increment, this makes 'date'
// the end time which also allows the market open functions to behave as expected

var current = manager.Current;
for (var endTime = start.Add(increment); endTime.Date == dateToFill.Date; endTime = endTime + increment)
{
if (manager.IsMarketOpen(endTime) || (subscription.ExtendedMarketHours && manager.IsExtendedMarketOpen(endTime)))
{
EnqueueFillForwardData(i, current, endTime);
}
else
{
// stop fill forwarding when we're no longer open
break;
}
}
}

/// <summary>
/// If this is a fillforward subscription, look at the previous time, and current time, and add new
/// objects to queue until current time to fill up the gaps.
/// </summary>
/// <param name="manager">Subscription to process</param>
/// <param name="i">Subscription position in the bridge ( which queue are we pushing data to )</param>
/// <param name="increment">Timespan increment to jump the fillforward results</param>
private void ProcessFillForward(SubscriptionDataReader manager, int i, TimeSpan increment)
{
// If previous == null cannot fill forward nothing there to move forward (e.g. cases where file not found on first file).
if (!Subscriptions[i].FillDataForward || manager.Previous == null || manager.Current == null) return;

//Last tradebar and the current one we're about to add to queue:
var previous = manager.Previous;
var current = manager.Current;

// final two points of file that ends at midnight, causes issues in the day rollover/fill forward
if (current.EndTime.TimeOfDay.Ticks == 0 && previous.EndTime == current.Time)
{
return;
}

//Initialize the frontier:
if (FillForwardFrontiers[i].Ticks == 0) FillForwardFrontiers[i] = previous.EndTime;

// using the previous to fill forward since 'current' is ahead the frontier
var whatToFill = previous;
// using current.EndTime as fill until because it's the next piece of data we have for this subscription
var fillUntil = current.EndTime;

//Data ended before the market closed: premature ending flag - continue filling forward until market close.
if (manager.EndOfStream && manager.IsMarketOpen(current.EndTime))
{
//Make sure we only fill forward to end of *today* -- don't fill forward tomorrow just because its also open
fillUntil = FillForwardFrontiers[i].Date.AddDays(1);
// since we ran out of data, use the current as the clone source, it's more recent than previous
whatToFill = current;
}

// loop from our last time (previous.EndTime) to our current.EndTime, filling in all missing day during
// request market hours
for (var endTime = FillForwardFrontiers[i] + increment; (endTime < fillUntil); endTime = endTime + increment)
{
if (Subscriptions[i].ExtendedMarketHours)
{
if (!manager.IsExtendedMarketOpen(endTime.Subtract(increment)))
{
//If we've asked for extended hours, and the security is no longer inside extended market hours, skip:
continue;
}
}
else
{
// if the market isn't open skip to the current.EndTime and rewind until the market is open
// this is the case where the previous value is from yesterday but we're trying to fill forward
// the next day, so instead of zooming through 18 hours of off-market hours, skip to our current data
// point and rewind the market open.
//
// E.g, Current.EndTime = 9:40am and Previous.EndTime = 2:00pm, so fill in from 2->4pm
// and then skip to 9:40am, reverse to 9:30am and fill from 9:30->9:40
if (!manager.IsMarketOpen(endTime.Subtract(increment)) && Subscriptions[i].Resolution != Resolution.Daily)
{
// Move fill forward so we don't waste time in this tight loop.
endTime = fillUntil;
do
{
endTime = endTime - increment;
}
// is market open assumes start time of bars, open at 9:30 closed at 4:00
// so decrement our date to use the start time
while (manager.IsMarketOpen(endTime.Subtract(increment)));
continue;
}
}

// add any overlap condition here
if (Subscriptions[i].Resolution == Resolution.Daily)
{
// handle fill forward on lower resolutions
var barStartTime = endTime - increment;
if (manager.Exchange.IsOpenDuringBar(barStartTime, endTime, Subscriptions[i].ExtendedMarketHours))
{
EnqueueFillForwardData(i, previous, endTime);
}
// special case catch missing days
else if (endTime.TimeOfDay.Ticks == 0 && manager.Exchange.DateIsOpen(endTime.Date.AddDays(-1)))
{
EnqueueFillForwardData(i, previous, endTime);
}
continue;
}

EnqueueFillForwardData(i, whatToFill, endTime);
}
}

private void EnqueueFillForwardData(int i, BaseData previous, DateTime dataEndTime)
{
var cache = new List<BaseData>(1);
var fillforward = previous.Clone(true);
fillforward.Time = dataEndTime.Subtract(Subscriptions[i].Increment);
fillforward.EndTime = dataEndTime;
FillForwardFrontiers[i] = dataEndTime;
cache.Add(fillforward);
Bridge[i].Enqueue(cache);
}

/// <summary>
/// Get the number of active streams remaining in the EndOfBridge array.
/// </summary>
/// <returns>Count of the number of streams with data</returns>
private int GetActiveStreams()
{
//Get the number of active streams:
var activeStreams = (from stream in EndOfBridge
where stream == false
select stream).Count();
return activeStreams;
}

/// <summary>
/// Calculate the minimum increment to scan for data based on the data requested.
/// </summary>
/// <param name="includeTick">When true the subscriptions include a tick data source, meaning there is almost no increment.</param>
/// <returns>Timespan to jump the data source so it efficiently orders the results</returns>
private TimeSpan CalculateIncrement(bool includeTick)
{
var increment = TimeSpan.FromDays(1);
foreach (var config in Subscriptions)
{
switch (config.Resolution)
{
//Hourly TradeBars:
case Resolution.Hour:
if (increment > TimeSpan.FromHours(1))
{
increment = TimeSpan.FromHours(1);
}
break;

//Minutely TradeBars:
case Resolution.Minute:
if (increment > TimeSpan.FromMinutes(1))
{
increment = TimeSpan.FromMinutes(1);
}
break;

//Secondly Bars:
case Resolution.Second:
if (increment > TimeSpan.FromSeconds(1))
{
increment = TimeSpan.FromSeconds(1);
}
break;

//Ticks: No increment; just fire each data piece in as they happen.
case Resolution.Tick:
if (increment > TimeSpan.FromMilliseconds(1) && includeTick)
{
increment = new TimeSpan(0, 0, 0, 0, 1);
}
break;
}
}
return increment;
}

} // End FileSystem Local Feed Class:
} // End Namespace
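The fill-forward machinery above is easiest to see in isolation: EnqueueFillForwardData clones the last known bar and slides its time window ahead one increment at a time until real data resumes. A Python sketch of that gap-filling step (the dict-based "bar" and function names are illustrative, not Lean's types):

```python
from datetime import datetime, timedelta

def fill_forward(previous_bar, until, increment):
    """Clone the previous bar into each missing time slot before `until`."""
    bars = []
    end_time = previous_bar["end_time"] + increment
    while end_time < until:
        clone = dict(previous_bar)            # Lean uses BaseData.Clone(fillForward: true)
        clone["time"] = end_time - increment  # bar start
        clone["end_time"] = end_time          # bar end, which advances the frontier
        bars.append(clone)
        end_time += increment
    return bars

# Last real bar ended 9:31; the next real bar ends 9:34, so fill 9:32 and 9:33:
prev = {"symbol": "SPY", "close": 210.0,
        "time": datetime(2015, 6, 19, 9, 30),
        "end_time": datetime(2015, 6, 19, 9, 31)}
gap = fill_forward(prev, datetime(2015, 6, 19, 9, 34), timedelta(minutes=1))
print([b["end_time"].minute for b in gap])  # [32, 33]
```

The real ProcessFillForward additionally checks market hours and extended-hours flags before emitting each clone, but the cloning-and-time-shifting loop is the core of it.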


4. BacktestingDataFeed: The Backtesting Data Feed
namespace QuantConnect.Lean.Engine.DataFeeds
{
/********************************************************
* CLASS DEFINITIONS
*********************************************************/
/// <summary>
/// Backtesting data feed extends the filesystem data feed with almost no modifications. Later this method can
/// be used for implementing alternative sources/generation for backtesting data.
/// The backtesting data feed derives from the file system data feed
/// </summary>
public class BacktestingDataFeed : FileSystemDataFeed
{
/********************************************************
* CLASS VARIABLES
*********************************************************/

/********************************************************
* CLASS PROPERTIES
*********************************************************/

/********************************************************
* CLASS CONSTRUCTOR
*********************************************************/
/// <summary>
/// Pass through the backtesting datafeed to the underlying file system datafeed implementation.
/// </summary>
/// <param name="algorithm">Algorithm we're operating with</param>
/// <param name="job">Algorithm worker job</param>
public BacktestingDataFeed(IAlgorithm algorithm, BacktestNodePacket job) : base(algorithm, job)
{
DataFeed = DataFeedEndpoint.Backtesting;
}
} // End Backtesting Feed Class:
} // End Namespace
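As an aside, FileSystemDataFeed's CalculateIncrement above reduces to taking the smallest resolution requested by any subscription. A sketch of that reduction (the string-to-timedelta mapping is an assumption standing in for Lean's Resolution enum):

```python
from datetime import timedelta

# Assumed stand-in for Lean's Resolution enum values:
RESOLUTION_STEPS = {
    "Daily": timedelta(days=1),
    "Hour": timedelta(hours=1),
    "Minute": timedelta(minutes=1),
    "Second": timedelta(seconds=1),
    "Tick": timedelta(milliseconds=1),
}

def calculate_increment(resolutions, include_tick=True):
    """Scan step = the finest resolution any subscription requested."""
    increment = timedelta(days=1)
    for res in resolutions:
        if res == "Tick" and not include_tick:
            continue  # mirrors the includeTick flag in CalculateIncrement
        increment = min(increment, RESOLUTION_STEPS[res])
    return increment

print(calculate_increment(["Daily", "Minute", "Tick"], include_tick=False))
# 0:01:00 -- ticks excluded, so minute bars set the scan step
```

This is why Run() computes two increments: `tradeBarIncrements` ignores tick subscriptions (used for fill forward), while `increment` includes them (used to advance the frontier).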


In addition there are a DataBaseDataFeed (database-backed) and a LiveTradingDataFeed (live trading); they are not covered here.