基于阻塞队列的生产者消费者C#并发设计
2018-01-18 20:49
661 查看
这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code:
调用code:
封装的队列:
调用code:
static void Main(string[] args) { ProcessQueue<int> processQueue = new ProcessQueue<int>(); processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent; processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent; for (int i = 0; i < 50; i++) { processQueue.Enqueue(i); } Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount()); processQueue.Flush(); Console.Read(); } /// <summary> /// 该方法对入队的每个元素进行处理 /// </summary> /// <param name="value"></param> private static void ProcessQueue_ProcessItemEvent(int value) { Console.WriteLine("输出: {0}", value); } /// <summary> /// 处理异常 /// </summary> /// <param name="obj">队列实例</param> /// <param name="ex">异常对象</param> /// <param name="value">出错的数据</param> private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value) { Console.WriteLine(ex.ToString()); }
封装的队列:
public class ProcessQueue<T> { private BlockingCollection<T> _queue; private CancellationTokenSource _cancellationTokenSource; private CancellationToken _cancellToken; //内部线程池 private List<Thread> _threadCollection; //队列是否正在处理数据 private int _isProcessing; //有线程正在处理数据 private const int Processing = 1; //没有线程处理数据 private const int UnProcessing = 0; //队列是否可用 单个线程下用while来判断,多个线程下用if判断,随后while循环队列的数量 private volatile bool _enabled = true; //内部处理线程数量 private int _internalThreadCount; // 消费者处理事件 public event Action<T> ProcessItemEvent; //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据 public event Action<dynamic, Exception, T> ProcessExceptionEvent; public ProcessQueue() { _queue = new BlockingCollection<T>(); _cancellationTokenSource = new CancellationTokenSource(); _internalThreadCount = 3; _cancellToken = _cancellationTokenSource.Token; _threadCollection = new List<Thread>(); } public ProcessQueue(int internalThreadCount) : this() { this._internalThreadCount = internalThreadCount; } /// <summary> /// 队列内部元素的数量 /// </summary> public int GetInternalItemCount() { //return _queue.Count; return _threadCollection.Count; } //生产者生产 public void Enqueue(T items) { if (items == null) { throw new ArgumentException("items"); } _queue.Add(items); DataAdded(); } public void Flush() { StopProcess(); while (_queue.Count != 0) { T item = default(T); if (_queue.TryTake(out item)) { try { ProcessItemEvent(item); } catch (Exception ex) { OnProcessException(ex, item); } } } } // 通知消费者消费队列元素 private void DataAdded() { if (_enabled) { if (!IsProcessingItem()) { Console.WriteLine("DataAdded"); ProcessRangeItem(); StartProcess(); } } } //判断是否队列有线程正在处理 private bool IsProcessingItem() { // 替换第一个参数, 如果相等 //int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing); return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing); } // 多消费者消费 private void ProcessRangeItem() { for (int i = 0; i < this._internalThreadCount; i++) { ProcessItem(); } } // 开启消费处理 private void ProcessItem() { Thread currentThread = new Thread((state) => { T item = default(T); while (_enabled) { try { try { if (!_queue.TryTake(out item)) { //Console.WriteLine("阻塞队列为0时的item: {0}", item); //Console.WriteLine("ok!!!"); break; } // 处理事件 ProcessItemEvent(item); } catch (OperationCanceledException ex) { DebugHelper.DebugView(ex.ToString()); } } catch (Exception ex) { OnProcessException(ex, item); } } }); _threadCollection.Add(currentThread); } // 开启消费者 private void StartProcess() { //Console.WriteLine("线程的数量: {0}", _threadCollection.Count); foreach (var thread in _threadCollection) { thread.Start(); thread.IsBackground = true; } } // 终止运行 private void StopProcess() { this._enabled = false; foreach (var thread in _threadCollection) { if (thread.IsAlive) { thread.Join(); } } _threadCollection.Clear(); } private void OnProcessException(Exception ex, T item) { var tempException = ProcessExceptionEvent; Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null); if (tempException != null) { ProcessExceptionEvent(this, ex, item); } } }
相关文章推荐
- 基于异步队列的生产者消费者C#并发设计
- 生产者/消费者模式(阻塞队列) 一个经典的并发模型
- 基于阻塞队列实现消费者和生产者
- 【Java并发】阻塞队列BlockingQueue和生产者-消费者模型笔记
- 并发编程 03—— 阻塞队列和生产者-消费者模式
- 生产者/消费者模式(阻塞队列) 一个经典的并发模型
- 并发:阻塞队列与生产者消费者模型
- 生产者与消费者的快速实现——并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- 基于阻塞队列实现消费者和生产者
- 11.python并发入门(part8 基于线程队列实现生产者消费者模型)
- Java并发编程笔记 使用阻塞队列实现生产者-消费者模型
- 高并发:阻塞队列 实现生产者-消费者模式
- 生产者/消费者模式(阻塞队列) 一个经典的并发模型
- 基于Java阻塞队列实现生产者与消费者模式
- 生产者/消费者模式(阻塞队列) 一个类似于监听者模式的并发模型
- 生产者/消费者模式(阻塞队列)
- 生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)
- 同步同容器类和工具类2——阻塞队列和生产者-消费者模式
- 基于并发包同步机制实现生产者-消费者模式
- 架构设计:生产者-缓存队列-消费者(5)