您的位置:首页 > 其它

转载一篇老外的消息队列服务文章

2014-10-25 12:55 246 查看
接着上一篇:/article/1740840.html

原文:http://www.codeproject.com/Articles/821220/Throwing-a-Great-Block

using System;

namespace MessagingQueueing
{
    using System.Collections.Concurrent;
    using System.Threading;

    /// <summary>
    /// Console demo: pushes twenty numbered messages through a <see cref="MyQueue"/>,
    /// disposes the queue, then waits for a key press before exiting.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Program entry point.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            using (MyQueue queue = new MyQueue())
            {
                // Message ids run from 1 through 20 inclusive.
                int nextId = 1;
                while (nextId <= 20)
                {
                    MyMessage outgoing = new MyMessage();
                    outgoing.MessageId = nextId;
                    outgoing.Message = string.Format("Message text # {0:#,##0}", nextId);

                    queue.AddLog(outgoing);
                    nextId++;
                }
            }

            // Keep the console window open until the user presses a key.
            Console.ReadKey();
        }
    }

    /// <summary>
    /// A simple in-process message queue. Producers call <see cref="AddLog"/>; a dedicated
    /// thread started by the constructor removes messages in FIFO order and writes them to
    /// the console. <see cref="Dispose()"/> stops intake and waits for the backlog to drain.
    /// </summary>
    class MyQueue : IDisposable
    {
        // How long each Join attempt waits for the consumer thread during shutdown.
        private const int ShutdownTimeoutMilliseconds = 30000;

        private readonly BlockingCollection<MyMessage> messageQueue;
        private readonly Thread dequeueThread;

        // Set by the first Dispose call so that later calls are no-ops.
        private bool disposed;

        /// <summary>
        /// Creates the queue and starts the consumer thread.
        /// </summary>
        public MyQueue()
        {
            messageQueue = new BlockingCollection<MyMessage>(new ConcurrentQueue<MyMessage>());
            dequeueThread = new Thread(new ThreadStart(DequeueMessageThread));
            dequeueThread.Name = "TransactionPostThread";
            dequeueThread.Start();
        }

        /// <summary>
        /// Consumer thread body: processes messages until adding has been completed and
        /// the queue is empty.
        /// </summary>
        private void DequeueMessageThread()
        {
            // GetConsumingEnumerable blocks while the queue is empty and ends cleanly once
            // CompleteAdding has been called and the backlog is drained, so normal shutdown
            // does not rely on catching an exception.
            foreach (MyMessage message in messageQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Dequeueing: " + message.ToString());
            }
        }

        /// <summary>
        /// Enqueues a message for the consumer thread.
        /// </summary>
        /// <param name="message">The message to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// The queue has been shut down (an <see cref="ObjectDisposedException"/>, which derives
        /// from this type, is thrown when the underlying collection has been disposed).
        /// </exception>
        public void AddLog(MyMessage message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            Console.WriteLine("Enqueueing: " + message.ToString());
            messageQueue.Add(message);
        }

        /// <summary>
        /// Implements <see cref="IDisposable"/>: stops accepting messages, waits for the
        /// consumer thread to drain the queue, and releases the underlying collection.
        /// Calling this method more than once has no further effect.
        /// </summary>
        public void Dispose()
        {
            if (disposed)
            {
                return;
            }

            disposed = true;

            Console.WriteLine("Shutting down queue. Waiting for dequeue thread completion.");

            // Signal the consumer that no more messages will arrive.
            messageQueue.CompleteAdding();

            bool consumerFinished = true;
            while (!dequeueThread.Join(ShutdownTimeoutMilliseconds))
            {
                // Timed out. If messages remain, the consumer is presumably still working
                // through the backlog, so keep waiting. If the queue is empty, the consumer
                // is stuck somewhere other than waiting for input.
                if (messageQueue.Count == 0)
                {
                    System.Diagnostics.Debug.Print("Dequeue thread is stuck; abandoning it");
                    AbandonDequeueThread();
                    consumerFinished = false;
                    break;
                }
            }

            if (consumerFinished)
            {
                // Safe to release the collection only once nothing else is using it.
                messageQueue.Dispose();
                Console.WriteLine("Dequeue thread complete.");
            }
            else
            {
                Console.WriteLine("Dequeue thread did not complete; abandoned.");
            }

            // The class owns only managed resources and declares no finalizer; this call
            // covers derived types that might add one.
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Gives up on a consumer thread that did not stop in time. Thread.Abort is not
        /// used because it is unsupported on .NET Core / .NET 5+; instead the thread is
        /// marked as a background thread so it cannot keep the process alive.
        /// </summary>
        private void AbandonDequeueThread()
        {
            try
            {
                dequeueThread.IsBackground = true;
            }
            catch (ThreadStateException)
            {
                // The thread ended between the timed-out Join and this call; nothing to do.
            }
        }
    }

    /// <summary>
    /// A queue payload: a numeric id plus free-form text.
    /// </summary>
    class MyMessage
    {
        /// <summary>Numeric identifier of the message.</summary>
        public int MessageId { get; set; }

        /// <summary>Text carried by the message.</summary>
        public string Message { get; set; }

        /// <summary>
        /// Returns a one-line description containing the id (formatted with the
        /// "#,##0" pattern) and the message text.
        /// </summary>
        public override string ToString()
        {
            string formattedId = MessageId.ToString("#,##0");
            return "Message with ID " + formattedId + " and value " + Message + ".";
        }

    }

}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: