TPL Dataflow .Net 数据流组件,了解一下?
回顾上文
作为单体程序,依赖的第三方服务虽不多,但是2C的程序还是有不少内容可讲; 作为一个常规互联网系统,无外乎就是接受请求、处理请求,输出响应。
由于业务渐渐增长,数据处理的过程会越来越复杂和冗长,【连贯高效的处理数据】 越来越被看重, .Net 提供了TPL Dataflow组件使我们更高效的实现基于数据流和 流水线操作的代码。
下图是单体程序中 数据处理的用例图。
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase { private readonly string _dirPath; private readonly Timer _triggerBatchTimer; private readonly Timer _openFileTimer; private DateTime? _nextCheckpoint; private TextWriter _currentWriter; private readonly LogHead _logHead; private readonly object _syncRoot = new object(); private readonly ILogger _logger; private readonly BatchBlock<T> _packer; private readonly ActionBlock<T[]> batchWriterBlock; private readonly TimeSpan _logFileIntervalTimeSpan; /// <summary> /// Generate request log file. /// </summary> public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory) { _logger = loggerFactory.CreateLogger<LogBatchBlock<T>>(); _dirPath = logConfig.DirPath; if (!Directory.Exists(_dirPath)) { Directory.CreateDirectory(_dirPath); } _logHead = logConfig.LogHead; _packer = new BatchBlock<T>(logConfig.BatchSize); batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models)); _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true }); _triggerBatchTimer = new Timer(state => { _packer.TriggerBatch(); }, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period)); _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval); _openFileTimer = new Timer(state => { AlignCurrentFileTo(DateTime.Now); }, null, TimeSpan.Zero, _logFileIntervalTimeSpan); } public ITargetBlock<T> InputBlock => _packer; private void AlignCurrentFileTo(DateTime dt) { if (!_nextCheckpoint.HasValue) { OpenFile(dt); } if (dt >= _nextCheckpoint.Value) { CloseFile(); OpenFile(dt); } } private void OpenFile(DateTime now, string fileSuffix = null) { string filePath = null; try { var currentHour = now.Date.AddHours(now.Hour); _nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan); int hourConfiguration = _logFileIntervalTimeSpan.Hours; int minuteConfiguration = _logFileIntervalTimeSpan.Minutes; filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log"; var appendHead = !File.Exists(filePath); if (filePath != null) { var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write); var sw = new StreamWriter(stream, Encoding.Default); if (appendHead) { sw.Write(GenerateHead()); } _currentWriter = sw; _logger.LogDebug($"{currentHour} TextWriter has been created."); } } catch (UnauthorizedAccessException ex) { _logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex); throw; } catch (Exception e) { if (fileSuffix == null) { _logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace); OpenFile(now, $"-{Guid.NewGuid()}"); } else { _logger.LogError($"OpenFile failed after retry: {filePath}", e); throw; } } } private void CloseFile() { if (_currentWriter != null) { _currentWriter.Flush(); _currentWriter.Dispose(); _currentWriter = null; _logger.LogDebug($"{DateTime.Now} TextWriter has been disposed."); } _nextCheckpoint = null; } private string GenerateHead() { StringBuilder head = new StringBuilder(); head.AppendLine("#Software: " + _logHead.Software) .AppendLine("#Version: " + _logHead.Version) .AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}") .AppendLine("#Fields: " + _logHead.Fields); return head.ToString(); } private void WriteToFile(T[] models) { try { lock (_syncRoot) { var flag = false; foreach (var model in models) { if (model == null) continue; flag = true; AlignCurrentFileTo(model.ServerLocalTime); _currentWriter.WriteLine(model.ToString()); } if (flag) _currentWriter.Flush(); } } catch (Exception ex) { _logger.LogError("WriteToFile Error : {0}", ex.Message); } } public bool AcceptLogModel(T model) { return _packer.Post(model); } public string GetDirPath() { return _dirPath; } public async Task CompleteAsync() { _triggerBatchTimer.Dispose(); _openFileTimer.Dispose(); _packer.TriggerBatch(); _packer.Complete(); await InputBlock.Completion; lock (_syncRoot) { CloseFile(); } } }仿IIS日志存储代码 ② 异常处理
上述程序在部署时就遇到相关的坑位,在测试环境_eqid2ModelTransformBlock 内Func委托稳定执行,程序并未出现异样;
部署到生产之后, 该Pipeline会运行一段时间就停止工作,一直很困惑, 后来通过监测_eqid2ModelTransformBlock.Completion 属性,该块提前进入“完成态” : 程序在执行某次Func委托时报错,Block提前进入完成态
TransfomrBlock.Completion 一个Task对象,当TPL Dataflow不再处理消息并且能保证不再处理消息的时候,就被定义为完成态, Task对象的TaskStatus枚举值将标记此Block进入完成态的真实原因
- TaskStatus.RanToCompletion 根据Block定义的任务成功完成
- TaskStatus.Fault 因为未处理的异常 导致"过早的完成"
- TaskStatus.Cancled 因为取消操作 导致 "过早的完成"
我们需要小心处理异常, 一般情况下我们使用try、catch包含所有的执行代码以确保所有的异常都被处理。
可将TPL Dataflow 做为进程内消息队列,本文只是一个入门参考,更多复杂用法还是看官网, 你需要记住的是, 这是一个.Net 进程内数据流组件, 能让你专注于流程。
作者:JulianHuang
感谢您的认真阅读,如有问题请大胆斧正;觉得有用,请下方 或加关注。
本文欢迎转载,但请保留此段声明,且在文章页面明显位置注明本文的作者及原文链接。
- 为SSIS编写自定义数据流组件(DataFlow Component)之进阶篇:数据源组件
- 为SSIS编写自定义数据流组件(DataFlow Component)之进阶篇:数据源组件
- 最近在研究 .NET 组件编程,感觉不错,对底层更了解一些了
- .NET 反编译调试神器:dnSpy了解一下
- 在.net 4.0程序中使用TPL Dataflow
- .Net常用事务(了解一下)
- 为SSIS编写自定义数据流组件(DataFlow Component)之入门篇
- 为SSIS编写自定义数据流组件(DataFlow Component)之入门篇
- 为SSIS编写自定义数据流组件(DataFlow Component)之进阶篇:自定义编辑器
- 为SSIS编写自定义数据流组件(DataFlow Component)之进阶篇:自定义编辑器
- .net爬虫了解一下
- ADO.NET .net core2.0添加json文件并转化成类注入控制器使用 简单了解 iTextSharp实现HTML to PDF ASP.NET MVC 中 Autofac依赖注入DI 控制反转IOC 了解一下 C# AutoMapper 了解一下
- .NET 使用 ODP.NET 组件访问Oracle提示 外部组件发生异常 错误的解决办法
- 在.net开发中使用Log4Net组件
- MeeGo系“.NET研究”统1.2版本新组件
- 了解一下gant
- RDIFramework.NET ━ .NET快速信息化系统开发框架 ━ 工作流程组件介绍
- .NET使用NPOI组件将数据导出Excel(转)
- .net重要的开源组件[更新中]
- process.argv简单了解一下