您的位置:首页 > 编程语言 > Java开发

JAVA线程间通信问题

2011-02-20 20:41 260 查看
问题

在前一小节,介绍了在多线程编程中使用同步机制的重要性,并学会了如何实现同步的方法来正确地访问共享资源。这些线程之间的关系是平等的,彼此之间
并不存在任何依赖,它们各自竞争CPU资源,互不相让,并且还无条件地阻止其他线程对共享资源的异步访问。然而,也有很多现实问题要求不仅要同步的访问同
一共享资源,而且线程间还彼此牵制,通过相互通信来向前推进。那么,多个线程之间是如何进行通信的呢?

解决思路

在现实应用中,很多时候都需要让多个线程按照一定的次序来访问共享资源,例如,经典的生产者和消费者问题。这类问题描述了这样一种情况,假设仓库中
只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中的产品取走消费。如果仓库中没有产品,则生产者可以将产品放入仓库,否则停止生产并等
待,直到仓库中的产品被消费者取走为止。如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止。显然,这是
一个同步问题,生产者和消费者共享同一资源,并且,生产者和消费者之间彼此依赖,互为条件向前推进。但是,该如何编写程序来解决这个问题呢?

传统的思路是利用循环检测的方式来实现,这种方式通过重复检查某一个特定条件是否成立来决定线程的推进顺序。比如,一旦生产者生产结束,它就继续利
用循环检测来判断仓库中的产品是否被消费者消费,而消费者也是在消费结束后就会立即使用循环检测的方式来判断仓库中是否又放进产品。显然,这些操作是很耗
费CPU资源的,不值得提倡。那么有没有更好的方法来解决这类问题呢?

首先,当线程在继续执行前需要等待一个条件方可继续执行时,仅有 synchronized
关键字是不够的。因为虽然synchronized关键字可以阻止并发更新同一个共享资源,实现了同步,但是它不能用来实现线程间的消息传递,也就是所谓
的通信。而在处理此类问题的时候又必须遵循一种原则,即:对于生产者,在生产者没有生产之前,要通知消费者等待;在生产者生产之后,马上又通知消费者消
费;对于消费者,在消费者消费之后,要通知生产者已经消费结束,需要继续生产新的产品以供消费。

其实,Java提供了3个非常重要的方法来巧妙地解决线程间的通信问题。这3个方法分别是:wait()、notify()和 notifyAll()。它们都是Object类的最终方法,因此每一个类都默认拥有它们。

虽然所有的类都默认拥有这3个方法,但是只有在synchronized关键字作用的范围内,并且是同一个同步问题中搭配使用这3个方法时才有实际的意义。

这些方法在Object类中声明的语法格式如下所示:

Java代码


final


void


wait()
throws


InterruptedException

final


void


notify()

final


void


notifyAll()

Java代码

final

void
wait()
throws
InterruptedException

final

void
notify()

final

void
notifyAll()

final void wait() throws InterruptedException

final void notify()

final void notifyAll()


其中,调用wait()方法可以使调用该方法的线程释放共享资源的锁,然后从运行态退出,进入等待队列,直到被再次唤醒。而调用notify()方
法可以唤醒等待队列中第一个等待同一共享资源的线程,并使该线程退出等待队列,进入可运行态。调用notifyAll()方法可以使所有正在等待队列中等
待同一共享资源的线程从等待状态退出,进入可运行状态,此时,优先级最高的那个线程最先执行。显然,利用这些方法就不必再循环检测共享资源的状态,而是在
需要的时候直接唤醒等待队列中的线程就可以了。这样不但节省了宝贵的CPU资源,也提高了程序的效率。

由于wait()方法在声明的时候被声明为抛出InterruptedException异常,因此,在调用wait()方法时,需要将它放入 try…catch代码块中。此外,使用该方法时还需要把它放到一个同步代码段中,否则会出现如下异常:

Java代码


"java.lang.IllegalMonitorStateException: current thread not owner"

Java代码

"java.lang.IllegalMonitorStateException: current thread not owner"

"java.lang.IllegalMonitorStateException: current thread not owner"


这些方法是不是就可以实现线程间的通信了呢?下面将通过多线程同步的模型: 生产者和消费者问题来说明怎样通过程序解决多线程间的通信问题。

具体步骤

下面这个程序演示了多个线程之间进行通信的具体实现过程。程序中用到了4个类,其中ShareData类用来定义共享数据和同步方法。在同步方法中调用了wait()方法和notify()方法,并通过一个信号量来实现线程间的消息传递。

Java代码


// 例4.6.1 CommunicationDemo.java 描述:生产者和消费者之间的消息传递过程

class


ShareData

{

private


char


c;

private


boolean


isProduced =
false


;
// 信号量

public


synchronized


void


putShareChar(
char


c)
// 同步方法putShareChar()

{

if


(isProduced)
// 如果产品还未消费,则生产者等待

{

try


{

wait();
// 生产者等待

}
catch


(InterruptedException e) {

e.printStackTrace();

}

}

this


.c = c;

isProduced =
true


;
// 标记已经生产

notify();
// 通知消费者已经生产,可以消费

}

public


synchronized


char


getShareChar()
// 同步方法getShareChar()

{

if


(!isProduced)
// 如果产品还未生产,则消费者等待

{

try


{

wait();
// 消费者等待

}
catch


(InterruptedException e) {

e.printStackTrace();

}

}

isProduced =
false


;
// 标记已经消费

notify();
// 通知需要生产

return


this


.c;

}

}

class


Producer
extends


Thread
// 生产者线程

{

private


ShareData s;

Producer(ShareData s)

{

this


.s = s;

}

public


void


run()

{

for


(
char


ch =
'A'

; ch <=
'D'

; ch++)

{

try


{

Thread.sleep((
int


) (Math.random() *
3000

));

}
catch


(InterruptedException e) {

e.printStackTrace();

}

s.putShareChar(ch);
// 将产品放入仓库

System.out.println(ch +
" is produced by Producer."

);

}

}

}

class


Consumer
extends


Thread
// 消费者线程

{

private


ShareData s;

Consumer(ShareData s)

{

this


.s = s;

}

public


void


run()

{

char


ch;

do


{

try


{

Thread.sleep((
int


) (Math.random() *
3000

));

}
catch


(InterruptedException e) {

e.printStackTrace();

}

ch = s.getShareChar();
// 从仓库中取出产品

System.out.println(ch +
" is consumed by Consumer. "

);

}
while


(ch !=
'D'

);

}

}

class


CommunicationDemo

{

public


static


void


main(String[] args)

{

ShareData s =
new


ShareData();

new


Consumer(s).start();

new


Producer(s).start();

}

}

Java代码

// 例4.6.1 CommunicationDemo.java 描述:生产者和消费者之间的消息传递过程

class
ShareData

{

private

char
c;

private

boolean
isProduced =
false
;
// 信号量

public

synchronized

void
putShareChar(
char
c)
// 同步方法putShareChar()

{

if
(isProduced)
// 如果产品还未消费,则生产者等待

{

try

{

wait(); // 生产者等待

} catch
(InterruptedException e) {

e.printStackTrace();

}

}

this
.c = c;

isProduced = true
;
// 标记已经生产

notify(); // 通知消费者已经生产,可以消费

}

public

synchronized

char
getShareChar()
// 同步方法getShareChar()

{

if
(!isProduced)
// 如果产品还未生产,则消费者等待

{

try

{

wait(); // 消费者等待

} catch
(InterruptedException e) {

e.printStackTrace();

}

}

isProduced = false
;
// 标记已经消费

notify(); // 通知需要生产

return

this
.c;

}

}

class
Producer
extends
Thread
// 生产者线程

{

private
ShareData s;

Producer(ShareData s)

{

this
.s = s;

}

public

void
run()

{

for
(
char
ch =
'A'
; ch <=
'D'
; ch++)

{

try

{

Thread.sleep((int
) (Math.random() *
3000
));

} catch
(InterruptedException e) {

e.printStackTrace();

}

s.putShareChar(ch); // 将产品放入仓库

System.out.println(ch + " is produced by Producer."
);

}

}

}

class
Consumer
extends
Thread
// 消费者线程

{

private
ShareData s;

Consumer(ShareData s)

{

this
.s = s;

}

public

void
run()

{

char
ch;

do
{

try

{

Thread.sleep((int
) (Math.random() *
3000
));

} catch
(InterruptedException e) {

e.printStackTrace();

}

ch = s.getShareChar(); // 从仓库中取出产品

System.out.println(ch + " is consumed by Consumer. "
);

} while
(ch !=
'D'
);

}

}

class
CommunicationDemo

{

public

static

void
main(String[] args)

{

ShareData s = new
ShareData();

new
Consumer(s).start();

new
Producer(s).start();

}

}

// 例4.6.1  CommunicationDemo.java 描述:生产者和消费者之间的消息传递过程

class ShareData

{
private char c;
private boolean isProduced = false; // 信号量
public synchronized void putShareChar(char c) // 同步方法putShareChar()
{
if (isProduced) // 如果产品还未消费,则生产者等待
{
try
{
wait(); // 生产者等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.c = c;
isProduced = true; // 标记已经生产
notify(); // 通知消费者已经生产,可以消费
}

public synchronized char getShareChar() // 同步方法getShareChar()
{
if (!isProduced) // 如果产品还未生产,则消费者等待
{
try
{
wait(); // 消费者等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isProduced = false; // 标记已经消费
notify(); // 通知需要生产
return this.c;
}
}

class Producer extends Thread // 生产者线程
{
private ShareData s;

Producer(ShareData s)
{
this.s = s;
}

public void run()
{
for (char ch = 'A'; ch <= 'D'; ch++)
{
try
{
Thread.sleep((int) (Math.random() * 3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
s.putShareChar(ch); // 将产品放入仓库
System.out.println(ch + " is produced by Producer.");
}
}
}

class Consumer extends Thread // 消费者线程
{
private ShareData s;

Consumer(ShareData s)
{
this.s = s;
}

public void run()
{
char ch;
do {
try
{
Thread.sleep((int) (Math.random() * 3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
ch = s.getShareChar(); // 从仓库中取出产品
System.out.println(ch + " is consumed by Consumer. ");
} while (ch != 'D');
}
}

class CommunicationDemo
{
public static void main(String[] args)
{
ShareData s = new ShareData();
new Consumer(s).start();
new Producer(s).start();
}
}


上面的程序演示了生产者生产出A、B、C、D四个字符,消费者消费这四个字符的全过程

通过程序的运行结果可以看到,尽管在主方法中先启动了Consumer线程,但是,由于仓库中没有产品,因此,Consumer线程就会调用
wait()方法进入等待队列进行等待,直到Producer线程将产品生产出来并放进仓库,然后使用notify()方法将其唤醒。

由于在两个线程中都指定了一定的休眠时间,因此也可能出现这样的情况:生产者将产品生产出来放入仓库,并通知等待队列中的Consumer线程,然
而,由于休眠时间过长,Consumer线程还没有打算消费产品,此时,Producer线程欲生产下一个产品,结果由于仓库中的产品没有被消费掉,故
Producer线程执行wait()方法进入等待队列等待,直到Consumer线程将仓库中的产品消费掉以后通过notify()方法去唤醒等待队列
中的Producer线程为止。可见,两个线程之间除了必须保持同步之外,还要通过相互通信才能继续向前推进。

前面这个程序中,生产者一次只能生产一个产品,而消费者也只能一次消费一个产品。那么现实中也有这样的情况,生产者可以一次生产多个产品,只要仓库容量够大,就可以一直生产。而消费者也可以一次消费多个产品,直到仓库中没有产品为止。

但是,无论是生产产品到仓库,还是从仓库中消费,每一次都只能允许一个操作。显然,这也是个同步问题,只不过在这个问题中共享资源是一个资源池,可以存放多个资源。下面就以栈结构为例给出如何在这个问题中解决线程通信的程序代码。

Java代码


// 例4.6.2 CommunicationDemo2.java

class


SyncStack
// 同步堆栈类,可以一次放入多个数据

{

private


int


index =
0

;
// 堆栈指针初始值为0

private


char


[] buffer =
new


char


[
5

];
// 堆栈有5个字符的空间

public


synchronized


void


push(
char


c)
// 入栈同步方法

{

if


(index == buffer.length)
// 堆栈已满,不能入栈

{

try


{

this


.wait();
// 等待出栈线程将数据出栈

}
catch


(InterruptedException e) {

}

}

buffer[index] = c;
// 数据入栈

index++;
// 指针加1,栈内空间减少

this


.notify();
// 通知其他线程把数据出栈

}

public


synchronized


char


pop()
// 出栈同步方法

{

if


(index ==
0

)
// 堆栈无数据,不能出栈

{

try


{

this


.wait();
// 等待入栈线程把数据入栈

}
catch


(InterruptedException e) {

}

}

this


.notify();
// 通知其他线程入栈

index--;
// 指针向下移动

return


buffer[index];
// 数据出栈

}

}

class


Producer
implements


Runnable
// 生产者类

{

SyncStack s;
// 生产者类生成的字母都保存到同步堆栈中

public


Producer(SyncStack s)

{

this


.s = s;

}

public


void


run()

{

char


ch;

for


(
int


i =
0

; i <
5

; i++)

{

try


{

Thread.sleep((
int


) (Math.random() *
1000

));

}
catch


(InterruptedException e) {

}

ch = (
char


) (Math.random() *
26

+
'A'

);
// 随机产生5个字符

s.push(ch);
// 把字符入栈

System.out.println(
"Push "

+ ch +
" in Stack"

);
// 打印字符入栈

}

}

}

class


Consumer
implements


Runnable
// 消费者类

{

SyncStack s;
// 消费者类获得的字符都来自同步堆栈

public


Consumer(SyncStack s)

{

this


.s = s;

}

public


void


run()

{

char


ch;

for


(
int


i =
0

; i <
5

; i++)

{

try


{

Thread.sleep((
int


) (Math.random() *
3000

));

}
catch


(InterruptedException e) {

}

ch = s.pop();
// 从堆栈中读取字符

System.out.println(
"Pop "

+ ch +
" from Stack"

);
// 打印字符出栈

}

}

}

public


class


CommunicationDemo2

{

public


static


void


main(String[] args)

{

SyncStack stack =
new


SyncStack();

// 下面的消费者类对象和生产者类对象所操作的是同一个同步堆栈对象

Thread t1 =
new


Thread(
new


Producer(stack));
// 线程实例化

Thread t2 =
new


Thread(
new


Consumer(stack));
// 线程实例化

t2.start();
// 线程启动

t1.start();
// 线程启动

}

}

Java代码

// 例4.6.2 CommunicationDemo2.java

class
SyncStack
// 同步堆栈类,可以一次放入多个数据

{

private

int
index =
0
;
// 堆栈指针初始值为0

private

char
[] buffer =
new

char
[
5
];
// 堆栈有5个字符的空间

public

synchronized

void
push(
char
c)
// 入栈同步方法

{

if
(index == buffer.length)
// 堆栈已满,不能入栈

{

try

{

this
.wait();
// 等待出栈线程将数据出栈

} catch
(InterruptedException e) {

}

}

buffer[index] = c; // 数据入栈

index++; // 指针加1,栈内空间减少

this
.notify();
// 通知其他线程把数据出栈

}

public

synchronized

char
pop()
// 出栈同步方法

{

if
(index ==
0
)
// 堆栈无数据,不能出栈

{

try

{

this
.wait();
// 等待入栈线程把数据入栈

} catch
(InterruptedException e) {

}

}

this
.notify();
// 通知其他线程入栈

index--; // 指针向下移动

return
buffer[index];
// 数据出栈

}

}

class
Producer
implements
Runnable
// 生产者类

{

SyncStack s; // 生产者类生成的字母都保存到同步堆栈中

public
Producer(SyncStack s)

{

this
.s = s;

}

public

void
run()

{

char
ch;

for
(
int
i =
0
; i <
5
; i++)

{

try

{

Thread.sleep((int
) (Math.random() *
1000
));

} catch
(InterruptedException e) {

}

ch = (char
) (Math.random() *
26
+
'A'
);
// 随机产生5个字符

s.push(ch); // 把字符入栈

System.out.println("Push "
+ ch +
" in Stack"
);
// 打印字符入栈

}

}

}

class
Consumer
implements
Runnable
// 消费者类

{

SyncStack s; // 消费者类获得的字符都来自同步堆栈

public
Consumer(SyncStack s)

{

this
.s = s;

}

public

void
run()

{

char
ch;

for
(
int
i =
0
; i <
5
; i++)

{

try

{

Thread.sleep((int
) (Math.random() *
3000
));

} catch
(InterruptedException e) {

}

ch = s.pop(); // 从堆栈中读取字符

System.out.println("Pop "
+ ch +
" from Stack"
);
// 打印字符出栈

}

}

}

public

class
CommunicationDemo2

{

public

static

void
main(String[] args)

{

SyncStack stack = new
SyncStack();

// 下面的消费者类对象和生产者类对象所操作的是同一个同步堆栈对象

Thread t1 = new
Thread(
new
Producer(stack));
// 线程实例化

Thread t2 = new
Thread(
new
Consumer(stack));
// 线程实例化

t2.start(); // 线程启动

t1.start(); // 线程启动

}

}

// 例4.6.2  CommunicationDemo2.java

class SyncStack // 同步堆栈类,可以一次放入多个数据
{
private int index = 0; // 堆栈指针初始值为0
private char[] buffer = new char[5]; // 堆栈有5个字符的空间
public synchronized void push(char c) // 入栈同步方法
{
if (index == buffer.length) // 堆栈已满,不能入栈
{
try
{
this.wait(); // 等待出栈线程将数据出栈
} catch (InterruptedException e) {
}
}
buffer[index] = c; // 数据入栈
index++; // 指针加1,栈内空间减少
this.notify(); // 通知其他线程把数据出栈
}

public synchronized char pop() // 出栈同步方法
{
if (index == 0) // 堆栈无数据,不能出栈
{
try
{
this.wait(); // 等待入栈线程把数据入栈
} catch (InterruptedException e) {
}
}
this.notify(); // 通知其他线程入栈
index--; // 指针向下移动
return buffer[index]; // 数据出栈
}
}

class Producer implements Runnable // 生产者类
{
SyncStack s; // 生产者类生成的字母都保存到同步堆栈中
public Producer(SyncStack s)
{
this.s = s;
}

public void run()
{
char ch;
for (int i = 0; i < 5; i++)
{
try
{
Thread.sleep((int) (Math.random() * 1000));
} catch (InterruptedException e) {
}
ch = (char) (Math.random() * 26 + 'A'); // 随机产生5个字符
s.push(ch); // 把字符入栈
System.out.println("Push " + ch + " in Stack"); // 打印字符入栈
}
}
}

class Consumer implements Runnable // 消费者类
{
SyncStack s; // 消费者类获得的字符都来自同步堆栈
public Consumer(SyncStack s)
{
this.s = s;
}

public void run()
{
char ch;
for (int i = 0; i < 5; i++)
{
try
{
Thread.sleep((int) (Math.random() * 3000));
} catch (InterruptedException e) {
}
ch = s.pop(); // 从堆栈中读取字符
System.out.println("Pop  " + ch + " from Stack"); // 打印字符出栈
}
}
}

public class CommunicationDemo2
{
public static void main(String[] args)
{
SyncStack stack = new SyncStack();
// 下面的消费者类对象和生产者类对象所操作的是同一个同步堆栈对象
Thread t1 = new Thread(new Producer(stack)); // 线程实例化
Thread t2 = new Thread(new Consumer(stack)); // 线程实例化
t2.start(); // 线程启动
t1.start(); // 线程启动
}
}


程序中引入了一个堆栈数组buffer[]来模拟资源池,并使生产者类和消费者类都实现了Runnable接口,然后在主程序中通过前面介绍的方法
创建两个共享同一堆栈资源的线程,并且有意先启动消费者线程,后启动生产者线程。请在阅读程序的时候仔细观察例4.6.1和本例的相似点以及区别之处,体
会作者的用心。

由于是栈结构,所以符合后进先出原则。有兴趣的读者还可以用符合先进先出原则的队列结构来模拟线程间通信的过程,相信可以通过查阅相关的资料来解决这个问题,在这里就不再给出程序代码了,作为一个思考题供读者练习。

专家说明

本小节介绍了三个重要的方法:wait()、notify()和notifyAll()。使用它们可以高效率地完成多个线程间的通信问题,这样在通
信问题上就不必再使用循环检测的方法来等待某个条件的发生,因为这种方法是极为浪费CPU资源的,当然这种情况也不是所期望的。在例4.6.1中,为了更
好地通信,引入了一个专门用来传递信息的信号量。利用信号量来决定线程是否等待无疑是一种非常安全的操作,值得提倡。此外,在例4.6.2中引入了资源池
作为共享资源,并解决了在这种情况下如何实现多线程之间的通信问题。希望读者能够举一反三,编写出解决更加复杂问题的程序。

专家指点

可以肯定的是,合理地使用wait()、notify()和notifyAll()方法确实能够很好地解决线程间通信的问题。但是,也应该了解到这
些方法是更复杂的锁定、排队和并发性代码的构件。尤其是使用
notify()来代替notifyAll()时是有风险的。除非确实知道每一个线程正在做什么,否则最好使用notifyAll()。其实,在
JDK1.5中已经引入了一个新的包:java.util.concurrent
包,该包是一个被广泛使用的开放源码工具箱,里面都是有用的并发性实用程序。完全可以代替wait()和notify()方法用来编写自己的调度程序和
锁。有关信息可以查阅相关资料,本书中不再赘述。

相关问题

Java提供了各种各样的输入输出流(stream),使程序员能够很方便地对数据进行操作。其中,管道(pipe)流是一种特殊的流,用于在不同
线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道中读出数据。通过使用管道,达到实现多个线程间通信的目的。那么,如何创建和使用
管道呢?

Java提供了两个特殊的专门用来处理管道的类,它们就是PipedInputStream类和PipedOutputStream类。

其中,PipedInputStream代表了数据在管道中的输出端,也就是线程从管道读出数据的一端;PipedOutputStream代表了数据在管道中的输入端,也就是线程向管道写入数据的一端,这两个类一起使用就可以创建出数据输入输出的管道流对象。

一旦创建了管道之后,就可以利用多线程的通信机制对磁盘中的文件通过管道进行数据的读写,从而使多线程的程序设计在实际应用中发挥更大的作用。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: