
TPL Dataflow, the .NET dataflow component: worth a look?

2019-07-12 19:04

Recap of the previous article

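A monolithic application may not depend on many third-party services, but a consumer-facing (2C) application still has plenty worth discussing. Like any conventional internet system, it comes down to accepting requests, processing them, and returning responses.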

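As the business grows, data processing becomes more complex and more drawn out, and processing data coherently and efficiently matters more and more. .NET provides the TPL Dataflow component, which lets us implement dataflow- and pipeline-based code more efficiently.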
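To illustrate what pipeline-based code looks like, here is a minimal sketch of a three-stage pipeline. The stage names and the parse-and-square logic are hypothetical, chosen only to show the pattern: TransformBlock maps one input to one output, ActionBlock consumes messages at the end, and DataflowLinkOptions.PropagateCompletion carries Complete() from the head of the pipeline to the tail. It is written as a C# top-level program; depending on the target framework, the System.Threading.Tasks.Dataflow NuGet package may be needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stage 1 (hypothetical): parse raw text into numbers. TransformBlock: one input -> one output.
var parse = new TransformBlock<string, int>(line => int.Parse(line));

// Stage 2 (hypothetical): square each number, processing up to 4 items in parallel.
var square = new TransformBlock<int, int>(
    n => n * n,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// Stage 3: sink that collects results. ActionBlock runs one item at a time by default,
// so adding to a plain List<int> here is safe.
var results = new List<int>();
var sink = new ActionBlock<int>(n => results.Add(n));

// PropagateCompletion lets Complete() on the head flow down to the tail.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
parse.LinkTo(square, linkOptions);
square.LinkTo(sink, linkOptions);

foreach (var line in new[] { "1", "2", "3", "4" })
{
    parse.Post(line);
}
parse.Complete();       // no more input will arrive
await sink.Completion;  // wait until the last stage has drained

Console.WriteLine(string.Join(",", results)); // prints 1,4,9,16
```

Because TransformBlock preserves input order by default, the sink receives the squares in order even though the middle stage runs several items in parallel.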

The figure below is a use-case diagram of data processing in the monolithic application.

 

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;

// ILogDestination<T>, IModelBase, LogConfig and LogHead are the author's own types (not shown).
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase
{
    private readonly string _dirPath;
    private readonly Timer _triggerBatchTimer;
    private readonly Timer _openFileTimer;
    private DateTime? _nextCheckpoint;
    private TextWriter _currentWriter;
    private readonly LogHead _logHead;
    private readonly object _syncRoot = new object();
    private readonly ILogger _logger;
    private readonly BatchBlock<T> _packer;
    private readonly ActionBlock<T[]> _batchWriterBlock;
    private readonly TimeSpan _logFileIntervalTimeSpan;

    /// <summary>
    /// Generates the request log file.
    /// </summary>
    public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<LogBatchBlock<T>>();

        _dirPath = logConfig.DirPath;
        if (!Directory.Exists(_dirPath))
        {
            Directory.CreateDirectory(_dirPath);
        }
        _logHead = logConfig.LogHead;
        // Parse the interval before any timer can fire and use it.
        _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);

        // BatchBlock packs incoming models into arrays of BatchSize; the ActionBlock writes each array.
        _packer = new BatchBlock<T>(logConfig.BatchSize);
        _batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models));
        _packer.LinkTo(_batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // Flush a partial batch every Period seconds so logs are not held back under low traffic.
        _triggerBatchTimer = new Timer(state =>
        {
            _packer.TriggerBatch();
        }, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));

        // Roll the log file over every LogFileInterval.
        _openFileTimer = new Timer(state =>
        {
            lock (_syncRoot) // WriteToFile touches the same file state
            {
                AlignCurrentFileTo(DateTime.Now);
            }
        }, null, TimeSpan.Zero, _logFileIntervalTimeSpan);
    }

    public ITargetBlock<T> InputBlock => _packer;

    // Caller must hold _syncRoot.
    private void AlignCurrentFileTo(DateTime dt)
    {
        if (!_nextCheckpoint.HasValue)
        {
            OpenFile(dt);
        }
        else if (dt >= _nextCheckpoint.Value)
        {
            CloseFile();
            OpenFile(dt);
        }
    }

    private void OpenFile(DateTime now, string fileSuffix = null)
    {
        var filePath = $"{_dirPath}/u_ex{now:yyMMddHH}{fileSuffix}.log";
        try
        {
            var currentHour = now.Date.AddHours(now.Hour);
            var appendHead = !File.Exists(filePath);

            var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write);
            var sw = new StreamWriter(stream, Encoding.Default);
            if (appendHead)
            {
                sw.Write(GenerateHead());
            }
            _currentWriter = sw;
            // Advance the checkpoint only once the file is actually open.
            _nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan);
            _logger.LogDebug("{CurrentHour} TextWriter has been created.", currentHour);
        }
        catch (UnauthorizedAccessException ex)
        {
            _logger.LogWarning(ex, "I/O error or specific type of security error while opening {FilePath}.", filePath);
            throw;
        }
        catch (Exception e)
        {
            if (fileSuffix == null)
            {
                // e.g. the file is locked by another process: retry once with a unique suffix.
                _logger.LogWarning(e, "OpenFile failed for {FilePath}; retrying with a unique suffix.", filePath);
                OpenFile(now, $"-{Guid.NewGuid()}");
            }
            else
            {
                _logger.LogError(e, "OpenFile failed after retry: {FilePath}", filePath);
                throw;
            }
        }
    }

    private void CloseFile()
    {
        if (_currentWriter != null)
        {
            _currentWriter.Flush();
            _currentWriter.Dispose();
            _currentWriter = null;
            _logger.LogDebug("{Now} TextWriter has been disposed.", DateTime.Now);
        }
        _nextCheckpoint = null;
    }

    private string GenerateHead()
    {
        var head = new StringBuilder();
        head.AppendLine("#Software: " + _logHead.Software)
            .AppendLine("#Version: " + _logHead.Version)
            .AppendLine($"#Date: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}")
            .AppendLine("#Fields: " + _logHead.Fields);
        return head.ToString();
    }

    private void WriteToFile(T[] models)
    {
        try
        {
            lock (_syncRoot)
            {
                var wroteAny = false;
                foreach (var model in models)
                {
                    if (model == null)
                        continue;
                    wroteAny = true;
                    AlignCurrentFileTo(model.ServerLocalTime);
                    _currentWriter.WriteLine(model.ToString());
                }
                if (wroteAny)
                    _currentWriter.Flush();
            }
        }
        catch (Exception ex)
        {
            // Swallowing here keeps the ActionBlock alive; an unhandled exception would fault it.
            _logger.LogError(ex, "WriteToFile error");
        }
    }

    public bool AcceptLogModel(T model)
    {
        return _packer.Post(model);
    }

    public string GetDirPath()
    {
        return _dirPath;
    }

    public async Task CompleteAsync()
    {
        _triggerBatchTimer.Dispose();
        _openFileTimer.Dispose();
        _packer.TriggerBatch();
        _packer.Complete();
        // Await the downstream writer, not just the BatchBlock, so every batch is on disk
        // before the file is closed.
        await _batchWriterBlock.Completion;
        lock (_syncRoot)
        {
            CloseFile();
        }
    }
}
```
IIS-style log storage code

② Exception handling

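We hit a pitfall as soon as the program above was deployed. In the test environment, the Func delegate inside _eqid2ModelTransformBlock ran reliably and nothing seemed wrong.

After deployment to production, however, the pipeline would stop working after running for a while, which puzzled us for a long time. By monitoring the _eqid2ModelTransformBlock.Completion property, we eventually found that the block had entered the "completed" state early: one invocation of the Func delegate threw an exception, and the block completed prematurely.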
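One way to do this kind of monitoring is to attach a continuation to the block's Completion task, so that a premature completion is reported instead of going unnoticed. In this sketch the TransformBlock is a hypothetical stand-in for _eqid2ModelTransformBlock (its real delegate is not shown in this article), and Console.WriteLine stands in for a real logger or alert.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for _eqid2ModelTransformBlock: int.Parse throws on malformed input.
var transform = new TransformBlock<string, int>(s => int.Parse(s));
// Drain outputs somewhere so they do not pile up in the output buffer.
transform.LinkTo(DataflowBlock.NullTarget<int>());

// Watch Completion: report why the block finished instead of letting it stop silently.
var diagnosis = "";
var watcher = transform.Completion.ContinueWith(t =>
{
    diagnosis = t.Status == TaskStatus.Faulted
        ? $"block faulted: {t.Exception?.GetBaseException().Message}"
        : $"block completed with status {t.Status}";
    Console.WriteLine(diagnosis); // replace with a real logger/alert in production
});

transform.Post("1");
transform.Post("not a number"); // FormatException inside the delegate -> the block faults
transform.Post("2");            // accepted or declined depending on timing; never processed

await watcher;
```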

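TransformBlock.Completion is a Task. A block is considered complete once it is no longer processing messages and is guaranteed never to process any more. The TaskStatus of that Task records the actual reason the block completed:

- TaskStatus.RanToCompletion: the block finished the work it was defined to do.

- TaskStatus.Faulted: an unhandled exception caused it to complete prematurely.

- TaskStatus.Canceled: a cancellation caused it to complete prematurely.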
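The three outcomes can be reproduced with a small sketch; the blocks and their trivial delegates are hypothetical. It also shows why a faulted block seems to silently "stop working": once faulted, it declines every new message, so Post returns false.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// 1. RanToCompletion: Complete() is called and all queued work finishes normally.
var ok = new ActionBlock<int>(n => { });
ok.Post(1);
ok.Complete();
await ok.Completion;
Console.WriteLine($"ok: {ok.Completion.Status}");

// 2. Faulted: the delegate throws an unhandled exception.
var faulty = new ActionBlock<int>(n =>
{
    if (n > 0) throw new InvalidOperationException($"bad item {n}");
});
faulty.Post(1);
try { await faulty.Completion; } catch { /* expected: the block faulted */ }
Console.WriteLine($"faulty: {faulty.Completion.Status}");

// A faulted block declines all further messages.
bool acceptedAfterFault = faulty.Post(2);
Console.WriteLine($"Post after fault accepted: {acceptedAfterFault}");

// 3. Canceled: the CancellationToken passed in the block options is signaled.
using var cts = new CancellationTokenSource();
var cancellable = new ActionBlock<int>(n => { },
    new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
cts.Cancel();
try { await cancellable.Completion; } catch (OperationCanceledException) { /* expected */ }
Console.WriteLine($"cancellable: {cancellable.Completion.Status}");
```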

We need to handle exceptions carefully. In general, wrap all of the delegate's code in try/catch so that every exception is handled inside the block.
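A minimal sketch of that advice, assuming a hypothetical convention where a failed item is mapped to null: the delegate catches its own exceptions, failed items are routed to DataflowBlock.NullTarget so they do not clog the output buffer, and only successful results reach the next stage. In a real pipeline the failures list would be a logger call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var failures = new List<string>();

// Catch everything inside the delegate so one bad message cannot fault the block.
// Hypothetical convention: a failed item becomes null and is filtered out below.
var parse = new TransformBlock<string, int?>(raw =>
{
    try
    {
        return int.Parse(raw);
    }
    catch (Exception ex)
    {
        // In production: log the failure. The block itself stays healthy.
        // (TransformBlock runs one item at a time by default, so no lock is needed.)
        failures.Add($"{raw}: {ex.Message}");
        return null;
    }
});

var sum = 0;
var sink = new ActionBlock<int?>(n => sum += n ?? 0);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// Successful results flow to the sink; failed (null) items fall through to NullTarget
// so they do not sit in the output buffer and stall completion.
parse.LinkTo(sink, linkOptions, n => n.HasValue);
parse.LinkTo(DataflowBlock.NullTarget<int?>());

foreach (var raw in new[] { "1", "oops", "2", "3" })
{
    parse.Post(raw);
}
parse.Complete();
await sink.Completion;

Console.WriteLine($"sum={sum}, failures={failures.Count}, status={parse.Completion.Status}");
```

The transform block ends in RanToCompletion even though one input was invalid, which is what keeps such a pipeline running in production.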

 

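TPL Dataflow can also serve as an in-process message queue. This article is only an introductory reference; for more advanced usage, see the official documentation. The key thing to remember is that it is an in-process .NET dataflow component that lets you focus on the flow itself.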
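As a sketch of the in-process message queue use case: BufferBlock<T> works as a FIFO queue between a producer and a consumer, and BoundedCapacity provides back-pressure. The message names and the capacity of 2 are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// BufferBlock<T> acts as an in-process FIFO queue between producers and consumers.
// BoundedCapacity adds back-pressure: SendAsync waits while the queue is full.
var queue = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 2 });

async Task ProduceAsync(int count)
{
    for (var i = 0; i < count; i++)
    {
        await queue.SendAsync($"msg-{i}"); // asynchronously waits if the queue is full
    }
    queue.Complete(); // signal that no more messages will arrive
}

async Task<List<string>> ConsumeAsync()
{
    var received = new List<string>();
    // OutputAvailableAsync returns false once the queue is completed and empty.
    while (await queue.OutputAvailableAsync())
    {
        received.Add(await queue.ReceiveAsync());
    }
    return received;
}

var consumer = ConsumeAsync();
await ProduceAsync(5);
var messages = await consumer;
Console.WriteLine(string.Join(", ", messages)); // prints msg-0, msg-1, msg-2, msg-3, msg-4
```

With a single consumer, the OutputAvailableAsync/ReceiveAsync pair cannot race; with several consumers, TryReceive is the safer choice after OutputAvailableAsync returns true.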

 

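Author: JulianHuang

Thank you for reading carefully. If you spot any problems, please don't hesitate to point them out; if you found this useful, please use the button below or follow me.

Reposting is welcome, but please keep this notice and credit the author with a link to the original article in a prominent place on the page.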
