您的位置:首页 > 其它

【多线程】线程通信之等待/通知机制

2018-01-28 22:17 573 查看
一、概念

       线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一。可以说,使线程进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使开发人员对各线程任务在处理的过程中进行有效的把控和监督。

二、线程的执行模式

       1、互不通信的多线程模式

       多个线程会在系统中并发执行。线程之间不需要处理共享的数据,也不需要进行动作协调,也就是多个独立的线程各自完成自己线程中的工作。如下图所示的两个线程,没有交集,各自执行各自的任务和逻辑。老死不相往来。

                                           


       2、基于共享容器协同的多线程模式

       在多个线程之间对共享的数据进行处理,例如生产者和消费者的例子,我们有一个队列用于生产和消费,那么这个队列就是多个线程共享的一个容器或者是数据对象,多个线程会并发地访问这个队列。

                                        


        像这种在多线程环境下对同一份数据的访问,我们需要保证线程安全

        3、通过事件协同的多线程模式

         除了并发访问的控制,线程间会存在着协调的需求,例如A、B两个线程,B线程需要等到某个状态或事件发生后才能继续自己的工作,而这个状态改变或者事件产生和A线程有关。那么在这个场景下,就需要完成线程间的协调。

                                   


       如上图所示,右侧的线程,在执行到某个步骤时需要等待一个事件,而这个事件由左侧线程触发。右侧线程一直阻塞直到事件通知到达后才继续自己的执行。

三、等待/通知机制

       等待/通知机制在生活中比比皆是,比如在“马玉涛麻辣烫”就餐时就会出现。  顾客自选菜品成功付款之后,服务员会给顾客一个卡号。然后顾客就处于"等待"(wait)的状态。片刻之后,顾客听到“叮咚,请XX号顾客到餐口取餐”的声音,这就相当于一种通知(notify)。这时顾客才会到餐口把自己的餐取走,并大快朵颐。

       方法wait()的的作用是使当前执行代码的线程进行等待,使当前线程置入“等待队列”,直到接收到通知或被中断为止。在调用wait()方法之前,线程必须获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法。在执行wait()方法后,当前线程立即释放锁。

       方法notify()也要在同步方法或同步代码块中调用,即在调用前,线程也必须获得该对象的对象级别锁。该方法用来通知那些处于wait状态的线程,对其发出通知notify,和wait不同的是,在执行notify方法后,当前线程不会马上释放该对象锁,呈wait状态的线程也就不能马上获取该对象锁,要等待执行notify()方法的线程将呈现执行完,或者退出synchronized代码块后,当前线程才会释放锁。而呈wait状态所在的线程才可以获取该对象锁。第一个抢到锁的wait线程运行完毕后,就会释放该对象锁,此时如果该对象没有再次使用notify语句,则其他呈wait状态等待的线程由于没有得到该对象的通知,还会继续阻塞在wait状态。直到这个对象再次发出一个notify或notifyAll。

        来总结一下wait和notify:

       ①wait使当前线程停止运行,而notify使处于wait状态的线程继续运行。

       ②wait和notify必须配合synchronized关键字进行使用

       ③wait方法立即释放锁,notify方法不立即释放锁。

       ④wait和notify都是Object的类方法,换句话说,为JDK为所有对象提供了这两个方法。

public class Test01 {

public static void main(String[] args) {
try {
String newString=new String("");
newString.wait();

} catch (Exception e) {
e.printStackTrace();
}

}

      程序运行后的效果如下图所示:

      


      出现异常的原因是没有“对象监视器”,也就是没有同步加锁。将newString.wait()代码块进行加锁。

public class Test01 {

public static void main(String[] args) {
try {
String newString=new String("");
synchronized (newString) {
newString.wait();
}
} catch (Exception e) {
e.printStackTrace();
}

}

}

     


         当前main线程呈wait状态,如果不进行notify,main线程将永远处于阻塞状态。

         

         接下来看一个完整的等待/通知例子。声明两个自定义线程类:MyThread1和MyThread2。MyThread1执行wait操作,MyThread2执行notify操作。

public class MyThread1 extends Thread {
private Object lock;
public MyThread1(Object lock){
this.lock=lock;
}
@Override
public void run(){
try {
synchronized (lock) {
System.out.println("开始           wait  time="+System.currentTimeMillis());
lock.wait();
System.out.println("结束            wait  time="+System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

public class MyThread2 extends Thread {
private Object lock;
public MyThread2(Object lock){
super();
this.lock=lock;
}
@Override
public void run(){
synchronized (lock) {
System.out.println("开始           notify  time="+System.currentTimeMillis());
lock.notify();
System.out.println("结束            notify  time="+System.currentTimeMillis());
}
}
}


public class Test {

public static void main(String[] args) {
try {
Object lock=new Object();
MyThread1 t1=new MyThread1(lock);
t1.start();//执行wait的线程
Thread.sleep(3000);
MyThread2 t2=new MyThread2(lock);
t2.start();//执行notify的线程

} catch (Exception e) {
e.printStackTrace();
}

}

}

     输出结果如下:

    


       从控制台打印结果可知:3秒后线程被notify通知唤醒。日志信息  结束  wait在最后输出,这也说明notify方法执行后并不立即释放锁。这个例子应该属于 通过事件协同的多线程模式

      分析一下锁变化的过程:首先t1线程获得锁,之后t1线程进入阻塞状态,释放锁,t2线程获得锁,t2执行完毕后释放锁,t1重新获得锁,进入就绪状态.

       Java为每个Object都实现了wait和notify方法,他们必须用在被synchronized同步的Object的临界区内。通过调用wait可以说处于临界区内的线程进入等待状态,同时释放同步对象的锁。而notify操作可以唤醒一个因调用了wait操作而处于阻塞状态中的线程,使其进入就绪状态。被重新唤醒的线程会视图重新获得临界区的控制权,也就是锁,并继续执行临界区内wait之后的代码。如果发出notify操作时没有处于阻塞状态中的线程,该命令自动被忽略。

       wait方法可以使调用该方法的线程释放共享资源的锁,进入等待队列,直到被再次唤醒。

       notify方法可以随机唤醒等待队列中等待同一共享资源的“一个”线程,,进入可运行状态,也就是notify方法仅仅通知“一个”线程。

        notifyAll方法可以使所有正在等待队列中等待同一共享资源的“全部”线程进入可运行状态。

        每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调用;泛指,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。

四、生产者/消费者模式实现

       等待/通知模式最经典的案例就是“生产者/消费者”模式。但此模式在使用上有几种“变形”

       1、一生产一消费:操作值

       创建生产者P.java类,代码如下:

public class P {
private String lock;
public P(String lock){
//super();
this.lock=lock;
}
public void setValue(){
try {
synchronized (lock) {
if (!ValueObject.value.equals("")) {
lock.wait();
}
String value=System.currentTimeMillis()+"_"+System.nanoTime();
System.out.println("set的值是"+value);
ValueObject.value=value;
lock.notify();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

     创建消费者类C.java,代码如下:

      

public class C {
private String lock;
public C(String lock){
super();
this.lock=lock;
}
public void getValue(){
try {
synchronized (lock) {
if (ValueObject.value.equals("")) {
lock.wait();
}
System.out.println("get的值是"+ValueObject.value );
ValueObject.value="";
lock.notify();
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

     创建工具类,存储值的对象ValueObject。

     

public class ValueObject {
public static String value="";
}

     创建两个自定义线程类,一个是生产者线程,另一个是消费者线程。

     ThreadP:生产者线程类

     

public class ThreadP extends Thread {
//创建两个线程对象,一个是生产者线程,另外一个是消费者线程
private P p;
public ThreadP(P p){
super();
this.p=p;
}
@Override
public void run(){
while (true) {
p.setValue();
}
}
}

     消费者线程类:ThreadC

     

public class ThreadC extends Thread {
private C r;
public ThreadC(C r){
super();
this.r=r;
}
@Override
public void run(){
while (true) {
r.getValue();
}
}
}

     运行类Run.Java代码如下:

     

public class Run {

public static void main(String[] args) {
String lock=new String("");
P p=new P(lock);
C r=new C(lock);
ThreadP pThread=new ThreadP(p);
ThreadC rThread=new ThreadC(r);
pThread.start();
rThread.start();

}

}

       简单说一下代码。生产者负责放值,并通知消费者,并判断容器ValueObject.valueSetValue是否为“”,如果不为“”,也就是容器中有数据,生产者线程处于阻塞状态。消费者负责取值,并通知生产者,并判断容器是否为“”,如果为“”,也就是容器中没有数据,则消费者线程处于阻塞状态。

       打印结果如下:

       


      本示例是1个生产者和1个消费者进行数据的交互,在控制台中打印的日志get和set是交替运行的。

      2、一生产者一消费者操作栈

      首先声明工具类MyStack

      

/*一生产者一消费者:操作栈。使生产者向堆栈list对象中方法数据,是消费者从list堆栈中取出数据,list最大容量是1.实验环境中只有一个生产者与1个消费者*/
public class MyStack {
private List list=new ArrayList<>();
synchronized public void push(){
try {
if (list.size()==1) {
this.wait();
}
list.add("anyString="+Math.random());
this.notify();
System.out.println("push="+list.size());
} catch (Exception e) {
e.printStackTrace();
}
}

synchronized public String pop(){
String returnValue="";
try {
if (list.size()==0) {
System.out.println("pop操作中的"+Thread.currentThread().getName()+" 线程呈wait状态");
this.wait();
}
returnValue=""+list.get(0);
list.remove(0);
this.notify();
System.out.println("pop="+list.size());
} catch (Exception e) {
e.printStackTrace();
}
return returnValue;
}
}

     生产者

     

public class P {
private MyStack myStack;
public P(MyStack myStack){
super();
this.myStack=myStack;
}
public void pushService(){
myStack.push();
}
}

     消费者:

      

public class C {
private MyStack myStack;
public C(MyStack myStack){
super();
this.myStack=myStack;
}
public void popService(){
System.out.println("pop="+myStack.pop());
}
}

     生产者线程:

      

public class P_Thread extends Thread {
private P p;
public P_Thread(P p){
super();
this.p=p;
}
@Override
public void run(){
while (true) {
p.pushService();
}
}
}

     消费者线程:

     

public class C_Thread extends Thread {
private C r;
public C_Thread(C r){
super();
this.r=r;
}
@Override
public void run(){
while (true) {
r.popService();
}
}
}

     
public class Run {

public static void main(String[] args) {
MyStack myStack=new MyStack();//得到工具类对象,堆栈
P p=new P(myStack); //生产者对象
C r=new C(myStack); //消费者对象
P_Thread pThread=new P_Thread(p);//生产者线程对象
C_Thread cThread =new C_Thread(r);//消费者线程对象
pThread.start();
cThread.start();

}

}

     通过使用生产者/消费者模式,容器size()的值不会大于1.

      

     

     

  

         

   
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: