您的位置:首页 > 编程语言 > C#

c#线程实现生产者消费者

2012-05-05 16:03 295 查看
public interface IThreadWorker : IDisposable
{
void RealWorker();
}

public class ThreadController
{
private Queue<IThreadWorker> WorkQueue = new Queue<IThreadWorker>();
private int mMaxThreadCount = 5;
private ManualResetEvent[] mCustomerSyncEvents = null;
private ManualResetEvent[] mCustomerFinishEvents = null;
public ManualResetEvent ProducterFinishEvent = new ManualResetEvent(false);
private string ThreadName = string.Empty;
//private bool Started = false;

public ThreadController()
: this(5, string.Empty)
{ }

public ThreadController(int maxThreadCount)
: this(maxThreadCount, string.Empty)
{ }

public ThreadController(int maxThreadCount, string name)
{
ThreadName = name;
mMaxThreadCount = maxThreadCount;
mCustomerSyncEvents = new ManualResetEvent[maxThreadCount];
mCustomerFinishEvents = new ManualResetEvent[maxThreadCount];
for (int i = 0; i < maxThreadCount; i++)
{
mCustomerSyncEvents[i] = new ManualResetEvent(true);
mCustomerFinishEvents[i] = new ManualResetEvent(false);
}
StartRun();
}

public void AddJobQueue(IThreadWorker info)
{
lock (((ICollection)WorkQueue).SyncRoot)
{
WorkQueue.Enqueue(info);
}
//if (!Started)
//{
//    StartRun();
//    Started = true;
//}
}

private void StartRun()
{
for (int i = 0; i < mMaxThreadCount; i++)
{
CustomerSyncEventController sync = new CustomerSyncEventController(mCustomerSyncEvents[i], mCustomerFinishEvents[i], ProducterFinishEvent);
Thread thread = new Thread(new ParameterizedThreadStart(RealWork));
thread.Start(sync);
}
}

public bool Completed()
{
return WaitHandle.WaitAll(mCustomerFinishEvents);
}

private void RealWork(object obj)
{
CustomerSyncEventController sync = obj as CustomerSyncEventController;

while (sync.CustomerSyncEvent.WaitOne())
{
sync.CustomerSyncEvent.Reset();
try
{
IThreadWorker instance = null;
if (sync.ProducterFinishEvent.WaitOne(10, true))
{
lock (((ICollection)WorkQueue).SyncRoot)
{
if (WorkQueue.Count == 0)
{
//Console.WriteLine("{0}, set", sync.i);
sync.CustomerFinishEvent.Set();
break;
}
else
{
instance = WorkQueue.Dequeue();
}
}
}
else
{
// Console.WriteLine("{0}, false", sync.i);
lock (((ICollection)WorkQueue).SyncRoot)
{
if (WorkQueue.Count == 0)
{
//break;
}
else
{
instance = WorkQueue.Dequeue();
}
}

}
using (instance)
{
//Console.WriteLine("{0}, {1}", sync.i, info.info);
instance.RealWorker();
}
}
catch (Exception e)
{
}
finally
{
//Console.WriteLine("{0}, set", sync.i);
sync.CustomerSyncEvent.Set();
}
}

}

internal class CustomerSyncEventController
{
public ManualResetEvent CustomerSyncEvent { get; set; }
public ManualResetEvent CustomerFinishEvent { get; set; }
public ManualResetEvent ProducterFinishEvent { get; set; }
public CustomerSyncEventController()
{
CustomerSyncEvent = new ManualResetEvent(false);
CustomerFinishEvent = new ManualResetEvent(false);
ProducterFinishEvent = new ManualResetEvent(true);
}
public CustomerSyncEventController(ManualResetEvent customerSync, ManualResetEvent customerFinish, ManualResetEvent producterFinish)
{
CustomerSyncEvent = customerSync;
CustomerFinishEvent = customerFinish;
ProducterFinishEvent = producterFinish;
}
}
}


 

public class ImplementClass : IThreadWorker
{
public string Content { get; set; }
public ImplementClass(string content)
{
Content = content;
}

#region IThreadWorker Members

public void RealWorker()
{
Console.WriteLine(Content);
}

#endregion

#region IDisposable Members

public void Dispose()
{
}

#endregion
}


 

static void Main(string[] args)
{
List<ImplementClass> list = new List<ImplementClass>();
for (int i = 0; i <= 100; i++)
{
list.Add(new ImplementClass(i.ToString()));
}
ThreadController monitor = new ThreadController(32);
foreach (ImplementClass ic in list)
{
monitor.AddJobQueue(ic);
}
monitor.ProducterFinishEvent.Set();
if (monitor.Completed())
{ }

}

实现自己的IThreadWorker,

不要忘记调用

monitor.ProducterFinishEvent.Set(),证明队列中不会再增加新的内容

monitor.Completed()保证所有的消费者都完成了工作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息