您的位置:首页 > 编程语言 > Java开发

Java 多线程(五)——线程通信(共享内存、管道流、wait()、notify()等)

2016-03-16 22:13 609 查看

1 线程间通信方式

1.1 共享内存

  因为线程与父进程的其他线程共享该进程所拥有的全部资源。所以创建的线程本来就已经实现共享内存。但要注意的是,在操作共享资源时要注意实现同步机制,确保线程安全。

  例如,通过实现Runnable接口实现线程的共享变量:

package thread;

public class RunnableTest {
public static void main(String[] args) {
RunnableThread runnableThread = new RunnableThread();
new Thread(runnableThread).start();
new Thread(runnableThread).start();
new Thread(runnableThread).start();
}
}

class RunnableThread implements Runnable{
// 多个线程之间共享num
int num = 0;
@Override
synchronized public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + ":" + num++);
}
}
}


也可以通过内部类的形式来共享变量:

package thread;

/**
* 通过内部类实现线程的共享变量
*/
public class InnerShareThread {
public static void main(String[] args) {
MyThread mythread = new MyThread();
mythread.getThread().start();
mythread.getThread().start();
mythread.getThread().start();
}
}
class MyThread {
int num = 0;
private class InnerThread extends Thread {
public synchronized void run() {
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName()
+ ":" + num++);
}
}
}
public Thread getThread() {
return new InnerThread();
}
}


1.2 管道流

  管道流过程:生产者向管道中输出数据,消费者从管道中读取数据。当然,生产者的管道输出要与消费者的管道输入进行连接。

package thread;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class CommunicateWhitPiping {
public static void main(String[] args) {
// 创建管道输出流
PipedOutputStream pipedOutputStream = new PipedOutputStream();
// 创建管道输入流
PipedInputStream pipedInputStream = new PipedInputStream();
try {
// 将管道输入流与输出流连接 此过程也可通过重载的构造函数来实现
pipedOutputStream.connect(pipedInputStream);
} catch (IOException e) {
e.printStackTrace();
}
// 创建生产者线程
Producer p = new Producer(pipedOutputStream);
// 创建消费者线程
Consumer c = new Consumer(pipedInputStream);
// 启动线程
p.start();
c.start();
}
}

/**
* 生产者线程(与一个管道输出流相关联)
*/
class Producer extends Thread {
private PipedOutputStream pos;

public Producer(PipedOutputStream pos) {
this.pos = pos;
}

public void run() {
int i = 8;
try {
pos.write(i);
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 消费者线程(与一个管道输入流相关联)
*/
class Consumer extends Thread {
private PipedInputStream pis;

public Consumer(PipedInputStream pis) {
this.pis = pis;
}

public void run() {
try {
System.out.println(pis.read());
} catch (IOException e) {
e.printStackTrace();
}
}
}


2 Java中实现线程通信的方法

2.1 wait()、notify()、notifyAll()

wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()或notifyAll()方法来唤醒该线程。wait()可以带毫秒参数。调用wait()方法的当前线程会释放对该同步监视器的锁定。

notify():唤醒在此同步监视器上等待的单个线程。唤醒哪个线程是任意性的。

notifyAll():唤醒在此同步监视器上等待的所有线程。

(1)wait()、notify()、notifyAll()方法属于Object类,不属于Thread类

(2)这个三个方法必须由同步监视器对象来调用:

  ①对于synchronized修饰的同步方法,同步监视器是该类默认的实例(this),所以可以直接在同步方法中调用这个三个方法。

  ②对于synchronized修饰的同步代码块,同步监视器是括号里的对象,所以需要使用该对象调用这个三个方法。

例子:

同一个账户,有两个存款者和两个取款者,存款者和取款者不断地重复存钱和取钱动作,但不允许存款者连续两次存钱和取款者连续两次取钱。

package thread;

/**
* Created by Zen9 on 2016/3/16.
*/
public class WaitAndNotifyTest {
public static void main(String[] args) {
//创建一个账户,开始余额为0
Account account = new Account(0);

new DrawThread("取款者A",account,800).start();
new DepositThread("存款者甲",account,800).start();
new DrawThread("取款者B",account,800).start();
new DepositThread("存款者乙",account,800).start();
}
}

//账户
class Account{
private double balance;
// 标识账户中是否已有存款
private boolean flag = false;
public Account(double balance){
this.balance = balance;
}

synchronized public void draw(double drawAmount){
try {
if (!flag){
// 当前线程等待,释放该同步监视器的锁定
wait();
}
else {
//执行取款操作
System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount);
balance -= drawAmount;
System.out.println("账户余额为:" + balance);
// 账户中存款被取走,将标志flag设为false
flag = false;
// 唤醒其他线程
notifyAll();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}

synchronized public void deposit(double depositAmount){
try {
if (flag){
wait();
}
else {
//执行存款操作
System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount);
balance += depositAmount;
System.out.println("账户余额为:" + balance);
// 账户中已有存款,将标志flag设为true
flag = true;
// 唤醒其他线程
notifyAll();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}

}

//取钱线程
class DrawThread extends Thread{
// 账户
private Account account;
// 当前取钱线程希望去钱
private double drawAmount;
public DrawThread(String name,Account account,double drawAmount){
super(name);
this.account = account;
this.drawAmount = drawAmount;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.draw(drawAmount);
}
}
}

class DepositThread extends Thread{
private Account account;
private double depositAmount;
public DepositThread(String name,Account account,double depositAmount){
super(name);
this.account = account;
this.depositAmount = depositAmount;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.deposit(depositAmount);
}
}
}




程序最后阻塞,无法运行下去。因为当取款者或存款者其中一方的两者执行完200次循环后,剩下的另一方无法继续进行操作。

注意:该种情况的程序阻塞不是死锁!

2.2 使用Condeition控制

  当程序使用Lock对象(如:ReentrantLock对象)来保证同步时,Java提供了一个Condition类来保持协调。Condition实例被绑定在一个Lock对象上。

  在这种情况下,Lock替代了同步方法或同步代码块,Condition替代了同步监视器的功能。

改写上面Account类:

//账户
class Account{
private double balance;
// 标识账户中是否已有存款
private boolean flag = false;

private final Lock lock = new ReentrantLock();            /////////修改之处////////
private final Condition condition = lock.newCondition();  /////////修改之处////////

public Account(double balance){
this.balance = balance;
}

public void draw(double drawAmount){
lock.lock();       /////////修改之处////////
try {
if (!flag){
condition.await();      /////////修改之处////////
}
else {
//执行取款操作
System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount);
balance -= drawAmount;
System.out.println("账户余额为:" + balance);
// 账户中存款被取走,将标志flag设为false
flag = false;
// 唤醒其他线程
condition.signalAll();  /////////修改之处////////
}
}catch (InterruptedException e){
e.printStackTrace();
}
}

public void deposit(double depositAmount){
lock.lock();       /////////修改之处////////
try {
if (flag){
condition.await();      /////////修改之处////////
}
else {
//执行存款操作
System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount);
balance += depositAmount;
System.out.println("账户余额为:" + balance);
// 账户中已有存款,将标志flag设为true
flag = true;
// 唤醒其他线程
condition.signalAll();  /////////修改之处////////
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}


对比两个Account类,此处只是显示地使用了Lock对象来充当同步监视器,则需要使用Condition对象来暂停、唤醒指定线程。

2.3 使用阻塞队列(BlockingQueue)

  BlockingQueue接口,是Queue的子接口,但它的主要作用不是作为容器,而是作为线程同步工具。

  BlockingQueue具有一个特性:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。

  BlockingQueue包含的方法:

抛出异常不同返回值阻塞线程指定超时时长
队尾插入元素add(e)offer(e)put(e)offer(e,time,unit)
队头删除元素remove()poll()take()poll(time,unit)
获取、不删除元素element()peek()
例子:

package thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* Created by Zen9 on 2016/3/16.
*/
public class BlockingQueueTest {
public static void main(String[] args) {
// 创建容量为1的BlockingQueue
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(1);
// 启动3个生产者线程
new Producer(blockingQueue).start();
new Producer(blockingQueue).start();
new Producer(blockingQueue).start();
// 启动1个消费者线程
new Consumer(blockingQueue).start();
}
}

class Producer extends Thread{
private BlockingQueue<String> blockingQueue;
public Producer(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
String[] strArr = new String[]{"AAAA","BBBB","CCCC"};
for (int i = 0; i < 99999; i++) {
System.out.println(getName() + " 生产者准备生产集合元素!");
try {
Thread.sleep(200);
// 尝试放入元素,如果队列已满,则线程被阻塞
blockingQueue.put(strArr[i%3]);
}catch (Exception e){
e.printStackTrace();
}
System.out.println(getName() + "生产完成:" + blockingQueue);
}
}
}

class Consumer extends Thread{
private BlockingQueue<String> blockingQueue;
public Consumer(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
while (true) {
System.out.println(getName() + " 消费者准备消费集合元素!");
try {
Thread.sleep(200);
// 尝试取出元素,如果队列已空,则线程被阻塞
blockingQueue.take();
}catch (Exception e){
e.printStackTrace();
}
System.out.println(getName() + "消费完成:" + blockingQueue);
}
}
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: