您的位置:首页 > 编程语言 > Java开发

一种线程交互模型的实现

2010-08-28 17:55 316 查看

本文介绍一种采用线程交互模型,即主线程执行队列的Task,其他线程投递Task进入主线程的任务队列,投递方式类似于Win32 SDK的PostMessage和SendMessage方法,提供异步投递和同步投递。
首先我们需要一个BlockAndAwaitableQueue类,该类的功能是提供除执行Task线程外的其他线程的任务投递功能,该类包含一个任务列表,即存放待执行的Task。同时要考虑到多线程的同步问题,需要采用同步锁进行同步。同时要保证BlockAndAwaitableQueue的主执行线程在无Task可执行时不占用过多的系统资源需要实现一种monitor模型。即在没有Task可执行时,需要使该线程进入等待队列中,而在有Task投入进来时,需要唤醒Task列表中的等待队列中的当前执行线程。该模型有个限制,即所提交的Task不应占用过多的执行时间。

Java代码



public class BlockAndAwaitableQueue{   

            private ArrayList taskList = new ArrayList();   

               

            private volatile boolean bStop = false;   

               

            private volatile boolean bRunning = false;   

                           

            private Thread runningThread;   

               

            Runnable finalTask = new Runnable()   

            {   

                public void run()   

                {   

                   System.out.println("runningThread will exit.");   

                     

                       synchronized (this) {                    

                                                      

                               bRunning = false;   

                                  

                               System.out.println("notify ...");   

                               synchronized (runningThread) {   

                                 runningThread.notifyAll();   

                            }             

                            

                        }          

                       System.out.println("runningThread exit.");   

                }   

            };   

            public BlockAndAwaitableQueue()   

            {   

                   

            }   

            /*  

             * This can be called by runningThread and other threads.  

             * @param task  The task wants to be executed asychronized.  

             */  

            public void addTask(Runnable task)   

            {   

                synchronized(taskList)   

                {   

                   taskList.add(task);   

                   // notify   

                   taskList.notifyAll();   

                }   

             }   

            /*  

             * This can not be called by the runningThread  

             */  

             public void addTaskAndAwait(final Runnable task)   

             {   

                 if(runningThread == Thread.currentThread())   

                 {   

                     System.out.println("This can not be called by the runningThread");   

                     return;   

                 }   

                    

                 final Object latchObject = new Object();   

                  synchronized(latchObject)   

                  {   

                      

                    addTask(new Runnable(){   

                     public void run()   

                     {   

                         task.run();   

                         synchronized(latchObject)   

                         {   

                             latchObject.notify();   

                         }                           

                      }                       

                     });   

                    try {   

                        latchObject.wait();   

                    } catch (InterruptedException e) {                 

                    }   

                   }       

             }   

             public void stopInvocation()   

             {   

                bStop = true;   

                addTask(finalTask);   

             }   

                

             public void stopInvocationAndAwait()   

             {   

                bStop = true;   

                addTaskAndAwait(finalTask);   

             }   

                

             private void doInvocations()   

             {   

                 synchronized(taskList)   

                 {   

                    while(taskList.size() == 0)   

                     {   

                        try {                              

                             taskList.wait();          

                 } catch (InterruptedException e) {                

                   }       

                             }    

                     for(int i=0; i < taskList.size(); i++)   

                     {   

                         Runnable task = (Runnable)taskList.get(i);   

                         task.run();   

                         task = null;   

                      }   

                     // clear the task list   

                     taskList.clear();     

                 }   

              }   

             public void run()   

             {   

                 synchronized (this) {   

                     while(bRunning)   

                     {   

                           

                        try {   

                            System.out.println("await ...");   

                            synchronized (runningThread) {   

                                runningThread.wait();   

                            }                          

                            System.out.println("await over");   

                        } catch (InterruptedException e) {   

                            // TODO Auto-generated catch block   

                            e.printStackTrace();   

                        }   

                               

                     }   

                     bRunning = true;   

                }   

                    

                 runningThread = Thread.currentThread();   

                    

                 bStop = false;   

     

                 while(!bStop)   

                  {   

                      doInvocations();   

                  }   

             }   

                

             public static void main(String[] args)   

             {   

                 final BlockAndAwaitableQueue taskQueue = new BlockAndAwaitableQueue();   

                 new Thread(new Runnable(){   

                     public void run()   

                     {   

                         taskQueue.run();   

                            

                     }   

                 }).start();   

                    

               

                    

                 new Thread(new Runnable(){   

                     public void run()   

                     {   

                         taskQueue.run();   

                     }   

                 }).start();   

              

                    

                 new Thread(new Runnable(){   

                     public void run()   

                     {   

                         taskQueue.addTaskAndAwait(new Runnable(){   

                             public void run()   

                             {   

                                 System.out.println("Task ID#1 has been executed.");   

                             }   

                         });   

                         System.out.println("After Task ID#1 has been executed.");   

                     }   

                 }).start();   

                 new Thread(new Runnable(){   

                     public void run()   

                     {   

                         taskQueue.addTask(new Runnable(){   

                             public void run()   

                             {   

                                 System.out.println("Task ID#2 has been executed.");          

                                 //stop the queue   

                             }   

                         });   

                     }   

                 }).start();   

                    

                 new Thread(new Runnable(){   

                     public void run()   

                     {   

                              

                        taskQueue.stopInvocationAndAwait();   

                        taskQueue.stopInvocationAndAwait();   

                               

                     }   

                       

                 }).start();   

                                    

             }   

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息