您的位置:首页 > 编程语言 > C#

c# 多线程排队队列实现的源码

2013-08-09 11:26 281 查看
using System;

using System.Threading;

using System.Collections;

using System.Collections.Generic;

// 将线程同步事件封装在此类中,

// 以便于将这些事件传递给 Consumer 和

// Producer 类。

public class SyncEvents

{

public SyncEvents()

{

// AutoResetEvent 用于“新项”事件,因为

// 我们希望每当使用者线程响应此事件时,

// 此事件就会自动重置。

_newItemEvent = new AutoResetEvent(false);

// ManualResetEvent 用于“退出”事件,因为

// 我们希望发出此事件的信号时有多个线程响应。

// 如果使用 AutoResetEvent,事件

// 对象将在单个线程作出响应之后恢复为

// 未发信号的状态,而其他线程将

// 无法终止。

_exitThreadEvent = new ManualResetEvent(false);

// 这两个事件也放在一个 WaitHandle 数组中,以便

// 使用者线程可以使用 WaitAny 方法

// 阻塞这两个事件。

_eventArray = new WaitHandle[2];

_eventArray[0] = _newItemEvent;

_eventArray[1] = _exitThreadEvent;

}

// 公共属性允许对事件进行安全访问。

public EventWaitHandle ExitThreadEvent

{

get { return _exitThreadEvent; }

}

public EventWaitHandle NewItemEvent

{

get { return _newItemEvent; }

}

public WaitHandle[] EventArray

{

get { return _eventArray; }

}

private EventWaitHandle _newItemEvent;

private EventWaitHandle _exitThreadEvent;

private WaitHandle[] _eventArray;

}

// Producer 类(使用一个辅助线程)

// 将项异步添加到队列中,共添加 20 个项。

public class Producer

{

public Producer(Queue<int> q, SyncEvents e)

{

_queue = q;

_syncEvents = e;

}

public void ThreadRun()

{

int count = 0;

Random r = new Random();

while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))

{

lock (((ICollection)_queue).SyncRoot)

{

while (_queue.Count < 20)

{

_queue.Enqueue(r.Next(0, 100));

_syncEvents.NewItemEvent.Set();

count++;

}

}

}

Console.WriteLine("Producer thread: produced {0} items", count);

}

private Queue<int> _queue;

private SyncEvents _syncEvents;

}

// Consumer 类通过自己的辅助线程使用队列

// 中的项。Producer 类使用 NewItemEvent

// 将新项通知 Consumer 类。

public class Consumer

{

public Consumer(Queue<int> q, SyncEvents e)

{

_queue = q;

_syncEvents = e;

}

public void ThreadRun()

{

int count = 0;

while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)

{

lock (((ICollection)_queue).SyncRoot)

{

int item = _queue.Dequeue();

}

count++;

}

Console.WriteLine("Consumer Thread: consumed {0} items", count);

}

private Queue<int> _queue;

private SyncEvents _syncEvents;

}

public class ThreadSyncSample

{

private static void ShowQueueContents(Queue<int> q)

{

// 对集合进行枚举本来就不是线程安全的,

// 因此在整个枚举过程中锁定集合以防止

// 使用者和制造者线程修改内容

// 是绝对必要的。(此方法仅由

// 主线程调用。)

lock (((ICollection)q).SyncRoot)

{

foreach (int i in q)

{

Console.Write("{0} ", i);

}

}

Console.WriteLine();

}

static void Main()

{

// 配置结构,该结构包含线程同步

// 所需的事件信息。

SyncEvents syncEvents = new SyncEvents();

// 泛型队列集合用于存储要制造和使用的

// 项。此例中使用的是“int”。

Queue<int> queue = new Queue<int>();

// 创建对象,一个用于制造项,一个用于

// 使用项。将队列和线程同步事件传递给

// 这两个对象。

Console.WriteLine("Configuring worker threads...");

Producer producer = new Producer(queue, syncEvents);

Consumer consumer = new Consumer(queue, syncEvents);

// 为制造者对象和使用者对象创建线程

// 对象。此步骤并不创建或启动

// 实际线程。

Thread producerThread = new Thread(producer.ThreadRun);

Thread consumerThread = new Thread(consumer.ThreadRun);

// 创建和启动两个线程。

Console.WriteLine("Launching producer and consumer threads...");

producerThread.Start();

consumerThread.Start();

// 为制造者线程和使用者线程设置 10 秒的运行时间。

// 使用主线程(执行此方法的线程)

// 每隔 2.5 秒显示一次队列内容。

for (int i = 0; i < 4; i++)

{

Thread.Sleep(2500);

ShowQueueContents(queue);

}

// 向使用者线程和制造者线程发出终止信号。

// 这两个线程都会响应,由于 ExitThreadEvent 是

// 手动重置的事件,因此除非显式重置,否则将保持“设置”。

Console.WriteLine("Signaling threads to terminate...");

syncEvents.ExitThreadEvent.Set();

// 使用 Join 阻塞主线程,首先阻塞到制造者线程

// 终止,然后阻塞到使用者线程终止。

Console.WriteLine("main thread waiting for threads to finish...");

producerThread.Join();

consumerThread.Join();

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: