您的位置:首页 > 大数据 > 人工智能

SE高阶(6):多线程—③线程通信(协调运行)

2017-04-26 12:21 337 查看

建议使用目录功能来查看内容。

Java的多线程通信方式

同步机制
通过条件控制线程协调运行(wait/notify机制、Lock类和Condition对象)
管道流

同步机制

多个线程通过synchronized关键字这种方式来实现线程间的通信,属于共享变量的方式。
例如两个线程并发访问的共享资源属于同一个类,把该类对象作为同步监视器,每次只能有一个线程获得锁,这可以保证两个线程有序执行,算是一种简单通信。但有时候线程会遇到满足条件之后才执行的情况,这时就需要条件变量来协调线程间的运行。

wait/notify机制

wait()、notify()、notifyAll()是Object类的方法,由对象来调用,一般和synchronized联用。在同步代码块中,需要使用显式传入的同步锁对象。而在同步方法中,同步锁对象是是隐式传入的,即this,所以能直接使用也可以使用this来调用。

wait():导致当前线程进入等待,要唤醒只能使用notify()或者notifyAll()。该方法会释放锁的拥有权,而sleep()不会释放锁。
notify():随机唤醒一个等待获取当前对象锁的线程。唤醒的线程只有获得对同步监视器的锁定才可以被执行。
notifyAll():唤醒处于等待的所有线程,但只有获得对同步监视器的锁定的线程才会被执行。

wait()/notify()代码实例

两个线程实现轮番打印0101...
public class Numbers {
private int i = 1; //指定初始值
public static void main(String[] args) {
Numbers n = new Numbers();
Atest at = new Atest(n);
Btest bt = new Btest(n);
Thread t1  = new Thread(at,"A");
Thread t2  = new Thread(bt,"B");
t1.start();
t2.start();
}
//打印1的方法
public synchronized void printOne() throws InterruptedException {
//i不等于0,让当前线程等待
if(i != 0)
this.wait();
//线程唤醒并执行i自增,和打印0方法的线程进行协调运行
i++;
System.out.print(i + " ");
this.notify();//唤醒任意一个等待线程
}
//打印0的方法
public synchronized void printZero() throws InterruptedException {
//因为要执行i自减1,所以i为0时,让当前线程等待
if(i == 0)
this.wait();
//线程唤醒并执行i自减,和打印1方法的线程进行协调运行
i--;
System.out.print(i + " ");
this.notify();//唤醒任意一个等待线程
}
}
//执行打印0的线程
class Atest implements Runnable{
private Numbers numbers;
public Atest(Numbers numbers) {
this.numbers = numbers;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"抢到处理权!");
for(int i=0; i<20; i++) {
try {
numbers.printZero();
} catch (InterruptedException e) {}//省略异常信息打印。。。
}
}
}
//执行打印1的线程
class Btest implements Runnable{
private Numbers numbers;
public Btest(Numbers numbers) {
this.numbers = numbers;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"抢到处理权!");
for(int i=0; i<20; i++) {
try {
numbers.printOne();
} catch (InterruptedException e) {}//省略异常信息打印。。。
}
}
}

执行流程:A线程和B线程任意一个获得执行权,如果是B线程获得,则会执行打印1的方法,因为i指定了初始值为1,所以执行printOne()方法时,满足i != 0条件,B线程进入等待,并且释放了同步锁。此时A线程开始执行,运行printZero()方法时,因为i为1,不满足等于0的条件,所以向下执行i自减,打印输出0。然后随机唤醒一个等待的线程(注意:被唤醒的线程和其他线程一样需要争抢执行权,但因为设置了条件,所以能保证线程能够得到有效执行),此时A线程和被唤醒线程在争抢执行权,如果A线程抢到了,根据方法中的条件,A线程会进入等待,此时B线程就得到了执行,因为i
== 0,所以能执行到i自增,则打印输出1。然后再次随机唤醒一个等待线程。A、B线程就这样轮番执行20次。
实例解析:讲完了执行流程,会觉得以上代码没问题,但如果启动多个线程来执行printOne()方法呢?这时结果再也不是0101了,执行代码如下:
public static void main(String[] args) {
Numbers n = new Numbers();
Atest at = new Atest(n);
Btest bt = new Btest(n);
Thread t1  = new Thread(at,"A-");
//使用三个线程来执行打印1,该三个线程都是使用同一个Runnable对象,所以printOne()方法只会执行20次
Thread t2  = new Thread(bt,"B-");
Thread t3  = new Thread(bt,"C-");
Thread t4  = new Thread(bt,"D-");
t1.start();
t2.start();
t3.start();
t4.start();
}

执行结果,看图:





线程执行出错分析:

输出“抢到处理权”语句的这一段代码放在for循环之前,是非同步的,因为线程调度的不确定性,所以A、B、C、D线程执行顺序不确定。如果A线程获得执行权,输出“A-抢到处理权“之后,不一定就会进入for循环执行PrintZero()的方法,很可能下一时刻就被其他线程抢到执行权,从而没执行A线程的方法。

如果B、C、D线程之一抢到处理权(假定为B),因为i初始值为1,满足条件(i !=0),B线程进入等待,这时就剩A、C、D线程争抢执行权。
如果是C和D连续获得执行权,和B线程一样,根据条件进入等待。此时唯一能执行的就是A线程了,所以A线程执行i自减1和输出0语句,然后唤醒B、C、D线程其中之一。

如果唤醒了B线程,B线程依然需要和A线程争抢执行权,因为此时i等于0,所以A线程抢到执行权就进入等待,或者没抢到,然后B线程一定会被执行,执行了自增1和输出1语句,然后又随机唤醒一个等待线程。
重点:如果A、B线程交替执行,那看起来是正常的。但如果是C或D线程被唤醒,则往下执行,不再进行判断(因为if只能判断一次)则i会再次自增1为2,这就破坏了线程执行的条件,每次唤醒B、C、D线程都会导致i值不断增加,如果A线程获得执行权,输出的i值也在不断增加。

代码进行修改:

使用while代替if来做判断,保证每次都能判断条件。如果是两个线程,用notify()就能保证唤醒的线程能够执行。多个线程的话,如果每次只唤醒一个线程,唤醒的线程进入等待而无法执行notify()时,会导致线程无法相互唤醒,全部线程阻塞。所以要使用notifyAll()来唤醒所有线程,保证条件的有效执行。

//打印1的方法
public synchronized void printOne() throws InterruptedException {
//i不等于0,让当前线程等待
while(i != 0) {
this.wait();
}
//线程唤醒并执行i自增,和打印0方法的线程进行协调运行
i++;
System.out.println(Thread.currentThread().getName() + i + " ");
this.notifyAll();//唤醒任意一个等待线程
}
//打印0的方法
public synchronized void printZero() throws InterruptedException {
//因为要执行i自减1,所以i为0时,让当前线程等待
while(i == 0) {
this.wait();
}
//线程唤醒并执行i自减,和打印1方法的线程进行协调运行
i--;
System.out.println(Thread.currentThread().getName() + i + " ");
this.notifyAll();//唤醒任意一个等待线程
}


Lock/Condition机制

如果使用Lock来实现线程同步,java提供了Condition类来保持协调,使用Condition可以让哪些已经得到的Lock对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待的线程。
Condition对象需要和一个Lock对象进行绑定:Lock对象的newCondition()可以获得一个Condition对象。
Condition对象的三个方法:

await():和wait()方法一样,会导致当前线程等待,只能使用signal()或者signalAll()方法才能唤醒。会释放对Lock对象的锁定。
signal():随机唤醒一个在Lock对象上等待的线程。只有Lock对象的锁被释放,唤醒的线程才能被执行。
signalAll():唤醒在Lock对象上等待的所有线程。只有Lock对象的锁被释放,唤醒的线程才能被执行。

Lock/Condition代码实例

完成26字母和26数字的轮番打印
public class LoopNumGragpheme {
private char a = 'A';
private int num = 1;
private boolean flag = true;
final Lock lock = new Reentr
b7cc
antLock();
final Condition con = lock.newCondition();
//打印字母方法
public void printGrapheme()  {
lock.lock(); //加锁
try {
//为false,等待获取lock对象锁的线程
while(!flag)
con.await();
//flag == true,就执行打印字母
System.out.print(a++ + " "); //打印完字母,然后自增
flag = false;
con.signalAll();
} catch (InterruptedException e) {
}finally {
lock.unlock();//释放锁
}
}
//打印数字方法
public void printNumbers()  {
lock.lock(); //加锁
try {
//为true,等待获取lock对象锁的线程
while(flag)
con.await();
//flag == fasle,执行打印数字
System.out.print(num++  + " "); //打印完数字,然后自增1
flag = true;
con.signalAll();
} catch (InterruptedException e) {
}finally {
lock.unlock();//释放锁
}
}
}
//打印26字母线程
class PrintThread1 implements Runnable{
private LoopNumGragpheme lng;
public PrintThread1(LoopNumGragpheme lng) {
this.lng = lng;
}
@Override
public void run() {
for(int i = 0; i < 26; i++)
lng.printGrapheme();
}
}
//打印1~26数字线程
class PrintThread2 implements Runnable{
private LoopNumGragpheme lng;
public PrintThread2(LoopNumGragpheme lng) {
this.lng = lng;
}
@Override
public void run() {
for(int i = 0; i < 26; i++)
lng.printNumbers();
}
}
public class TestDemo {
public static void main(String[] args) {
LoopNumGragpheme t = new LoopNumGragpheme();
PrintThread1 pt1 = new PrintThread1(t);
PrintThread2 pt2 = new PrintThread2(t);
new Thread(pt1).start();
new Thread(pt2).start();
new Thread(pt2).start();
new Thread(pt2).start();
}
}


管道流

管道流和IO流一样,是单向的,两个线程之间使用管道流通信需要管道输入流和输出流。
管道流是一对一传输数据,所以只能用于两个线程之间。线程超过两个以上,不建议使用管道流,应使用其他线程通信方式。
IO流:PipeInputStream、PipeOutputStream。
NIO流:Pipe.SinkChannel、Pipe.SourceChannel。

管道流代码实例

这里是使用IO流的管道流来完成线程通信,NIO流就不演示了,想了解NIO流点此:如何使用NIO流
public class PipeDemo{
public static void main(String[] args)  throws Exception  {
//创建管道输入流/输出流
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream();
pis.connect(pos);  //两个管道连接
Consumer cs = new Consumer(pis);
Producer pro = new Producer(pos);
new Thread(cs).start();
new Thread(pro).start();
//    Consumer cs1 = new Consumer(pis);
//    Producer pro1 = new Producer(pos);
//    new Thread(cs1).start();
//    new Thread(pro1).start();
//    管道已关闭,其他线程还在使用,导致出现数据读取异常
}
}
//消费者线程
class Consumer implements Runnable {
private PipedInputStream pis;
public Consumer(PipedInputStream pis)  {
this.pis = pis;
}
@Override
public void run() {
try{
byte[] buf = new byte[1024];
int len = 0;
System.out.println("--消费者抢到执行权--");
while((len = pis.read(buf)) != -1)//会进入阻塞状态
System.out.println("消费者收到数据:" + new String(buf,0,len));
pis.close();
}catch(Exception e){ e.printStackTrace(); }
}
}
//生产者线程
class Producer implements Runnable {
private PipedOutputStream pos;
public Producer(PipedOutputStream pos)  {
this.pos = pos;
}
public void run(){
System.out.println("--生产者抢到执行权--");
try {
System.out.println("延迟3s后,往管道中输出数据");
Thread.sleep(3000);
pos.write("输出数据:我是由生产者制造的数据".getBytes());
pos.close();
}catch(Exception e) {e.printStackTrace();}
}
}

实例解析:管道流的注意点在前面已经介绍了。至于使用方式的话,和常规IO流没啥区别,一个读取一个接收。只是管道流一般用在多线程的通信中。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐