您的位置:首页 > 编程语言 > Java开发

Java多线程之线程间通信--等待(wait)/通知(notify)机制,等待/通知之交叉备份实例

2017-08-27 15:45 1066 查看
1、什么是等待/通知机制

  等待/通知机制在生活中比比皆是,比如在就餐时就会出现,如图所示。

  


  厨师和服务员之间的交互要在“菜品传递台”上,在这期间会有几个问题:

  1).厨师做完一道菜的时间不确定,所以厨师将菜品放到‘菜品传递言,上的时间也不确定。

  2).服务员取到菜的时间取决于厨师,所以服务员就有“等待”(wait)的状态。

  3).服务员如何能取到菜呢?这又得取决于厨师,厨师将菜放在“菜品传递台”上,其实就相当于一种通知(notify),这时服务员才可以拿到菜并交给就餐者。

  4).在这个过程中出现了“等待/通知”机制。

  需要说明的是,前面章节中多个线程之间也可以实现通信,原因就是多个线程共同访问同一个变量,但那种通信机制不是“等待/通知”,两个线程完全是主动式地读取一个共享变量,在花费读取时间的基础上,读到的值是不是想要的,并不能完全确定。所以现在迫切需要一种“等待/通知”机制来满足上面的需求。

  

2、等待/通知机制的实现

  方法wait()的作用是使当前执行代码的线程进行等待,wait()方法是Object类的方法,该方法用来将当前线程置入“预执行队列”中,并且在wait()所在的代码行处停止执行,直到接到通知或被中断为止。在调用wait()之前,线程必须获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait方法。在执行wait()方法后,当前线程释放锁。在从wait()返回前,线程与其他线程竞争重新获得锁。如果调用wait()时没有持有适当的锁,则抛出IllegalMonitorStateException,它是RuntimeException的一个子类,因此,不需要try-catch语句进行捕捉异常。

  方法notify()也要在同步方法或同步块中调用,即在调用前,线程也必须获得该对象的对象级别锁。如果调用notify()时没有持有适当的锁,一也会抛出IllegalMonitorStateException。该方法用来通知那些可能等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选出其中一个呈wait状态的线程,对其发出通知notify,并使它等待获取该对象的对象锁。需要说明的是,在执行notify方法后,当前线程不会马上释放该对象锁,呈wait状态的线程也并不能马上获取该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放锁,而呈wait状态所在的线程才可以获取该对象锁。当第一个获得了该对象锁的wait线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,还会继续阻塞在wait状态,直到这个对象发出一个notity或notifyAll.

  用一句话来总结一下wait和notify: wait使线程停止运行,而notify使停止的线程继续运行。

public class Test1 {
public static void main(String[] args) {
try {
String newString = new String("");
newString.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


Exception in thread “main” java.lang.IllegalMonitorStateException

at java.lang.Object.wait(Native Method)

at java.lang.Object.wait(Object.java:503)

出现的异常的原因是没有“对象监视器”,也就是没有同步加锁。

public class Test1 {
public static void main(String[] args) {
try {
String lock = new String("");
System.out.println("syn上面");
synchronized (lock) {
System.out.println("syn第一行");
lock.wait();
System.out.println("wait下的代码");
}
System.out.println("syn下面的代码");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


syn上面

syn第一行

  wait方法下面的代码不执行了。必须使用notify才行。

  关键字synchronized可以将任何一个Object对象作为同步对象来看待,而Java为每个Object都实现了wait()和notify()方法,它们必须用在被synchronized同步的Object的临界区内。通过调用wait()方法可以使处于临界区内的线程进人等待状态,同时释放被同步对象的锁。而notify操作可以唤醒一个因调用了wait操作而处于阻塞状态中的线程,使其进人就绪状态。被重新换醒的线程会试图重新获得临界区的控制权,也就是锁,并继续执行临界区内wait之后的代码。如果发出noti勿操作时没有处于阻塞状态中的线程,那么该命令会被忽略。

3、线程状态

  线程的状态转换是线程控制的基础。线程状态总的可分为五大状态:分别是生、死、可运行、运行、等待/阻塞。用一个图来描述如下:

  


  1).新建状态:线程对象已经创建,还没有在其上调用start()方法。

  2).可运行状态:当线程有资格运行,但调度程序还没有把它选定为运行线程时线程所处的状态。当start()方法调用时,线程首先进入可运行状态。在线程运行之后或者从阻塞、等待或睡眠状态回来后,也返回到可运行状态。

  3).运行状态:线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。

  4).等待/阻塞/睡眠状态:这是线程有资格运行时它所处的状态。实际上这个三状态组合为一种,其共同点是:线程仍旧是活的,但是当前没有条件运行。换句话说,它是可运行的,但是如果某件事件出现,他可能返回到可运行状态。

  5).死亡状态:当线程的run()方法完成时就认为它死去。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦死亡,就不能复生。如果在一个死去的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。

4、方法wait()锁释放与notify()锁不释放

  当方法wait()被执行后,锁被自动释放,但执行完notify()方法,锁却不自动释放。

public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait()");
lock.wait();
System.out.println("end wait()");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class MyThread1 extends Thread {
private Object lock;

public MyThread1(Object lock) {
super();
this.lock = lock;
}

public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class MyThread2 extends Thread{
private Object lock;

public MyThread2(Object lock) {
super();
this.lock = lock;
}

public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
MyThread1 a = new MyThread1(lock);
a.start();
MyThread2 b = new MyThread2(lock);
b.start();
}
}


begin wait()

begin wait()

  方法wait()自动释放锁

  如果将Service方法里的lock.wait()修改为Thread.sleep(2000);,就成了同步效果,输出结果为:

begin wait()

  原因是sleep方法不释放锁。

证明:方法notify()被执行后,不释放锁

将上面的service类修改为:

public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName="
+ Thread.currentThread().getName());
lock.wait();
System.out.println("  end wait()ThreadName="
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void synNotifyMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName="
+ Thread.currentThread().getName());
lock.notify();
Thread.sleep(3000);
System.out.println("  end wait()ThreadName="
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


MyThread2.java里的run 方法修改为:

public void run() {
Service service = new Service();
service.synNotifyMethod(lock);
}


begin wait() ThreadName=Thread-0

begin wait() ThreadName=Thread-1

end wait()ThreadName=Thread-1

end wait()ThreadName=Thread-0

  此实验说明:必须执行完notify()方法所在的同步synchronized代码块后才释放锁。

public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName="
+ Thread.currentThread().getName());
lock.wait();
System.out.println("  end wait()ThreadName="
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
MyThread1.java同上不变
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
MyThread1 a = new MyThread1(lock);
a.start();
Thread.sleep(3000);
a.interrupt();
}
}


begin wait() ThreadName=Thread-0

java.lang.InterruptedException

 at java.lang.Object.wait(Native Method)

 at java.lang.Object.wait(Object.java:503)

出现异常:因为wait状态的线程被interrupt了

  通过上面的几个实验可以总结如下3点:

  1)执行完同步代码块就会释放对象的锁。

  2)在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。

  3)在执行同步代码块的过程中,执行了锁所属对象的wait()方法,这个线程会释放对象锁,而此线程对象会进人线程等待池中,等待被唤醒。

  notify()一次只能唤醒一个线程,notify()可以唤醒全部线程。

5、方法wait(long)的使用

  带一个参数的wait(long)方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒。这里就不写例子了。

6、通过过早

  如果通过过早,则会打乱程序正常的运行逻辑。 下面是正常的

public class MyRunnable {
private String lock = new String("");

private Runnable runnableA = new Runnable() {
public void run() {
try {
synchronized (lock) {
System.out.println("begin wait");
lock.wait();
System.out.println("end wait");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

private Runnable runnableB = new Runnable() {
public void run() {
synchronized (lock) {
System.out.println("begin notify");
lock.notify();
System.out.println("end notify");
}
}
};

public static void main(String[] args) {
MyRunnable run = new MyRunnable();
Thread a = new Thread(run.runnableA);
a.start();
Thread b = new Thread(run.runnableB);
b.start();
}
}


begin wait

begin notify

end notify

end wait

将上面的main方法修改(修改线程的启动顺序)为如下

public static void main(String[] args) throws InterruptedException {
MyRunnable run = new MyRunnable();
Thread b = new Thread(run.runnableA);
b.start();

Thread a = new Thread(run.runnableB);
a.start();
}


begin notify

end notify

begin wait

  方法wait永远不能被通知。如果先通知,则wait方法也就没必要执行了。

7、等待wait的条件发生变化

  在使用wait/notify模式时,还需要注意另外一种情况,也就是wait等待的条件发生了变化,也容易造成程序逻辑的混乱。

public class ValueObjects {
public static List list = new ArrayList();
}
//这个类新增数据
public class Add {
private String lock;
public Add(String lock) {
this.lock = lock;
}
public void add() {
synchronized (lock) {
ValueObjects.list.add("anyString");
lock.notifyAll();
}
}
}
//这个类减去数据
public class Subtract {
private String lock;
public Subtract(String lock) {
this.lock = lock;
}
public void subtract() {
try {
synchronized (lock) {
if (ValueObjects.list.size() == 0){
System.out.println("wait begin ThreadName="
+ Thread.currentThread().getName());
lock.wait();
System.out.println("wait   end ThreadName="
+ Thread.currentThread().getName());
}
ValueObjects.list.remove(0);
System.out.println("list size=" + ValueObjects.list.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 两个线程类代码
public class MyThread1 extends Thread {
private Add p;

public MyThread1(Add p) {
super();
this.p = p;
}

public void run() {
p.add();
}
}
public class MyThread2 extends Thread{
private Subtract r;

public MyThread2(Subtract r) {
super();
this.r = r;
}

public void run() {
r.subtract();
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
String lock = new String("");
Add add = new Add(lock);
Subtract subtract = new Subtract(lock);

MyThread2 subThread1 = new MyThread2(subtract);
subThread1.setName("subtract1-Thread");
subThread1.start();

MyThread2 subThread12 = new MyThread2(subtract);
subThread12.setName("subtract2-Thread");
subThread12.start();

Thread.sleep(1000);
MyThread1 addThread = new MyThread1(add);
addThread.setName("add-Thread");
addThread.start();
}
}


wait begin ThreadName=subtract2-Thread

wait begin ThreadName=subtract1-Thread

wait end ThreadName=subtract1-Thread

list size=0

wait end ThreadName=subtract2-Thread

Exception in thread “subtract2-Thread” java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

at java.util.ArrayList.rangeCheck(ArrayList.java:635)

at java.util.ArrayList.remove(ArrayList.java:474)

  出现这样异常的原因是因为有两个实现删除removed操作的线程,它们在Thread.sleep(1000);之前都执行了wait()方法,呈等待状态,当加操作的线程在1秒之后被运行时,通知了所有呈wait等待状态的减操作的线程,那么第一个实现减操作的线程能正确地删除list中索引为0的数据,但第二个实现减操作的线程则出现索引溢出的异常,因为list中仅仅添加了一个数据,也只能删除一个数据,所以没有第二个数据可供删除。如何解决这样的问题呢?

  很简单,把Subtract.java中的subtract方法里的 if 代换为 while就行了,结果如下:

wait begin ThreadName=subtract2-Thread

wait begin ThreadName=subtract1-Thread

wait end ThreadName=subtract1-Thread

list size=0

wait end ThreadName=subtract2-Thread

wait begin ThreadName=subtract2-Thread

二、等待/通知之交叉备份实例

  题目:创建20个线程,其中10个线程是将数据备份到A数据库中,另外10个线程将数据备份到B数据库中,并且备份A数据库和B数据库是交叉进行的。

  首先创建出20个线程,效果如图3-41所示。

  通过一些手段将这20个线程的运行效果变成有序的,如图3-42所示。

  


使用的技术还是等待/通知。

public class DBTools {
//确保备份数据库A首先执行
private volatile boolean prevIsA = false;

public synchronized void backupA() {
try {
while (prevIsA == true) {
wait();
}
for (int i=0; i<5; i++) {
System.out.println("+++++");
}
prevIsA = true;
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public synchronized void backupB() {
try {
while (prevIsA == false) {
wait();
}
for (int i=0; i<5; i++) {
System.out.println("^^^^^");
}
prevIsA = false;
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


两个自定义线程

public class BackupA extends Thread {
private DBTools dbTools;

public BackupA(DBTools dbTools) {
this.dbTools = dbTools;
}
public void run() {
dbTools.backupA();
}
}
public class BackupB extends Thread {
private DBTools dbTools;

public BackupB(DBTools dbTools) {
this.dbTools = dbTools;
}
public void run() {
dbTools.backupB();
}
}


public class Run {
public static void main(String[] args) throws InterruptedException {
DBTools dbTools = new DBTools();
for (int i=0; i<20; i++) {
BackupB output = new BackupB(dbTools);
output.start();
BackupA input = new BackupA(dbTools);
input.start();
}
}
}


+++++

+++++

+++++

+++++

+++++

^^^^^

^^^^^

^^^^^

^^^^^

^^^^^

+++++

+++++

+++++

+++++

+++++

^^^^^

^^^^^

^^^^^

^^^^^

^^^^^

……

打印的效果是交替运行的。

  交替打印的原理就是使用如下代码作为标记:

  private boolean volatile prevIsA = false;

  实现了A和B线程交替备份的效果
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息