java编程思想之并发(线程之间的协作)
2017-12-28 17:21
411 查看
当你使用多线程来同时运行多个任务时,可以通过使用锁来同步两个任务的行为,从而使的一个任务不会干涉另一个任务的资源。也就是说,如果两个任务交替的步入某项共享资源,你可以使用互斥来保证任何时刻只有一个任务可以访问这项资源。
当任务协作时,关键问题是这些任务之间的握手。为了实现握手,我们使用了相同的基础特性:互斥。在这种情况下,互斥能够确保只有一个任务可以响应某个信号,这样就能根除任何可能的竞争条件。在互斥上,我们为任务添加了一种途径,可以将自身挂起,直至某些外部条件发生变化,表示是时候让这个任务开始为止。
调用 sleep() 时候锁并没有被释放,调用 yield() 也是一样。当一个任务在方法里遇到对 wait() 调用时,线程执行被挂起,对象的锁被释放。这就意味着另一个任务可以获得锁,因此在改对象中的其他 synchronized 方法可以在 wait() 期间被调用。因此,当你在调用 wait() 时,就是在声明:“我已经做完了所有的事情,但是我希望其他的 synchronized 操作在条件何时的情况下能够被执行”。
有两种形式的 wait():
第一种接受毫秒作为参数:指再次暂停的时间。
在 wait() 期间对象锁是被释放的。
可以通过 notif() 或 notifAll(),或者指令到期,从 wait() 中恢复执行。
第二种不接受参数的 wait().
这种 wait() 将无线等待下去,直到线程接收到 notif() 或 notifAll()。
wait()、notif()以及 notifAll() 有一个比较特殊的方面,那就是这些方法是基类 Object 的一部分,而不是属于 Thread 类。仅仅作为线程的功能却成为了通用基类的一部分。原因是这些方法操作的锁,也是所有对象的一部分。所以你可以将 wait() 放进任何同步控制方法里,而不用考虑这个类是继承自 Thread 还是 Runnable。实际上,只能在同步方法或者同步代码块里调用 wait()、notif() 或者 notifAll()。如果在非同步代码块里操作这些方法,程序可以通过编译,但是在运行时会得到 IllegalMonitorStateException 异常。意思是,在调用 wait()、notif() 或者 notifAll() 之前必须拥有获取对象的锁。
比如,如果向对象 x 发送 notifAll(),那就必须在能够得到 x 的锁的同步控制块中这么做:
我们看一个示例:一个是将蜡涂到 Car 上,一个是抛光它。抛光任务在涂蜡任务完成之前,是不能执行其工作的,而涂蜡任务在涂另一层蜡之前必须等待抛光任务完成。
开始打蜡的任务:
开始抛光的任务:
测试类:
执行结果:
在 waitForWaxing() 中检查 WaxOn 标志,如果它是 false,那么这个调用任务将会被挂起。这个行为发生在 synchronized 方法中这一点很重要。因为在这个方法中任务已经获得了锁。当你调用 wait() 时,线程被挂起,而锁被释放。释放锁是本质所在,因为为了安全的改变对象的状态,其他某个任务就必须能够获得这个锁。
WaxOn.run() 表示给汽车打蜡的第一个步骤,它执行他的操作:调用 sleep() 模拟打蜡的时间,然后告知汽车打蜡结束,并且调用 waitForWaxing(),这个方法会调用 wait() 挂起当前打蜡的任务。直到 WaxOff 任务调用这两车的 buffed(),从而改变状态并且调用 notfiAll() 重新唤醒为止。翻过来也是一样的,在运行程序时,你可以看到控制权在两个任务之间来回的传递,这两个步骤过程在不断的重复。
错失的信号
当两个线程使用 notif()/wait() 或者 notifAll()/wait() 进行协作时,有可能会错过某个信号。假设线程 T1 是通知 T2 的线程,而这两个线程都使用下面的方式实现:
以上的例子假设 T2 对 someCondition 发现其为 true()。在执行 Potit 其中线程调度器可能切换到了 T1。而 T1 将会执行重新设置 condition,并且调用唤醒。当 T2 继续执行时,以至于不能意识到条件已经发生变化,因此会盲目的进入 wait()。此时唤醒在之前已经调用过了,而 T2 将无限的等待下去唤醒的信号。
解决该问题的方案是防止 someCondition 变量上产生竞争条件:
在 Java 的线程机制中,有一个描述是这样的:notifyAll() 将唤醒所有正在等待的任务。这是否意味着在程序中任何地方,任何处于 wait() 状态中的任务都将被任何对 notifyAll() 的调用唤醒呢?在下面的实例中说明了情况并非如此,当 notifyAll() 因某个特定锁被调用时,只有等待这个锁的任务才会被唤醒:
创建任务 Task:
创建任务 Task2:
测试类:
测试结果:
从上面输出的结果可以看出,我们启动了三个 Task 任务线程,一个 Task2 线程。使用 timer 做了一个定时器,每间隔 4 毫秒就轮换启动 Task.blocker 的 notify() 和 notifyAll()方法。我们看到 Task 和 Task2 都有 Blocker 对象,他们调用 Blocker 对象的时候都会被阻塞。我们看到当调用 Task.prod() 的时候只有一个在等待锁的任务被唤醒,其余两个继续挂起。当调用 Task.prodAll() 的时候等待的三个线程都会被唤醒。当调用 Task2。prodAll() 的时候 只有 Task2 的线程任务被唤醒。其余的三个 Task 任务继续挂起。
膳食类:
服务生类:
厨师类:
测试类:
执行结果:
**使用显示的 Lock 和 Condition 对象
在 java SE5 的类库中还有额外的显示工具。我们来重写我们的打蜡和抛光类。使用互斥并允许任务挂起的基本类是 Condition,你可以通过在 Condition 上调用 await() 来挂起一个任务。当外部条件发生变化时,意味着某个任务应该继续执行,你可以通过调用 signal() 来通知这个任务,从而唤醒一个任务,或者调用 signalAll() 来唤醒所有在这个 Condition 上被挂起的任务。(signalAll() 比 notifAll() 是更安全的方式)
下面是重写版本:
在 Car 的构造器中单个的 Lock 将产生一个 Condition 对象,这个对象被用来管理任务之间的通信。但是这个 Condition 不包含任何有关处理状态的信息,因此你需要额外的表示处理状态的信息,即 Boolean waxOn。
如果消费者任务试图从队列中获取对象,而该队列为空时,那么这些队列就可以挂起这些任务,并且当有更多的元素可用时恢复这些消费任务。阻塞队列可以解决非常大的问题,而其方式与 wait() 和 notifyAll() 相比,则简单切可靠。
下面是一个简单的测试,它将多个 LiftOff 对象执行串行化。消费者 LiftOffRunner 将每个 LiftOff 对象从 BlockIngQueue 中推出并直接运行。它通过显示的调用 run() 而是用自己的线程来运行,而不是为每个任务启动一个线程。
首先把之前写过的 LiftOff 类贴出来:
LiftOffRunner 类:
最后是测试类:
吐司 BlockingQueue
下面是一个示例,每一台机器都有三个任务:一个只做吐司、一个给吐司抹黄油、另一个在涂抹黄油的吐司上抹果酱。我们来示例如果使用 BlockIngQueue 来运行这个示例:
这个示例中没有任何显示的同步,因为同步队列和系统的设计隐式的管理了每片 Toast 在任何时刻都只有一个任务在操作。因为队列的阻塞,使得处理过程将被自动挂起和恢复。
我的博客
扫描关注公众号免费获取全套的 Java 编程思想读书笔记。
我的微信公号:Android开发吹牛皮
线程之间的协作
上面的问题已经解决了,下一步是如何使得任务彼此之间可以协作,使得多个任务可以一起工作去解决某个问题。现在的问题不是彼此之间的干涉,而是彼此之间的协作。解决这类问题的关键是某些部分必须在其他部分被解决之前解决。当任务协作时,关键问题是这些任务之间的握手。为了实现握手,我们使用了相同的基础特性:互斥。在这种情况下,互斥能够确保只有一个任务可以响应某个信号,这样就能根除任何可能的竞争条件。在互斥上,我们为任务添加了一种途径,可以将自身挂起,直至某些外部条件发生变化,表示是时候让这个任务开始为止。
wait() 与 notifyAll()
wait() 可以使你等待某个条件发生变化,而改变这个条件通常是由另一个任务来改变。你肯定不想在你的任务测试这个条件的同时,不断的进行空循环,这被称为忙等待,是一种不良的 cpu 使用方式。因此 wait() 会在外部条件发生变化的时候将任务挂起,并且只有在 notif() 或 notifAll() 发生时,这个任务才会被唤醒并去检查所发生的变化。因此,wait() 提供了一种在任务之间对活动同步的方式。调用 sleep() 时候锁并没有被释放,调用 yield() 也是一样。当一个任务在方法里遇到对 wait() 调用时,线程执行被挂起,对象的锁被释放。这就意味着另一个任务可以获得锁,因此在改对象中的其他 synchronized 方法可以在 wait() 期间被调用。因此,当你在调用 wait() 时,就是在声明:“我已经做完了所有的事情,但是我希望其他的 synchronized 操作在条件何时的情况下能够被执行”。
有两种形式的 wait():
第一种接受毫秒作为参数:指再次暂停的时间。
在 wait() 期间对象锁是被释放的。
可以通过 notif() 或 notifAll(),或者指令到期,从 wait() 中恢复执行。
第二种不接受参数的 wait().
这种 wait() 将无线等待下去,直到线程接收到 notif() 或 notifAll()。
wait()、notif()以及 notifAll() 有一个比较特殊的方面,那就是这些方法是基类 Object 的一部分,而不是属于 Thread 类。仅仅作为线程的功能却成为了通用基类的一部分。原因是这些方法操作的锁,也是所有对象的一部分。所以你可以将 wait() 放进任何同步控制方法里,而不用考虑这个类是继承自 Thread 还是 Runnable。实际上,只能在同步方法或者同步代码块里调用 wait()、notif() 或者 notifAll()。如果在非同步代码块里操作这些方法,程序可以通过编译,但是在运行时会得到 IllegalMonitorStateException 异常。意思是,在调用 wait()、notif() 或者 notifAll() 之前必须拥有获取对象的锁。
比如,如果向对象 x 发送 notifAll(),那就必须在能够得到 x 的锁的同步控制块中这么做:
synchronized(x){ x.notifAll(); }
我们看一个示例:一个是将蜡涂到 Car 上,一个是抛光它。抛光任务在涂蜡任务完成之前,是不能执行其工作的,而涂蜡任务在涂另一层蜡之前必须等待抛光任务完成。
public class Car { //涂蜡和抛光的状态 private boolean waxOn = false; //打蜡 public synchronized void waxed() { waxOn = true; notifyAll(); } //抛光 public synchronized void buffed() { waxOn = false; notifyAll(); } //抛光结束被挂起即将开始打蜡任务 public synchronized void waitForWaxing() throws InterruptedException{ while (waxOn == false) { wait(); } } //打蜡结束被挂起即将开始抛任务 public synchronized void waitForBuffing() throws InterruptedException{ while (waxOn == true) { wait(); } } }
开始打蜡的任务:
public class WaxOn implements Runnable{ private Car car; protected WaxOn(Car car) { super(); this.car = car; } @Override public void run() { try { while (!Thread.interrupted()) { System.out.println("Wax one"); TimeUnit.MICROSECONDS.sleep(200); //开始打蜡 car.waxed(); //当前任务被挂起 car.waitForBuffing(); } } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println(" Exiting via interrupt"); } System.out.println("Ending wax on task"); } }
开始抛光的任务:
public class WaxOff implements Runnable{ private Car car; protected WaxOff(Car car) { super(); this.car = car; } @Override public void run() { // TODO Auto-generated method stub try { while (!Thread.interrupted()) { //如果还是在打蜡就挂起 car.waitForWaxing(); System.out.println("Wax off"); TimeUnit.MICROSECONDS.sleep(200); //开始抛光 car.buffed(); } } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("Wxtiing via interrupt"); } System.out.println("Ending wax off task"); } }
测试类:
public class WaxOmatic { public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub Car car = new Car(); ExecutorService service = Executors.newCachedThreadPool(); service.execute(new WaxOff(car)); service.execute(new WaxOn(car)); //暂停2秒钟 TimeUnit.SECONDS.sleep(1); //关闭所有的任务 service.shutdownNow(); } }
执行结果:
/..... Wax one Wax off Wax one Wax off Wax one Wax off Exiting via interrupt Wxtiing via interrupt Ending wax on task Ending wax off task
在 waitForWaxing() 中检查 WaxOn 标志,如果它是 false,那么这个调用任务将会被挂起。这个行为发生在 synchronized 方法中这一点很重要。因为在这个方法中任务已经获得了锁。当你调用 wait() 时,线程被挂起,而锁被释放。释放锁是本质所在,因为为了安全的改变对象的状态,其他某个任务就必须能够获得这个锁。
WaxOn.run() 表示给汽车打蜡的第一个步骤,它执行他的操作:调用 sleep() 模拟打蜡的时间,然后告知汽车打蜡结束,并且调用 waitForWaxing(),这个方法会调用 wait() 挂起当前打蜡的任务。直到 WaxOff 任务调用这两车的 buffed(),从而改变状态并且调用 notfiAll() 重新唤醒为止。翻过来也是一样的,在运行程序时,你可以看到控制权在两个任务之间来回的传递,这两个步骤过程在不断的重复。
错失的信号
当两个线程使用 notif()/wait() 或者 notifAll()/wait() 进行协作时,有可能会错过某个信号。假设线程 T1 是通知 T2 的线程,而这两个线程都使用下面的方式实现:
T1: synchronized(X){ //设置 T2 的一个条件 <setup condition for T2> x.notif(); } T2: while(someCondition){ //Potit synchronized(x){ x.wait(); } }
以上的例子假设 T2 对 someCondition 发现其为 true()。在执行 Potit 其中线程调度器可能切换到了 T1。而 T1 将会执行重新设置 condition,并且调用唤醒。当 T2 继续执行时,以至于不能意识到条件已经发生变化,因此会盲目的进入 wait()。此时唤醒在之前已经调用过了,而 T2 将无限的等待下去唤醒的信号。
解决该问题的方案是防止 someCondition 变量上产生竞争条件:
synchronized(x){ while(someCondition){ x.wait(); } }
notif() 与 notifAll()
可能有多个任务在单个 Car 对象上被挂起处于 wait() 状态,因此调用 notifyAll() 比调用 notify() 更安全。使用 notify() 而不是 notifyAll() 是一种优化。使用 notify() 时,在众多等待同一个锁的任务中只有一个被唤醒,因此如果你希望使用 notify(),就必须保证被唤醒的是恰当的任务。另外使用 notify() ,所有任务都必须等待相同的条件,因为如果你有多个任务在等待不同的条件,那你就不会知道是否唤醒了恰当的任务。如果使用 notfiy(),当条件发生变化时,必须只有一个任务能从中收益。最后,这些限制对所有可能存在的子类都必须总起作用。如果这些规则任何一条不满足都必须使用 notifyAll()。在 Java 的线程机制中,有一个描述是这样的:notifyAll() 将唤醒所有正在等待的任务。这是否意味着在程序中任何地方,任何处于 wait() 状态中的任务都将被任何对 notifyAll() 的调用唤醒呢?在下面的实例中说明了情况并非如此,当 notifyAll() 因某个特定锁被调用时,只有等待这个锁的任务才会被唤醒:
public class Blocker { synchronized void waitingCall() { try { while(!Thread.interrupted()) { wait(); System.out.print(Thread.currentThread() + " "); } } catch(InterruptedException e) { // OK to exit this way } } synchronized void prod() { notify(); } synchronized void prodAll() { notifyAll(); } }
创建任务 Task:
public class Task implements Runnable { static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); } }
创建任务 Task2:
public class Task2 implements Runnable { // A separate Blocker object: static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); } }
测试类:
public class NotifyVsNotifyAll { public static void main(String[] args) throws Exception{ ExecutorService service = Executors.newCachedThreadPool(); for (int i = 0; i < 3; i++) { service.execute(new Task()); } service.execute(new Task2()); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { boolean prod = true; @Override public void run() { // TODO Auto-generated method stub if (prod) { System.out.println("notify"); Task.blocker.prod(); prod = false; }else { System.out.println("notifyAll"); Task.blocker.prodAll(); prod = true; } } }, 400, 400); TimeUnit.SECONDS.sleep(5); timer.cancel(); System.out.println("Time cancle"); TimeUnit.MILLISECONDS.sleep(500); System.out.println("Task2.blocker.prodAll() "); Task2.blocker.prodAll(); TimeUnit.MILLISECONDS.sleep(500); System.out.println("\nShutting down"); service.shutdownNow(); // Interrupt all tasks } }
测试结果:
notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] Time cancle Task2.blocker.prodAll() Thread[pool-1-thread-4,5,main] Shutting down
从上面输出的结果可以看出,我们启动了三个 Task 任务线程,一个 Task2 线程。使用 timer 做了一个定时器,每间隔 4 毫秒就轮换启动 Task.blocker 的 notify() 和 notifyAll()方法。我们看到 Task 和 Task2 都有 Blocker 对象,他们调用 Blocker 对象的时候都会被阻塞。我们看到当调用 Task.prod() 的时候只有一个在等待锁的任务被唤醒,其余两个继续挂起。当调用 Task.prodAll() 的时候等待的三个线程都会被唤醒。当调用 Task2。prodAll() 的时候 只有 Task2 的线程任务被唤醒。其余的三个 Task 任务继续挂起。
生产者与消费者
请考虑这样一种情况,在饭店有一个厨师和一个服务员。这个服务员必须等待厨师做好膳食。当厨师准备好时会通知服务员,之后服务员上菜,然后返回继续等待。这是一个任务协作示例:厨师代表生产者,而服务员代表消费者。两个任务必须在膳食被生产和消费时进行握手,而系统必须是以有序的方式关闭。膳食类:
public class Meal { private final int orderNum; public Meal(int orderNum) { this.orderNum = orderNum; } public String toString() { return "Meal " + orderNum; } }
服务生类:
public class WaitPerson implements Runnable { private Restaurant restaurant; public WaitPerson(Restaurant r) { restaurant = r; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { while(restaurant.meal == null) wait(); // ... for the chef to produce a meal } Print.print("Waitperson got " + restaurant.meal); synchronized(restaurant.chef) { restaurant.meal = null; restaurant.chef.notifyAll(); // Ready for another } } } catch(InterruptedException e) { Print.print("WaitPerson interrupted"); } } }
厨师类:
public class Chef implements Runnable { private Restaurant restaurant; private int count = 0; public Chef(Restaurant r) { restaurant = r; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { while(restaurant.meal != null) wait(); // ... for the meal to be taken } if(++count == 10) { Print.print("Out of food, closing"); restaurant.exec.shutdownNow(); } Print.printnb("Order up! "); synchronized(restaurant.waitPerson) { restaurant.meal = new Meal(count); restaurant.waitPerson.notifyAll(); } TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { Print.print("Chef interrupted"); } } }
测试类:
public class Restaurant { Meal meal; ExecutorService exec = Executors.newCachedThreadPool(); WaitPerson waitPerson = new WaitPerson(this); Chef chef = new Chef(this); public Restaurant() { exec.execute(chef); exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); } }
执行结果:
Order up! Waitperson got Meal 1 Order up! Waitperson got Meal 2 Order up! Waitperson got Meal 3 Order up! Waitperson got Meal 4 Order up! Waitperson got Meal 5 Order up! Waitperson got Meal 6 Order up! Waitperson got Meal 7 Order up! Waitperson got Meal 8 Order up! Waitperson got Meal 9 Out of food, closing Order up! WaitPerson interrupted Chef interrupted
**使用显示的 Lock 和 Condition 对象
在 java SE5 的类库中还有额外的显示工具。我们来重写我们的打蜡和抛光类。使用互斥并允许任务挂起的基本类是 Condition,你可以通过在 Condition 上调用 await() 来挂起一个任务。当外部条件发生变化时,意味着某个任务应该继续执行,你可以通过调用 signal() 来通知这个任务,从而唤醒一个任务,或者调用 signalAll() 来唤醒所有在这个 Condition 上被挂起的任务。(signalAll() 比 notifAll() 是更安全的方式)
下面是重写版本:
class Car { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean waxOn = false; public void waxed() { lock.lock(); try { waxOn = true; // Ready to buff condition.signalAll(); } finally { lock.unlock(); } } public void buffed() { lock.lock(); try { waxOn = false; // Ready for another coat of wax condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing() throws InterruptedException { lock.lock(); try { while(waxOn == false) condition.await(); } finally { lock.unlock(); } } public void waitForBuffing() throws InterruptedException{ lock.lock(); try { while(waxOn == true) condition.await(); } finally { lock.unlock(); } } } class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c; } public void run() { try { while(!Thread.interrupted()) { printnb("Wax On! "); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch(InterruptedException e) { print("Exiting via interrupt"); } print("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car c) { car = c; } public void run() { try { while(!Thread.interrupted()) { car.waitForWaxing(); printnb("Wax Off! "); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch(InterruptedException e) { print("Exiting via interrupt"); } print("Ending Wax Off task"); } } public class WaxOMatic2 { public static void main(String[] args) throws Exception { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }
在 Car 的构造器中单个的 Lock 将产生一个 Condition 对象,这个对象被用来管理任务之间的通信。但是这个 Condition 不包含任何有关处理状态的信息,因此你需要额外的表示处理状态的信息,即 Boolean waxOn。
生产者消费者与队列
wait() 和 notifAll() 方法以一种非常低级的方式解决了任务的互操作的问题,即每次交互时都握手。许多时候我们可以使用同步队列来解决协作的问题,同步队列在任何时刻只允许一个任务插入或移除元素。在 Java.util.concurrent.BlockingQueue 接口中提供了这个队列,这个接口有大量的标准实现。可以使用 LinkedBlockingQueue 他是一个无界队列,还可以使用 ArrayBlockingQueue,它具有固定的尺寸,可以在它被阻塞之前向其中放置有限数量的元素。如果消费者任务试图从队列中获取对象,而该队列为空时,那么这些队列就可以挂起这些任务,并且当有更多的元素可用时恢复这些消费任务。阻塞队列可以解决非常大的问题,而其方式与 wait() 和 notifyAll() 相比,则简单切可靠。
下面是一个简单的测试,它将多个 LiftOff 对象执行串行化。消费者 LiftOffRunner 将每个 LiftOff 对象从 BlockIngQueue 中推出并直接运行。它通过显示的调用 run() 而是用自己的线程来运行,而不是为每个任务启动一个线程。
首先把之前写过的 LiftOff 类贴出来:
public class LiftOff implements Runnable{ protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() {} public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while(countDown-- > 0) { System.out.print(status()); Thread.yield(); } } }
LiftOffRunner 类:
public class LiftOffRunner implements Runnable{ private BlockingQueue<LiftOff> rockets; protected LiftOffRunner(BlockingQueue<LiftOff> rockets) { super(); this.rockets = rockets; } public void add(LiftOff lo) { try { rockets.put(lo); } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("添加失败"); } } @Override public void run() { // TODO Auto-generated method stub try { while (!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); } } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("运行中断"); } System.out.println("退出运行"); } }
最后是测试类:
public class TestBlockingQueues { static void getkey(){ try { new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } static void getkey(String message) { Print.print(message); getkey(); } static void test(String msg,BlockingQueue<LiftOff> queue){ LiftOffRunner runner = new LiftOffRunner(queue); Thread thread = new Thread(runner); thread.start(); //启动了,但是内容是空的,就一直挂起,等待有新的内容进去 for (int i = 0; i < 5; i++) { runner.add(new LiftOff(5)); } getkey("Press Enter "+ msg); thread.interrupt(); } public static void main(String[] args) { test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>()); test("ArrayBlockingQueue", new ArrayBlockingQueue<>(3)); test("SynchronousQueue", new SynchronousQueue<>()); } }
吐司 BlockingQueue
下面是一个示例,每一台机器都有三个任务:一个只做吐司、一个给吐司抹黄油、另一个在涂抹黄油的吐司上抹果酱。我们来示例如果使用 BlockIngQueue 来运行这个示例:
class Toast { public enum Status { DRY, BUTTERED, JAMMED } private Status status = Status.DRY; private final int id; public Toast(int idn) { id = idn; } public void butter() { status = Status.BUTTERED; } public void jam() { status = Status.JAMMED; } public Status getStatus() { return status; } public int getId() { return id; } public String toString() { return "Toast " + id + ": " + status; } } class ToastQueue extends LinkedBlockingQueue<Toast> {} class Toaster implements Runnable { private ToastQueue toastQueue; private int count = 0; private Random rand = new Random(47); public Toaster(ToastQueue tq) { toastQueue = tq; } public void run() { try { while(!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep( 100 + rand.nextInt(500)); // Make toast Toast t = new Toast(count++); print(t); // Insert into queue toastQueue.put(t); } } catch(InterruptedException e) { print("Toaster interrupted"); } print("Toaster off"); } } // Apply butter to toast: class Butterer implements Runnable { private ToastQueue dryQueue, butteredQueue; public Butterer(ToastQueue dry, ToastQueue buttered) { dryQueue = dry; butteredQueue = buttered; } public void run() { try { while(!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = dryQueue.take(); t.butter(); print(t); butteredQueue.put(t); } } catch(InterruptedException e) { print("Butterer interrupted"); } print("Butterer off"); } } // Apply jam to buttered toast: class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; public Jammer(ToastQueue buttered, ToastQueue finished) { butteredQueue = buttered; finishedQueue = finished; } public void run() { try { while(!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = butteredQueue.take(); t.jam(); print(t); finishedQueue.put(t); } } catch(InterruptedException e) { print("Jammer interrupted"); } print("Jammer off"); } } // Consume the toast: class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0; public Eater(ToastQueue finished) { finishedQueue = finished; } public void run() { try { while(!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = finishedQueue.take(); // Verify that the toast is coming in order, // and that all pieces are getting jammed: if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { print(">>>> Error: " + t); System.exit(1); } else print("Chomp! " + t); } } catch(InterruptedException e) { print("Eater interrupted"); } print("Eater off"); } } public class ToastOMatic { public static void main(String[] args) throws Exception { ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue)); exec.execute(new Butterer(dryQueue, butteredQueue)); exec.execute(new Jammer(butteredQueue, finishedQueue)); exec.execute(new Eater(finishedQueue)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }
这个示例中没有任何显示的同步,因为同步队列和系统的设计隐式的管理了每片 Toast 在任何时刻都只有一个任务在操作。因为队列的阻塞,使得处理过程将被自动挂起和恢复。
我的博客
扫描关注公众号免费获取全套的 Java 编程思想读书笔记。
我的微信公号:Android开发吹牛皮
相关文章推荐
- java编程思想笔记-并发之线程协作(三)
- Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)
- JAVA 并发编程-多个线程之间共享数据(六)
- Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition
- Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)
- Java并发编程---线程之间的通信问题
- Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition
- Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition
- (转载)Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition
- Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)
- Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)
- JAVA 并发编程-多个线程之间共享数据(六)
- Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)
- Java 并发编程:线程间的协作(wait/notify/sleep/yield/join) (r)
- Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition
- Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition
- java 并发编程:线程间的协作 wait()+notify()/notifyall() 和 Condition
- Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition
- JAVA并发编程-线程间协作(Object监视器方法与Condition)
- Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)