Java多线程编程核心技术---线程间通信(一)
2016-06-17 20:46
513 查看
线程是操作系统中独立的个体,但这些个体如果不经过特殊处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一。线程间通信可以使系统之间的交互性更强大,在大大提高CPU利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控监督。
等待通知机制
在调用wait()之前,线程必须获得该对象的对象级别锁,即只能在同步方法或同步代码块中调用wait()方法。在执行wait()方法后,当前线程释放锁,在从wait()返回之前,线程与其他线程竞争重新获得锁。
控制台打印结果如下:
wait使线程停止运行,notify使停止的线程继续执行。
notify()方法执行后并不立即释放锁。看下面的例子:
控制台输出结果如下:
==原因待补充...==
关键字synchronized可以将任何一个Object对象作为同步对象来看待,Java为每一个对象都实现了wait()方法和notify()方法,他们必须用在被synchronized同步的Object的临界区内。调用wait()方法可以使处于临界区内的线程进入等待状态,同时释放被同步对象的锁。notify()方法可以唤醒一个因调用了wait()方法而处于阻塞状态中的线程,使其进入就绪状态。被重新唤醒的线程会试图重新获得临界区的控制权,也就是锁,并继续执行临界区内wait()之后的代码。如果发出notify()时没有处于阻塞状态中的线程,那么该命令会被忽略。
wait()方法可以使调用该方法的线程释放共享资源的锁,然后从运行状态退出,进入等待队列,知道被再次唤醒。
notify()方法可以随机唤醒等待队列中等待同一共享资源的“一个”线程,并使该线程退出等待队列,进入可运行状态,也就是notify()方法仅通知“一个”线程。
notifyAll()方法可以使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级更高的那个线程最先执行,但也有可能是随机执行,这要取决于JVM的具体实现。
新创建一个新的线程对象后,再调用它的start()方法,系统会为此线程分配CPU资源,使其处于Runnable(可运行)状态,这是一个准备运行的阶段。如果线程抢占到CPU资源,此线程就处于(Running)状态。
Runnable状态和Running状态可相互切换,因为有可能线程运行一段时间后,有其他高优先级的线程抢占了CPU资源,这时此线程就从Running状态变成Runnable状态。
线程进入Runnable状态一般有以下五种情况:
调用sleep()方法后经过的时间超过了指定的休眠时间
线程调用的阻塞IO已经返回,阻塞方法执行完毕
线程成功地获得了试图同步的监视器
线程正在等待某个通知,其他线程发出了通知
处于挂起状态的线程调用了resume()恢复方法
Blocked是阻塞的意思,例如遇到了一个IO操作,此时CPU处于空闲状态,可能会转而把CPU时间片分配给其他线程,这是也可以称为“暂停”状态。Blocked状态结束后,进入Runnable状态,等待系统重新分配资源。
出现阻塞的情况一般有以下五种:
线程调用sleep方法,主动放弃占用的CPU资源
线程调用了阻塞式IO方法,在该方法返回前,该线程被阻塞
线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有
线程等待某个通知
程序调用了suspend方法将该线程挂起。此方法容易导致死锁,尽量避免使用。
run()方法运行结束后进入销毁阶段,整个线程执行完毕。
每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度。反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。
方法wait()锁释放与notify()锁不释放
wait()方法被执行后,锁被自动释放。但是执行完notify()方法后,锁不自动释放。
控制台打印结果如下:
从打印结果来看,Thread-0执行wait()方法后立即释放对lock对象的锁,之后Thread-1获得lock对象锁。Thread-1发出notify通知之后并没有立即释放锁,而是执行完lock同步块之后才释放锁,之后Thread-0重新获得lock对象锁。Thread-0执行完毕释放lock对象锁,Thread-2获得lock对象锁并发出notify通知,但是此时没有处于阻塞状态的线程,notify被忽略。
当interrupt遇到wait方法
当线程呈wait()状态时,调用线程对象的interrupt()方法会出现InterruptedException异常。
控制台打印结果如下:
以上几个示例的结论:
执行完同步代码块就会释放对象的锁。
在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。
在执行同步代码块的过程中,执行了所属对象的wait()方法后,这个线程会释放对象锁并进入线程等待池中,等待被唤醒。
只通知一个线程
调用notify()方法一次只随机通知一个线程进行唤醒。
控制台打印结果如下:
由打印结果看,Thread-2收到notify继续运行直至执行完毕,Thread-0和Thread-1一直收不到notify处于wait状态。
对NotifyThread做如下修改:
此时控制台打印结果如下:
此时三个线程全部收到notify退出等待继续运行。
对NotifyThread做如下修改:
此时控制台打印结果如下:
可见notifyAll()可以唤醒全部等待线程。
方法wait(long)的使用
控制台打印结果如下:
wait(long)方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒,看下面的例子。
控制台打印结果如下:
thread1只等待了2000ms就被thread2唤醒。
使用wait/notify模式时,wait等待条件发生变化会造成程序逻辑混乱
控制台打印结果如下:
出现异常的原因是threadSubtract2开始wait后释放了lock锁,threadSubtract1获取lock锁之后也进入wait。threadAdd执行了add方法后给所有线程发出notify通知,此时threadSubtract2和threadSubtract1都收到通知继续执行,所以subtract方法会被执行两次,导致抛出越界异常。
解决方法如下:
此时控制台打印结果如下:
程序不结束,threadSubtract1继续等待。
生产者/消费者模式实现
一生产一消费
控制台打印结果如下:
多生产与多消费---假死
“假死”现象其实就是线程进入WAITING状态。如果全部线程都进入WAITING状态,则程序不再执行任何业务功能了。
控制台打印结果如下:
最终四个线程都处于WAITING状态。这是由于生产者不仅可以唤醒消费者还可以唤醒另一个生产者,消费者不仅可以唤醒生产者还可以唤醒另一个消费者。这样情况运行的比例积少成多就会导致最终所有的线程都不能继续运行下去,所有的线程都处于WAITING状态。
要解决以上问题,只需要将生产者和消费者中的notify改成notifyAll。这样就能保证生产者一定会通知到消费者,消费者也一定能通知到生产者。
一生产与一消费:操作栈
控制台打印结果如下:
通过十一生产者/消费者模式,list的size在0和1之间交替,也就是生产和消费两个过程交替执行。
一生产与多消费---操作栈:解决wait条件改变与假死
将以上代码做如下修改:
运行结果如下:
出现异常的原因是MyStack中使用了if判断,条件发生改变是没有及时得到响应,导致多个呈wait状态的线程被唤醒。解决办法是将if换成while即可,如下。
重新运行程序发现程序出现假死情况,此时需要将notify修改成notifyAll,具体代码略。
多生产与一消费---操作栈
将上面例子中的main函数作如下修改:
控制台打印结果如下:
多生产与多消费
将以上例子中的main函数作如下修改:
控制台打印结果如下:
等待通知机制
在调用wait()之前,线程必须获得该对象的对象级别锁,即只能在同步方法或同步代码块中调用wait()方法。在执行wait()方法后,当前线程释放锁,在从wait()返回之前,线程与其他线程竞争重新获得锁。
public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { System.out.println("开始wait,time=" + System.currentTimeMillis()); lock.wait(); System.out.println("结束wait,time=" + System.currentTimeMillis()); } } catch (InterruptedException e) { e.printStackTrace(); } } } public class ThreadB extends Thread { private Object lock; public ThreadB(Object lock) { super(); this.lock = lock; } @Override public void run() { synchronized (lock) { System.out.println("开始notify,time=" + System.currentTimeMillis()); lock.notify(); System.out.println("结束notify,time=" + System.currentTimeMillis()); } } } public class Main { public static void main(String[] args) { try { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); Thread.sleep(1000); ThreadB b = new ThreadB(lock); b.start(); } catch (InterruptedException e) { e.printStackTrace(); } } }
控制台打印结果如下:
开始wait,time=1466072144818 开始notify,time=1466072145820 结束notify,time=1466072145820 结束wait,time=1466072145820
wait使线程停止运行,notify使停止的线程继续执行。
notify()方法执行后并不立即释放锁。看下面的例子:
public class MyList { private static List list = new ArrayList(); public static void add() { list.add("admin"); } public static int size() { return list.size(); } } public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (Exception e) { e.printStackTrace(); } } } public class ThreadB extends Thread { private Object lock; public ThreadB(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已发出通知..."); } System.out.println("添加了" + (i + 1) + "个元素"); Thread.sleep(500); } } } catch (Exception e) { e.printStackTrace(); } } } public class Main { public static void main(String[] args) { try { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); Thread.sleep(50); ThreadB b = new ThreadB(lock); b.start(); } catch (InterruptedException e) { e.printStackTrace(); } } }
控制台输出结果如下:
wait begin 1466128246574 添加了1个元素 添加了2个元素 添加了3个元素 添加了4个元素 已发出通知... 添加了5个元素 添加了6个元素 添加了7个元素 添加了8个元素 添加了9个元素 添加了10个元素 wait end 1466128251629
==原因待补充...==
关键字synchronized可以将任何一个Object对象作为同步对象来看待,Java为每一个对象都实现了wait()方法和notify()方法,他们必须用在被synchronized同步的Object的临界区内。调用wait()方法可以使处于临界区内的线程进入等待状态,同时释放被同步对象的锁。notify()方法可以唤醒一个因调用了wait()方法而处于阻塞状态中的线程,使其进入就绪状态。被重新唤醒的线程会试图重新获得临界区的控制权,也就是锁,并继续执行临界区内wait()之后的代码。如果发出notify()时没有处于阻塞状态中的线程,那么该命令会被忽略。
wait()方法可以使调用该方法的线程释放共享资源的锁,然后从运行状态退出,进入等待队列,知道被再次唤醒。
notify()方法可以随机唤醒等待队列中等待同一共享资源的“一个”线程,并使该线程退出等待队列,进入可运行状态,也就是notify()方法仅通知“一个”线程。
notifyAll()方法可以使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级更高的那个线程最先执行,但也有可能是随机执行,这要取决于JVM的具体实现。
新创建一个新的线程对象后,再调用它的start()方法,系统会为此线程分配CPU资源,使其处于Runnable(可运行)状态,这是一个准备运行的阶段。如果线程抢占到CPU资源,此线程就处于(Running)状态。
Runnable状态和Running状态可相互切换,因为有可能线程运行一段时间后,有其他高优先级的线程抢占了CPU资源,这时此线程就从Running状态变成Runnable状态。
线程进入Runnable状态一般有以下五种情况:
调用sleep()方法后经过的时间超过了指定的休眠时间
线程调用的阻塞IO已经返回,阻塞方法执行完毕
线程成功地获得了试图同步的监视器
线程正在等待某个通知,其他线程发出了通知
处于挂起状态的线程调用了resume()恢复方法
Blocked是阻塞的意思,例如遇到了一个IO操作,此时CPU处于空闲状态,可能会转而把CPU时间片分配给其他线程,这是也可以称为“暂停”状态。Blocked状态结束后,进入Runnable状态,等待系统重新分配资源。
出现阻塞的情况一般有以下五种:
线程调用sleep方法,主动放弃占用的CPU资源
线程调用了阻塞式IO方法,在该方法返回前,该线程被阻塞
线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有
线程等待某个通知
程序调用了suspend方法将该线程挂起。此方法容易导致死锁,尽量避免使用。
run()方法运行结束后进入销毁阶段,整个线程执行完毕。
每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度。反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。
方法wait()锁释放与notify()锁不释放
wait()方法被执行后,锁被自动释放。但是执行完notify()方法后,锁不自动释放。
public class Service { public void testWaitMethod(Object lock) { try { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " begin wait " + System.currentTimeMillis()); lock.wait(); System.out.println(Thread.currentThread().getName() + " end wait " + System.currentTimeMillis()); } } catch (InterruptedException e) { e.printStackTrace(); } } public void testNotifyMethod(Object lock) { try { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " begin notify " + System.currentTimeMillis()); lock.notify(); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " end notify " + System.currentTimeMillis()); } } catch (Exception e) { e.printStackTrace(); } } } public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { Service service = new Service(); service.testWaitMethod(lock); } } public class ThreadB extends Thread { private Object lock; public ThreadB(Object lock) { super(); this.lock = lock; } @Override public void run() { Service service = new Service(); service.testNotifyMethod(lock); } } public class ThreadC extends Thread { private Object lock; public ThreadC(Object lock) { super(); this.lock = lock; } @Override public void run() { Service service = new Service(); service.testNotifyMethod(lock); } } public class Main { public static void main(String[] args) { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); ThreadB b = new ThreadB(lock); b.start(); ThreadC c = new ThreadC(lock); c.start(); } }
控制台打印结果如下:
Thread-0 begin wait 1466149812036 Thread-1 begin notify 1466149812037 Thread-1 end notify 1466149814037 Thread-0 end wait 1466149814037 Thread-2 begin notify 1466149814038 Thread-2 end notify 1466149816038
从打印结果来看,Thread-0执行wait()方法后立即释放对lock对象的锁,之后Thread-1获得lock对象锁。Thread-1发出notify通知之后并没有立即释放锁,而是执行完lock同步块之后才释放锁,之后Thread-0重新获得lock对象锁。Thread-0执行完毕释放lock对象锁,Thread-2获得lock对象锁并发出notify通知,但是此时没有处于阻塞状态的线程,notify被忽略。
当interrupt遇到wait方法
当线程呈wait()状态时,调用线程对象的interrupt()方法会出现InterruptedException异常。
public class Service { public void testMethod(Object lock) { try { synchronized (lock) { System.out.println("begin wait..."); lock.wait(); System.out.println("end wait..."); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println("出现异常了,因为呈wait状态的线程被interrupt了。"); } } } public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { Service service = new Service(); service.testMethod(lock); } } public class Main { public static void main(String[] args) { try { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); Thread.sleep(2000); a.interrupt(); } catch (Exception e) { e.printStackTrace(); } } }
控制台打印结果如下:
begin wait... java.lang.InterruptedException出现异常了,因为呈wait状态的线程被interrupt了。 at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:503) at com.umgsai.thread29.Service.testMethod(Service.java:8) at com.umgsai.thread29.ThreadA.run(ThreadA.java:13)
以上几个示例的结论:
执行完同步代码块就会释放对象的锁。
在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。
在执行同步代码块的过程中,执行了所属对象的wait()方法后,这个线程会释放对象锁并进入线程等待池中,等待被唤醒。
只通知一个线程
调用notify()方法一次只随机通知一个线程进行唤醒。
public class Service { public void testMethod(Object lock) { try { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " begin wait"); lock.wait(); System.out.println(Thread.currentThread().getName() + " end wait"); } } catch (Exception e) { e.printStackTrace(); } } }
public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { Service service = new Service(); service.testMethod(lock); } } public class ThreadB extends Thread { private Object lock; public ThreadB(Object lock) { super(); this.lock = lock; } @Override public void run() { Service service = new Service(); service.testMethod(lock); } } public class ThreadC extends Thread { private Object lock; public ThreadC(Object lock) { super(); this.lock = lock; } @Override public void run() { Service service = new Service(); service.testMethod(lock); } }
public class NotifyThread extends Thread { private Object lock; public NotifyThread(Object lock) { super(); this.lock = lock; } @Override public void run() { synchronized (lock) { lock.notify(); } } }
public class Main { public static void main(String[] args) throws InterruptedException { Object lock = new Object(); ThreadA a = new ThreadA(lock); ThreadB b = new ThreadB(lock); ThreadC c = new ThreadC(lock); NotifyThread notifyThread = new NotifyThread(lock); c.start(); b.start(); a.start(); Thread.sleep(2000); notifyThread.start(); } }
控制台打印结果如下:
Thread-2 begin wait Thread-0 begin wait Thread-1 begin wait Thread-2 end wait
由打印结果看,Thread-2收到notify继续运行直至执行完毕,Thread-0和Thread-1一直收不到notify处于wait状态。
对NotifyThread做如下修改:
public class NotifyThread extends Thread { private Object lock; public NotifyThread(Object lock) { super(); this.lock = lock; } @Override public void run() { synchronized (lock) { lock.notify(); lock.notify(); lock.notify(); } } }
此时控制台打印结果如下:
Thread-2 begin wait Thread-0 begin wait Thread-1 begin wait Thread-2 end wait
Thread-1 end wait
Thread-0 end wait
此时三个线程全部收到notify退出等待继续运行。
对NotifyThread做如下修改:
public class NotifyThread extends Thread { private Object lock; public NotifyThread(Object lock) { super(); this.lock = lock; } @Override public void run() { synchronized (lock) { lock.notifyAll(); } } }
此时控制台打印结果如下:
Thread-2 begin wait Thread-0 begin wait Thread-1 begin wait Thread-1 end wait Thread-0 end wait Thread-2 end wait
可见notifyAll()可以唤醒全部等待线程。
方法wait(long)的使用
public class MyRunnable { static private Object lock = new Object(); static private Runnable runnable = new Runnable() { @Override public void run() { synchronized (lock) { System.out.println(System.currentTimeMillis() + " begin wait"); try { lock.wait(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + " end wait"); } } }; public static void main(String[] args) { Thread thread = new Thread(runnable); thread.start(); } }
控制台打印结果如下:
1466155949247 begin wait 1466155954247 end wait
wait(long)方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒,看下面的例子。
public class MyRunnable { static private Object lock = new Object(); static private Runnable runnable1 = new Runnable() { @Override public void run() { synchronized (lock) { System.out.println(System.currentTimeMillis() + " begin wait"); try { lock.wait(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + " end wait"); } } }; private static Runnable runnable2 = new Runnable() { @Override public void run() { synchronized (lock) { System.out.println(System.currentTimeMillis() + " begin notify"); lock.notify(); System.out.println(System.currentTimeMillis() + " end notify"); } } }; public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(runnable1); thread1.start(); Thread.sleep(2000); Thread thread2 = new Thread(runnable2); thread2.start(); } }
控制台打印结果如下:
1466156480465 begin wait 1466156482466 begin notify 1466156482466 end notify 1466156482466 end wait
thread1只等待了2000ms就被thread2唤醒。
使用wait/notify模式时,wait等待条件发生变化会造成程序逻辑混乱
public class ValueObject { public static List list = new ArrayList<>(); } public class Add { private String lock; public Add(String lock) { super(); this.lock = lock; } public void add() { synchronized (lock) { ValueObject.list.add("anyString"); lock.notifyAll(); } } } public class Subtract { private String lock; public Subtract(String lock) { super(); this.lock = lock; } public void subtract() { try { synchronized (lock) { if (ValueObject.list.size() == 0) { System.out.println(Thread.currentThread().getName() + " begin wait " + System.currentTimeMillis()); lock.wait();//释放锁 System.out.println(Thread.currentThread().getName() + " end wait " + System.currentTimeMillis()); } ValueObject.list.remove(0); System.out.println("list size = " + ValueObject.list.size()); } } catch (Exception e) { e.printStackTrace(); } } } public class ThreadAdd extends Thread { private Add add; public ThreadAdd(Add add) { super(); this.add = add; } @Override public void run() { add.add(); } } public class ThreadSubtract extends Thread { private Subtract subtract; public ThreadSubtract(Subtract subtract) { super(); this.subtract = subtract; } @Override public void run() { subtract.subtract(); } } public class Main { public static void main(String[] args) throws InterruptedException { String lock = new String(""); Add add = new Add(lock); Subtract subtract = new Subtract(lock); ThreadSubtract threadSubtract1 = new ThreadSubtract(subtract); threadSubtract1.setName("threadSubtract1"); threadSubtract1.start(); ThreadSubtract threadSubtract2 = new ThreadSubtract(subtract); threadSubtract2.setName("threadSubtract2"); threadSubtract2.start(); Thread.sleep(1000); ThreadAdd threadAdd = new ThreadAdd(add); threadAdd.setName("threadAdd"); threadAdd.start(); } }
控制台打印结果如下:
threadSubtract2 begin wait 1466386136869 threadSubtract1 begin wait 1466386136870 threadSubtract1 end wait 1466386137870 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 list size = 0 threadSubtract2 end wait 1466386137870 at java.util.ArrayList.rangeCheck(Unknown Source) at java.util.ArrayList.remove(Unknown Source) at com.umgsai.thread31.Subtract.subtract(Subtract.java:18) at com.umgsai.thread31.ThreadSubtract.run(ThreadSubtract.java:12)
出现异常的原因是threadSubtract2开始wait后释放了lock锁,threadSubtract1获取lock锁之后也进入wait。threadAdd执行了add方法后给所有线程发出notify通知,此时threadSubtract2和threadSubtract1都收到通知继续执行,所以subtract方法会被执行两次,导致抛出越界异常。
解决方法如下:
public class Subtract { private String lock; public Subtract(String lock) { super(); this.lock = lock; } public void subtract() { try { synchronized (lock) { while (ValueObject.list.size() == 0) { System.out.println(Thread.currentThread().getName() + " begin wait " + System.currentTimeMillis()); lock.wait(); System.out.println(Thread.currentThread().getName() + " end wait " + System.currentTimeMillis()); } ValueObject.list.remove(0); System.out.println("list size = " + ValueObject.list.size()); } } catch (Exception e) { e.printStackTrace(); } } }
此时控制台打印结果如下:
threadSubtract1 begin wait 1466387590950 threadSubtract2 begin wait 1466387590950 threadSubtract2 end wait 1466387591953 list size = 0 threadSubtract1 end wait 1466387591954 threadSubtract1 begin wait 1466387591954
程序不结束,threadSubtract1继续等待。
生产者/消费者模式实现
一生产一消费
public class ValueObject { public static String value = ""; } public class Producer {//生产者 private String lock; public Producer(String lock) { this.lock = lock; } public void setValue(){ try { synchronized (lock) { if (!ValueObject.value.equals("")) { lock.wait(); } String value = System.currentTimeMillis() + "-" + System.nanoTime(); System.out.println("set的值是" + value); ValueObject.value = value;//生产 lock.notify();//通知消费者 } } catch (Exception e) { e.printStackTrace(); } } } public class Consumer {//消费者 private String lock; public Consumer(String lock) { this.lock = lock; } public void getValue() { try { synchronized (lock) { if (ValueObject.value.equals("")) { lock.wait(); } System.out.println("get的值是" + ValueObject.value); ValueObject.value = "";//消费 lock.notify();//通知生产者 } } catch (Exception e) { e.printStackTrace(); } } } public class ThreadP extends Thread { private Producer producer; public ThreadP(Producer producer) { super(); this.producer = producer; } @Override public void run() { while (true) { producer.setValue(); } } } public class ThreadC extends Thread { private Consumer consumer; public ThreadC(Consumer consumer) { super(); this.consumer = consumer; } @Override public void run() { while (true) { consumer.getValue(); } } } public class Main { public static void main(String[] args) { String lock = new String(""); Producer producer = new Producer(lock); Consumer consumer = new Consumer(lock); ThreadP threadP = new ThreadP(producer); ThreadC threadC = new ThreadC(consumer); threadP.start(); threadC.start(); } }
控制台打印结果如下:
...... set的值是1466427617000-43104906339739 get的值是1466427617000-43104906339739 set的值是1466427617000-43104906350935 get的值是1466427617000-43104906350935 set的值是1466427617000-43104906361821 get的值是1466427617000-43104906361821 set的值是1466427617000-43104906374262 get的值是1466427617000-43104906374262 ......
多生产与多消费---假死
“假死”现象其实就是线程进入WAITING状态。如果全部线程都进入WAITING状态,则程序不再执行任何业务功能了。
public class ValueObject { public static String value = ""; } public class Producer { private String lock; public Producer(String lock) { super(); this.lock = lock; } public void setValue(){ try { synchronized (lock){ while (!ValueObject.value.equals("")){ System.out.println("生产者" + Thread.currentThread().getName() + " is waiting ★"); lock.wait(); // System.out.println(Thread.currentThread().getName() + " 收到通知" + System.currentTimeMillis()); } System.out.println("生产者" + Thread.currentThread().getName() + " is runnable"); String value = System.currentTimeMillis() + "-" + System.nanoTime(); ValueObject.value = value; lock.notify(); // System.out.println( Thread.currentThread().getName() + " 发出通知" + System.currentTimeMillis()); } }catch (Exception e){ e.printStackTrace(); } } } public class Consumer { private String lock; public Consumer(String lock) { super(); this.lock = lock; } public void getValue() { try { synchronized (lock) { while (ValueObject.value.equals("")) { System.out.println("消费者" + Thread.currentThread().getName() + " is waiting ☆"); lock.wait(); // System.out.println(Thread.currentThread().getName() + " 收到通知" + System.currentTimeMillis()); } System.out.println("消费者" + Thread.currentThread().getName() + " is runnable"); ValueObject.value = ""; lock.notify(); // System.out.println(Thread.currentThread().getName() + " 发出通知" + System.currentTimeMillis()); } } catch (Exception e) { e.printStackTrace(); } } } public class ThreadP extends Thread { private Producer producer; public ThreadP(Producer producer) { super(); this.producer = producer; } @Override public void run() { while (true){ producer.setValue(); } } } public class ThreadC extends Thread { private Consumer consumer; public ThreadC(Consumer consumer) { super(); this.consumer = consumer; } @Override public void run() { while (true){ consumer.getValue(); } } } public class Main { public static void main(String[] args) throws InterruptedException{ String lock = new String(""); Producer producer = new Producer(lock); Consumer consumer = new Consumer(lock); ThreadP[] threadPs = new ThreadP[2]; ThreadC[] threadCs = new ThreadC[2]; for (int i = 0; i < 2; i++){ threadPs[i] = new ThreadP(producer); threadPs[i].setName("生产者" + (i + 1)); threadCs[i] = new ThreadC(consumer); threadCs[i].setName("消费者" + (i + 1)); threadPs[i].start(); Thread.sleep(500); threadCs[i].start(); } Thread.sleep(5000); Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()]; Thread.currentThread().getThreadGroup().enumerate(threads); for (int i = 0; i < threads.length; i++){ System.out.println(threads[i].getName() + " " + threads[i].getState()); } } }
控制台打印结果如下:
...... 生产者生产者1 is waiting ★ 生产者生产者2 is waiting ★ 消费者消费者1 is runnable 消费者消费者1 is waiting ☆ 生产者生产者1 is runnable 生产者生产者1 is waiting ★ 生产者生产者2 is waiting ★ 消费者消费者2 is runnable 消费者消费者2 is waiting ☆ 消费者消费者1 is waiting ☆ main RUNNABLE Monitor Ctrl-Break RUNNABLE 生产者1 WAITING 消费者1 WAITING 生产者2 WAITING 消费者2 WAITING
最终四个线程都处于WAITING状态。这是由于生产者不仅可以唤醒消费者还可以唤醒另一个生产者,消费者不仅可以唤醒生产者还可以唤醒另一个消费者。这样情况运行的比例积少成多就会导致最终所有的线程都不能继续运行下去,所有的线程都处于WAITING状态。
要解决以上问题,只需要将生产者和消费者中的notify改成notifyAll。这样就能保证生产者一定会通知到消费者,消费者也一定能通知到生产者。
一生产与一消费:操作栈
public class MyStack { private List list = new ArrayList(); synchronized public void push(){ try { if (list.size() == 1) { this.wait();//等待消费者通知 } list.add("anyString=" + Math.random()); this.notify();//通知消费者消费 System.out.println("push=" + list.size()); } catch (Exception e) { e.printStackTrace(); } } synchronized public String pop(){ String returnValue = ""; try { if (list.size() == 0) { System.out.println("pop操作中的:" + Thread.currentThread().getName() + " 线程呈wait状态"); this.wait();//等待生产者通知 } returnValue = "" + list.get(0); list.remove(0); this.notify();//通知生产者生产 System.out.println("popo=" + list.size()); } catch (Exception e) { e.printStackTrace(); } return returnValue; } } public class Producer { private MyStack myStack; public Producer(MyStack myStack) { super(); this.myStack = myStack; } public void pushService() { myStack.push(); } } public class Consumer { private MyStack myStack; public Consumer(MyStack myStack) { super(); this.myStack = myStack; } public void popService() { System.out.println("pop=" + myStack.pop()); } } public class ThreadP extends Thread { private Producer producer; public ThreadP(Producer producer) { super(); this.producer = producer; } @Override public void run() { while (true) { producer.pushService(); } } } public class ThreadC extends Thread { private Consumer consumer; public ThreadC(Consumer consumer) { super(); this.consumer = consumer; } @Override public void run() { while (true) { consumer.popService(); } } } public class Main { public static void main(String[] args) { MyStack myStack = new MyStack(); Producer producer = new Producer(myStack); Consumer consumer = new Consumer(myStack); ThreadP threadP = new ThreadP(producer); ThreadC threadC = new ThreadC(consumer); threadP.start(); threadC.start(); } }
控制台打印结果如下:
...... push=1 popo=0 pop=anyString=0.7458428547380943 push=1 popo=0 pop=anyString=0.5743458836305334 push=1 popo=0 pop=anyString=0.7497839844319785 ......
通过十一生产者/消费者模式,list的size在0和1之间交替,也就是生产和消费两个过程交替执行。
一生产与多消费---操作栈:解决wait条件改变与假死
将以上代码做如下修改:
public class Main { public static void main(String[] args) { MyStack myStack = new MyStack(); Producer producer = new Producer(myStack); Consumer consumer1 = new Consumer(myStack); Consumer consumer2 = new Consumer(myStack); Consumer consumer3 = new Consumer(myStack); Consumer consumer4 = new Consumer(myStack); Consumer consumer5 = new Consumer(myStack); ThreadP threadP = new ThreadP(producer); ThreadC threadC1 = new ThreadC(consumer1); ThreadC threadC2 = new ThreadC(consumer1); ThreadC threadC3 = new ThreadC(consumer1); ThreadC threadC4 = new ThreadC(consumer1); ThreadC threadC5 = new ThreadC(consumer1); threadP.start(); threadC1.start(); threadC2.start(); threadC3.start(); threadC4.start(); threadC5.start(); } }
运行结果如下:
pop操作中的:Thread-1 线程呈wait状态 pop操作中的:Thread-3 线程呈wait状态 push=1 popo=0 pop=anyString=0.5057892188371899 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 pop= at java.util.ArrayList.rangeCheck(Unknown Source) at java.util.ArrayList.get(Unknown Source) at com.umgsai.thread.thread33.MyStack.pop(MyStack.java:28) at com.umgsai.thread.thread33.Consumer.popService(Consumer.java:11) at com.umgsai.thread.thread33.ThreadC.run(ThreadC.java:13) pop操作中的:Thread-2 线程呈wait状态 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(Unknown Source) at java.util.ArrayList.get(Unknown Source) at com.umgsai.thread.thread33.MyStack.pop(MyStack.java:28) pop= at com.umgsai.thread.thread33.Consumer.popService(Consumer.java:11) at com.umgsai.thread.thread33.ThreadC.run(ThreadC.java:13) pop操作中的:Thread-5 线程呈wait状态 pop操作中的:Thread-4 线程呈wait状态 pop操作中的:Thread-1 线程呈wait状态 pop操作中的:Thread-3 线程呈wait状态
出现异常的原因是MyStack中使用了if判断,条件发生改变是没有及时得到响应,导致多个呈wait状态的线程被唤醒。解决办法是将if换成while即可,如下。
public class MyStack { private List list = new ArrayList(); synchronized public void push(){ try { while (list.size() == 1) { this.wait();//等待消费者通知 } list.add("anyString=" + Math.random()); this.notify();//通知消费者消费 System.out.println("push=" + list.size()); } catch (Exception e) { e.printStackTrace(); } } synchronized public String pop(){ String returnValue = ""; try { while (list.size() == 0) { System.out.println("pop操作中的:" + Thread.currentThread().getName() + " 线程呈wait状态"); this.wait();//等待生产者通知 } returnValue = "" + list.get(0); list.remove(0); this.notify();//通知生产者生产 System.out.println("popo=" + list.size()); } catch (Exception e) { e.printStackTrace(); } return returnValue; } }
重新运行程序发现程序出现假死情况,此时需要将notify修改成notifyAll,具体代码略。
多生产与一消费---操作栈
将上面例子中的main函数作如下修改:
public class Main { public static void main(String[] args) { MyStack myStack = new MyStack(); Producer producer1 = new Producer(myStack);//多个生产者 Producer producer2 = new Producer(myStack); Producer producer3 = new Producer(myStack); Producer producer4 = new Producer(myStack); Producer producer5 = new Producer(myStack); ThreadP threadP1 = new ThreadP(producer1); ThreadP threadP2 = new ThreadP(producer2); ThreadP threadP3 = new ThreadP(producer3); ThreadP threadP4 = new ThreadP(producer4); ThreadP threadP5 = new ThreadP(producer5); threadP1.start(); threadP2.start(); threadP3.start(); threadP4.start(); threadP5.start(); Consumer consumer = new Consumer(myStack);//一个消费者 ThreadC threadC = new ThreadC(consumer); threadC.start(); } }
控制台打印结果如下:
...... push=1 popo=0 pop=anyString=0.2152252322693482 push=1 popo=0 pop=anyString=0.0532083036226767 push=1 popo=0 pop=anyString=0.4428728222597489 push=1 popo=0 pop=anyString=0.7830190996209108 push=1 ......
多生产与多消费
将以上例子中的main函数作如下修改:
public class Main { public static void main(String[] args) { MyStack myStack = new MyStack(); Producer producer1 = new Producer(myStack); Producer producer2 = new Producer(myStack); Producer producer3 = new Producer(myStack); Producer producer4 = new Producer(myStack); Producer producer5 = new Producer(myStack); ThreadP threadP1 = new ThreadP(producer1); ThreadP threadP2 = new ThreadP(producer2); ThreadP threadP3 = new ThreadP(producer3); ThreadP threadP4 = new ThreadP(producer4); ThreadP threadP5 = new ThreadP(producer5); threadP1.start(); threadP2.start(); threadP3.start(); threadP4.start(); threadP5.start(); Consumer consumer1 = new Consumer(myStack); Consumer consumer2 = new Consumer(myStack); Consumer consumer3 = new Consumer(myStack); Consumer consumer4 = new Consumer(myStack); Consumer consumer5 = new Consumer(myStack); ThreadC threadC1 = new ThreadC(consumer1); ThreadC threadC2 = new ThreadC(consumer2); ThreadC threadC3 = new ThreadC(consumer3); ThreadC threadC4 = new ThreadC(consumer4); ThreadC threadC5 = new ThreadC(consumer5); threadC1.start(); threadC2.start(); threadC3.start(); threadC4.start(); threadC5.start(); } }
控制台打印结果如下:
...... push=1 popo=0 pop=anyString=0.24963884163395456 pop操作中的:Thread-8 线程呈wait状态 pop操作中的:Thread-9 线程呈wait状态 pop操作中的:Thread-7 线程呈wait状态 pop操作中的:Thread-5 线程呈wait状态 pop操作中的:Thread-6 线程呈wait状态 push=1 popo=0 pop=anyString=0.8586956700280816 pop操作中的:Thread-5 线程呈wait状态 pop操作中的:Thread-7 线程呈wait状态 pop操作中的:Thread-9 线程呈wait状态 pop操作中的:Thread-8 线程呈wait状态 ......
相关文章推荐
- FileUpload系列:(3)文件上传和下载示例
- java中的Serializable接口一:适用环境
- java按比例压缩图片的源代码,用java如何把图片处理到指定大小
- Java 多线程编程之一 进程与线程,并发和并行的区别:吃馒头的比喻
- Java 多线程编程之五:一个理解 wait() 与 notify() 的例子
- Java 多线程编程之六:线程之间的通信(附源代码)
- java的nio实现
- 华为桌面云中和Java相关的问题
- 查看Java的默认字符编码
- Java 多线程编程之七:死锁(附源代码)
- java中的抽象类和接口
- java hashtable实现内存缓存
- Struts1内部原理详解
- FileUpload系列:(2)文件上传示例
- Java 进行 RSA 加解密的例子
- Java并发编程-正确理解volatile关键字的两层语义
- 关于springmvc跨域
- Java 理论和实践:线程池和工作队列
- FileUpload系列:(1)文件上传的思路和示例
- Java解析Json字符串