您的位置:首页 > 编程语言 > C#

c# 安全队列

2016-09-22 21:05 176 查看
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AA
{
public class AsynQueue<T>
{
//队列是否正在处理数据
private int isProcessing;
//有线程正在处理数据
private const int Processing = 1;
//没有线程处理数据
private const int UnProcessing = 0;
//队列是否可用
private volatile bool enabled = true;
private Task currentTask;
public event Action<T> ProcessItemFunction;
public event EventHandler<EventArgs<Exception>> ProcessException;
private ConcurrentQueue<T> queue;

public AsynQueue()
{
queue = new ConcurrentQueue<T>();
Start();
}

public int Count
{
get
{
return queue.Count;
}
}

private void Start()
{
Thread process_Thread = new Thread(PorcessItem);
process_Thread.IsBackground = true;
process_Thread.Start();
}

public void Enqueue(T items)
{
if (items == null)
{
throw new ArgumentException("items");
}

queue.Enqueue(items);
DataAdded();
}

//数据添加完成后通知消费者线程处理
private void DataAdded()
{
if (enabled)
{
if (!IsProcessingItem())
{
currentTask = Task.Factory.StartNew(ProcessItemLoop);
}
}
}

//判断是否队列有线程正在处理
private bool IsProcessingItem()
{
return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);
}

private void ProcessItemLoop()
{

if (!enabled && queue.IsEmpty)
{
Interlocked.Exchange(ref isProcessing, 0);
return;
}
T publishFrame;

if (queue.TryDequeue(out publishFrame))
{

try
{
ProcessItemFunction(publishFrame);
}
catch (Exception ex)
{
OnProcessException(ex);
}
}

if (enabled && !queue.IsEmpty)
{
currentTask = Task.Factory.StartNew(ProcessItemLoop);
}
else
{
Interlocked.Exchange(ref isProcessing, UnProcessing);
}
}

/// <summary>
///定时处理线程调用函数
///主要是监视入队的时候线程 没有来的及处理的情况
/// </summary>
private void PorcessItem(object state)
{
int sleepCount = 0;
int sleepTime = 1000;
while (enabled)
{
//如果队列为空则根据循环的次数确定睡眠的时间
if (queue.IsEmpty)
{
if (sleepCount == 0)
{
sleepTime = 1000;
}
else if (sleepCount <= 3)
{
sleepTime = 1000 * 3;
}
else
{
sleepTime = 1000 * 50;
}
sleepCount++;
Thread.Sleep(sleepTime);
}
else
{
//判断是否队列有线程正在处理
if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)
{
if (!queue.IsEmpty)
{
currentTask = Task.Factory.StartNew(ProcessItemLoop);
}
else
{
Interlocked.Exchange(ref isProcessing, 0);
}
sleepCount = 0;
sleepTime = 1000;
}
}
}
}

public void Flsuh()
{
Stop();

if (currentTask != null)
{
currentTask.Wait();
}

while (!queue.IsEmpty)
{
try
{
T publishFrame;
if (queue.TryDequeue(out publishFrame))
{
ProcessItemFunction(publishFrame);
}
}
catch (Exception ex)
{
OnProcessException(ex);
}
}
currentTask = null;
}

public void Stop()
{
this.enabled = false;
}

private void OnProcessException(System.Exception ex)
{
var tempException = ProcessException;
Interlocked.CompareExchange(ref ProcessException, null, null);

if (tempException != null)
{
ProcessException(ex, new EventArgs<Exception>(ex));
}
}

[Serializable]
public class EventArgs<T> : System.EventArgs
{

public T Argument;

public EventArgs() : this(default(T))
{
}

public EventArgs(T argument)
{
Argument = argument;
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: