Study notes on "Java 7 Concurrency Cookbook" (3): Semaphore, CountDownLatch, CyclicBarrier, Phaser, Exchanger

2014-09-21 15:16
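1. Semaphore

A semaphore is a counter that controls access to one or more shared resources.

When a thread wants to access a shared resource, it must first acquire the semaphore. If the semaphore's internal counter is greater than 0, the semaphore decrements the counter and grants access. A counter greater than 0 means that free resources are available, so the thread can use one of them.

If, on the other hand, the counter is 0, the semaphore puts the thread to sleep until the counter becomes greater than 0. A counter of 0 means that all the shared resources are in use, so a thread that wants one must wait until a resource is freed.

When the thread has finished with the shared resource, it must release the semaphore so that other threads can access the resource. This operation increments the semaphore's internal counter.
Example 1:
Use the Semaphore class to implement a special kind of semaphore called a binary semaphore. A binary semaphore guarantees exclusive access to a single shared resource, so its internal counter is only ever 1 or 0, provided that every release() is paired with an earlier acquire().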

import java.util.Random;
import java.util.concurrent.Semaphore;

public class PrintQueue {
    // Binary semaphore: a single permit, so only one thread prints at a time.
    private final Semaphore semaphore;

    public PrintQueue() {
        semaphore = new Semaphore(1);
    }

    public void printJob(Object document) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so there is nothing to release.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println(Thread.currentThread().getName() + ": Going to print a job");
            Thread.sleep((long) (new Random().nextInt(5)) * 1000);
            System.out.println(Thread.currentThread().getName() + ": " + document);
            System.out.println(Thread.currentThread().getName() + ": The document has been printed");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Give back the permit acquired above.
            semaphore.release();
        }
    }

    public static void main(String args[]) {
        PrintQueue printQueue = new PrintQueue();
        Thread thread[] = new Thread[10];
        for (int i = 0; i < 10; i++) {
            thread[i] = new Thread(new Job(printQueue), "Thread" + i);
        }
        for (int i = 0; i < 10; i++) {
            thread[i].start();
        }
    }
}

class Job implements Runnable {
    private PrintQueue printQueue;

    public Job(PrintQueue printQueue) {
        this.printQueue = printQueue;
    }

    @Override
    public void run() {
        printQueue.printJob(new Object());
    }
}
Output:

run:

Thread0: Going to print a job

Thread0: java.lang.Object@3485def8

Thread0: The document has been printed

Thread1: Going to print a job

Thread1: java.lang.Object@30c3bb57

Thread1: The document has been printed

Thread2: Going to print a job

Thread2: java.lang.Object@24065c4

Thread2: The document has been printed

Thread3: Going to print a job

Thread3: java.lang.Object@51d92803

Thread3: The document has been printed

Thread4: Going to print a job

Thread4: java.lang.Object@7d206f0

Thread4: The document has been printed

Thread5: Going to print a job

Thread5: java.lang.Object@6dc57a92

Thread5: The document has been printed

Thread6: Going to print a job

Thread6: java.lang.Object@3ff23f8b

Thread6: The document has been printed

Thread7: Going to print a job

Thread7: java.lang.Object@3929df79

Thread7: The document has been printed

Thread8: Going to print a job

Thread8: java.lang.Object@6c0e9e40

Thread8: The document has been printed

Thread9: Going to print a job

Thread9: java.lang.Object@33b7b32c

Thread9: The document has been printed

BUILD SUCCESSFUL (total time: 21 seconds)

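With the constructor argument permits set to 1, semaphore.acquire() and semaphore.release() provide mutual exclusion much like synchronized. Unlike synchronized, a semaphore is not reentrant and has no owner: a thread that calls acquire() twice without a release() in between blocks itself, and any thread may call release().

Other notes:

1. acquireUninterruptibly() ignores interruption while waiting: the thread keeps waiting for a permit, and its interrupt status is set when the method returns.

2. tryAcquire() does not block: it returns true if a permit was acquired and returns false immediately if none is available.

3. The fair parameter defaults to false (non-fair). For first-in-first-out granting of permits, use Semaphore(int permits, boolean fair).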
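
The notes above can be tried out in a few lines. The following is a minimal sketch, not code from the book: the class name SemaphoreNotesDemo and its method names are made up for illustration, while acquireUninterruptibly(), tryAcquire(), release() and isFair() are methods of java.util.concurrent.Semaphore.

```java
import java.util.concurrent.Semaphore;

class SemaphoreNotesDemo {

    // Takes the only permit, then asks for a second one without blocking.
    static boolean tryAcquireWhenExhausted() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly();      // takes the single permit, ignoring interrupts
        boolean second = semaphore.tryAcquire(); // same thread, but no permit is left
        semaphore.release();                     // give back the permit taken above
        return second;
    }

    // Builds a semaphore with the two-argument constructor and reports its mode.
    static boolean fairModeEnabled() {
        Semaphore fair = new Semaphore(3, true);
        return fair.isFair();
    }

    public static void main(String[] args) {
        System.out.println("tryAcquire with no permits left: " + tryAcquireWhenExhausted());
        System.out.println("Semaphore(3, true) is fair: " + fairModeEnabled());
        System.out.println("Semaphore(1) is fair: " + new Semaphore(1).isFair());
    }
}
```

The first method also shows that a semaphore is not reentrant: the thread that already holds the permit still gets false from tryAcquire().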

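Example 2:

Semaphores can also protect several copies of a resource, that is, a section of code that may be executed by more than one thread at a time, up to a fixed limit.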

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PrintQueue3 {

    private final boolean freePrinters[];
    private final Lock lockPrinters;
    // Three permits: at most three jobs print at the same time.
    private final Semaphore semaphore;

    public PrintQueue3() {
        semaphore = new Semaphore(3);
        freePrinters = new boolean[]{true, true, true};
        lockPrinters = new ReentrantLock();
    }

    public void printJob(Object document) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so there is nothing to release.
            Thread.currentThread().interrupt();
            return;
        }
        int assignedPrinter = getPrinter();
        try {
            System.out.println(Thread.currentThread().getName() + ": Going to print a job");
            Thread.sleep((long) (new Random().nextInt(5)) * 1000);
            System.out.println(Thread.currentThread().getName() + ": " + " -- " + assignedPrinter + " -- " + document);
            System.out.println(Thread.currentThread().getName() + ": The document has been printed");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Free the printer even if the job was interrupted, then return the permit.
            releasePrinter(assignedPrinter);
            semaphore.release();
        }
    }

    // Finds a free printer and marks it busy; the lock guards the freePrinters array.
    private int getPrinter() {
        int ret = -1;
        lockPrinters.lock();
        try {
            for (int i = 0; i < freePrinters.length; i++) {
                if (freePrinters[i]) {
                    ret = i;
                    freePrinters[i] = false;
                    break;
                }
            }
        } finally {
            lockPrinters.unlock();
        }
        return ret;
    }

    // Marks a printer as free again, under the same lock as getPrinter().
    private void releasePrinter(int printer) {
        lockPrinters.lock();
        try {
            freePrinters[printer] = true;
        } finally {
            lockPrinters.unlock();
        }
    }

    public static void main(String args[]) {
        PrintQueue3 printQueue3 = new PrintQueue3();
        Thread thread[] = new Thread[10];
        for (int i = 0; i < 10; i++) {
            thread[i] = new Thread(new Job3(printQueue3), "Thread" + i);
        }
        for (int i = 0; i < 10; i++) {
            thread[i].start();
        }
    }
}

class Job3 implements Runnable {

    private PrintQueue3 printQueue3;

    public Job3(PrintQueue3 printQueue3) {
        this.printQueue3 = printQueue3;
    }

    @Override
    public void run() {
        printQueue3.printJob(new Object());
    }
}




Output:

Thread0: Going to print a job

Thread2: Going to print a job

Thread1: Going to print a job

Thread2: -- 2 -- java.lang.Object@3929df79

Thread2: The document has been printed

Thread3: Going to print a job

Thread3: -- 2 -- java.lang.Object@6c0e9e40

Thread3: The document has been printed

Thread7: Going to print a job

Thread1: -- 1 -- java.lang.Object@33b7b32c

Thread1: The document has been printed

Thread8: Going to print a job

Thread8: -- 1 -- java.lang.Object@6154283a

Thread8: The document has been printed

Thread4: Going to print a job

Thread7: -- 2 -- java.lang.Object@5c1d29c1

Thread0: -- 0 -- java.lang.Object@7ea06d25

Thread0: The document has been printed

Thread6: Going to print a job

Thread7: The document has been printed

Thread5: Going to print a job

Thread4: -- 1 -- java.lang.Object@2b571dff

Thread6: -- 0 -- java.lang.Object@64726693

Thread6: The document has been printed

Thread4: The document has been printed

Thread9: Going to print a job

Thread5: -- 2 -- java.lang.Object@12ac706a

Thread5: The document has been printed

Thread9: -- 0 -- java.lang.Object@770848b9

Thread9: The document has been printed

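2. CountDownLatch
CountDownLatch lets one or more threads wait until a set of operations has completed.

A CountDownLatch is initialized with an integer: the number of operations the threads will wait for.

await(): called by a thread that needs those operations to finish; it puts the thread to sleep until all of them have completed.

countDown(): called when one operation finishes; it decrements the internal counter of the CountDownLatch. When the counter reaches 0, the latch wakes up all the threads sleeping in await().

await(long time, TimeUnit unit): sleeps until the counter reaches 0 (returns true), the specified time elapses (returns false), or the thread is interrupted (throws InterruptedException).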
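
Before the full example, here is a minimal sketch of the timed await(long, TimeUnit) and its return value. The class and method names (TimedAwaitDemo, awaitNeverOpened, awaitOpened) are hypothetical, and the sketch uses a Java 8 method reference for brevity even though the notes target Java 7.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {

    // Nobody counts this latch down, so the timed await gives up after 100 ms.
    static boolean awaitNeverOpened() {
        CountDownLatch latch = new CountDownLatch(1);
        return awaitQuietly(latch, 100, TimeUnit.MILLISECONDS);
    }

    // A worker thread opens the latch, so the timed await returns before the limit.
    static boolean awaitOpened() {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        return awaitQuietly(latch, 5, TimeUnit.SECONDS);
    }

    // Wraps the checked InterruptedException so that callers get a plain boolean.
    private static boolean awaitQuietly(CountDownLatch latch, long time, TimeUnit unit) {
        try {
            return latch.await(time, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("latch never opened: " + awaitNeverOpened());
        System.out.println("latch opened by a worker: " + awaitOpened());
    }
}
```

A false result means only that the time ran out; the latch stays usable and a later await() can still succeed.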

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {

    public static void main(String[] args) {

        // The conference waits for 10 participants.
        Videoconference conference = new Videoconference(10);
        Thread threadConference = new Thread(conference);
        threadConference.start();
        for (int i = 0; i < 10; i++) {
            Participant p = new Participant(conference, " Participant" + i);
            Thread t = new Thread(p);
            t.start();
        }
    }
}

class Videoconference implements Runnable {

    private final CountDownLatch controller;

    public Videoconference(int number) {
        controller = new CountDownLatch(number);
    }

    // Called by each participant on arrival; counts the latch down by one.
    public void arrive(String name) {
        System.out.println(name + "  has arrived");
        controller.countDown();
        System.out.println("Waiting for  " + controller.getCount() + "   participants. ");
    }

    @Override
    public void run() {
        System.out.println("Initialization:" + controller.getCount() + " participants");
        try {
            // Sleeps until every participant has called arrive().
            controller.await();
            System.out.println("All   the participants have come");
            System.out.println(" Let 's start...");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Participant implements Runnable {

    private Videoconference conference;
    private String name;

    public Participant(Videoconference conference, String name) {
        this.conference = conference;
        this.name = name;
    }

    @Override
    public void run() {
        // Simulates a random arrival delay of 0 to 9 seconds.
        long duration = (long) (Math.random() * 10);
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        conference.arrive(name);
    }
}


Output:

Initialization:10 participants

Participant6 has arrived

Waiting for 9 participants.

Participant4 has arrived

Waiting for 8 participants.

Participant0 has arrived

Waiting for 7 participants.

Participant9 has arrived

Waiting for 6 participants.

Participant3 has arrived

Waiting for 5 participants.

Participant7 has arrived

Waiting for 4 participants.

Participant2 has arrived

Waiting for 3 participants.

Participant5 has arrived

Waiting for 2 participants.

Participant1 has arrived

Participant8 has arrived

Waiting for 1 participants.

Waiting for 0 participants.

All the participants have come

Let 's start...

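CountDownLatch has three basic elements:

The initial value, which determines how many events the latch waits for.
The await() method, called by the threads that wait for all the events to finish.
The countDown() method, called by each event when it finishes executing.

The CountDownLatch mechanism is not meant to protect a shared resource or a critical section. It synchronizes one or more threads with the execution of several tasks. It can be used only once: as explained above, once the counter reaches 0, further calls have no effect (countDown() does nothing and await() returns immediately). To synchronize again, you must create a new object.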
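
The one-shot behaviour can be checked directly. This is a small sketch under the assumption that a single thread is enough to show it; OneShotLatchDemo and its methods are illustrative names, not part of the book's example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class OneShotLatchDemo {

    // Counts a latch of 2 down three times and records the count
    // after the latch opens and after the extra call.
    static long[] countsAfterOpening() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown();
        long afterOpening = latch.getCount();
        latch.countDown(); // the latch is already open: no effect
        long afterExtraCall = latch.getCount();
        return new long[] {afterOpening, afterExtraCall};
    }

    // On an open latch, await() returns at once, however often it is called.
    static boolean awaitOnOpenLatch() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        try {
            latch.await();
            return latch.await(0, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        long[] counts = countsAfterOpening();
        System.out.println("count after opening: " + counts[0]);
        System.out.println("count after an extra countDown(): " + counts[1]);
        System.out.println("await on an open latch: " + awaitOnOpenLatch());
    }
}
```

Because the counter never goes back up, there is no way to make threads wait on this latch again.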

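3. CyclicBarrier

A CyclicBarrier is initialized with an integer: the number of threads that will be synchronized at a common point.

When one of those threads reaches that point, it calls await() to wait for the others. The CyclicBarrier blocks the calling thread and puts it to sleep until the other threads arrive.

When the last thread calls await(), the barrier wakes up all the waiting threads and they continue with their work.

CyclicBarrier has some points in common with CountDownLatch, but also some differences.

The main difference is that a CyclicBarrier can be reused: once all the threads have passed it, it returns to its initial state automatically, and reset() forces it back to that state at any time (threads waiting at that moment receive a BrokenBarrierException). A CountDownLatch cannot be reset.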
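
To illustrate the reuse, the sketch below sends the same barrier through several rounds and counts how often its barrier action runs. BarrierReuseDemo and runRounds are hypothetical names, and the lambdas assume Java 8 or later; on Java 7 the same logic would need anonymous Runnable classes.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {

    // Runs `parties` threads that meet at one barrier `rounds` times.
    // Returns how many times the barrier tripped (the barrier action ran).
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time the last party arrives.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or the barrier was reset; stop this worker.
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(3, 4) + " times");
    }
}
```

No call to reset() is needed here: the barrier rearms itself each time all parties have arrived.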

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) {

        final int ROWS = 10000;
        final int NUMBERS = 1000;
        final int SEARCH = 5;
        final int PARTICIPANTS = 5;
        final int LINES_PARTICIPANT = 2000;

        MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
        Results results = new Results(ROWS);
        // The Grouper runs once, after all the searchers have reached the barrier.
        Grouper grouper = new Grouper(results);
        CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
        Searcher searchers[] = new Searcher[PARTICIPANTS];
        for (int i = 0; i < PARTICIPANTS; i++) {
            searchers[i] = new Searcher(i * LINES_PARTICIPANT, (i * LINES_PARTICIPANT) + LINES_PARTICIPANT, mock, results, SEARCH, barrier);
            Thread thread = new Thread(searchers[i]);
            thread.start();
        }
        System.out.printf("Main: The main thread has finished.\n");
    }
}

// A matrix of random digits; counts the occurrences of `number` while filling it.
class MatrixMock {

    private int data[][];

    public MatrixMock(int size, int length, int number) {
        int counter = 0;
        data = new int[size][length];
        Random random = new Random();

        for (int i = 0; i < size; i++) {
            for (int j = 0; j < length; j++) {
                data[i][j] = random.nextInt(10);
                if (data[i][j] == number) {
                    counter++;
                }
            }
        }
        // Translator's note: change "number" in the string to %d to print the searched value.
        System.out.printf("Mock: There are %d ocurrences of number in generated data.\n", counter, number);
    }

    public int[] getRow(int row) {
        if ((row >= 0) && (row < data.length)) {
            return data[row];
        }
        return null;
    }
}

// Holds the number of matches found in each row.
class Results {

    private int data[];

    public Results(int size) {
        data = new int[size];
    }

    public void setData(int position, int value) {
        data[position] = value;
    }

    public int[] getData() {
        return data;
    }
}

// Counts the matches in rows [firstRow, lastRow) and then waits at the barrier.
class Searcher implements Runnable {

    private int firstRow;
    private int lastRow;
    private MatrixMock mock;
    private Results results;
    private int number;
    private final CyclicBarrier barrier;

    public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) {
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.mock = mock;
        this.results = results;
        this.number = number;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        int counter;

        System.out.printf("%s: Processing lines from %d to %d.\n", Thread.currentThread().getName(), firstRow, lastRow);

        for (int i = firstRow; i < lastRow; i++) {
            int row[] = mock.getRow(i);
            counter = 0;
            for (int j = 0; j < row.length; j++) {
                if (row[j] == number) {
                    counter++;
                }
            }

            results.setData(i, counter);
        }

        System.out.printf("%s: Lines processed.\n", Thread.currentThread().getName());

        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

// Barrier action: sums the per-row counts once every searcher has finished.
class Grouper implements Runnable {

    private Results results;

    public Grouper(Results results) {
        this.results = results;
    }

    @Override
    public void run() {

        int finalResult = 0;
        System.out.printf("Grouper: Processing results...\n");
        int data[] = results.getData();
        for (int number : data) {
            finalResult += number;
        }
        System.out.printf("Grouper: Total result: %d.\n", finalResult);
    }
}


Output:

Mock: There are 1001577 ocurrences of number in generated data.

Thread-0: Processing lines from 0 to 2000.

Thread-3: Processing lines from 6000 to 8000.

Thread-2: Processing lines from 4000 to 6000.

Thread-1: Processing lines from 2000 to 4000.

Thread-4: Processing lines from 8000 to 10000.

Main: The main thread has finished.

Thread-3: Lines processed.

Thread-2: Lines processed.

Thread-4: Lines processed.

Thread-0: Lines processed.

Thread-1: Lines processed.

Grouper: Processing results...

Grouper: Total result: 1001577.

BUILD SUCCESSFUL (total time: 0 seconds)

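4. Phaser

Phaser runs concurrent tasks that are divided into phases. It is very useful when concurrent tasks are carried out in several steps. The Phaser class synchronizes the threads at the end of each step, so no thread can start the second step until all the threads have finished the first.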
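
Before the longer file-search example, the step-by-step guarantee can be seen in a minimal sketch. The names PhaserStepsDemo and earlyStarts are made up for illustration, and the lambda assumes Java 8 or later; only arriveAndAwaitAdvance() and arriveAndDeregister() come from java.util.concurrent.Phaser.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserStepsDemo {

    // Runs `parties` threads through two steps and counts how many of them
    // entered step two before every thread had finished step one.
    static int earlyStarts(int parties) {
        Phaser phaser = new Phaser(parties);
        AtomicInteger stepOneDone = new AtomicInteger();
        AtomicInteger early = new AtomicInteger();
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                stepOneDone.incrementAndGet();      // step one: record that this thread is done
                phaser.arriveAndAwaitAdvance();     // wait here until all parties finish step one
                if (stepOneDone.get() != parties) { // step two: check that nobody is still in step one
                    early.incrementAndGet();
                }
                phaser.arriveAndDeregister();       // finished: leave the phaser
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return early.get();
    }

    public static void main(String[] args) {
        System.out.println("threads that started step two early: " + earlyStarts(4));
    }
}
```

The count stays at 0 because arriveAndAwaitAdvance() does not return in any thread until all registered parties have arrived.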

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

// Searches a directory tree for files with a given extension that were
// modified in the last 24 hours, synchronizing each step with a Phaser.
class FileSearch implements Runnable {

    private String initPath;
    private String end;
    private List<String> results;
    private Phaser phaser;

    public FileSearch(String initPath, String end, Phaser phaser) {
        this.initPath = initPath;
        this.end = end;
        this.phaser = phaser;
        results = new ArrayList<>();
    }

    private void directoryProcess(File file) {

        File list[] = file.listFiles();
        if (list != null) {
            for (int i = 0; i < list.length; i++) {

                if (list[i].isDirectory()) {
                    directoryProcess(list[i]);
                } else {
                    fileProcess(list[i]);
                }
            }
        }
    }

    private void fileProcess(File file) {
        if (file.getName().endsWith(end)) {
            results.add(file.getAbsolutePath());
        }
    }

    // Keeps only the files modified less than one day ago.
    private void filterResults() {
        List<String> newResults = new ArrayList<>();
        long actualDate = new Date().getTime();

        for (int i = 0; i < results.size(); i++) {
            File file = new File(results.get(i));
            long fileDate = file.lastModified();

            if (actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1,
                    TimeUnit.DAYS)) {
                newResults.add(results.get(i));
            }
        }

        results = newResults;
    }

    // With no results, the thread leaves the phaser; otherwise it waits for the others.
    private boolean checkResults() {

        if (results.isEmpty()) {
            System.out.printf("%s: Phase %d: 0 results.\n", Thread
                    .currentThread().getName(), phaser.getPhase());
            System.out.printf("%s: Phase %d: End.\n", Thread.currentThread()
                    .getName(), phaser.getPhase());
            phaser.arriveAndDeregister();
            return false;

        } else {
            System.out.printf("%s: Phase %d: %d results.\n", Thread
                    .currentThread().getName(), phaser.getPhase(), results
                    .size());
            phaser.arriveAndAwaitAdvance();
            return true;
        }
    }

    private void showInfo() {
        for (int i = 0; i < results.size(); i++) {
            File file = new File(results.get(i));
            System.out.printf("%s: %s\n", Thread.currentThread().getName(),
                    file.getAbsolutePath());
        }
        phaser.arriveAndAwaitAdvance();
    }

    @Override
    public void run() {

        // All the threads start searching at the same time.
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Starting.\n", Thread.currentThread().getName());

        File file = new File(initPath);
        if (file.isDirectory()) {
            directoryProcess(file);
        }

        if (!checkResults()) {
            return;
        }

        filterResults();

        if (!checkResults()) {
            return;
        }

        showInfo();
        phaser.arriveAndDeregister();
        System.out.printf("%s: Work completed.\n", Thread.currentThread()
                .getName());

    }
}

class PhaserTest {

    public static void main(String[] args) {

        // Three parties, one per search thread.
        Phaser phaser = new Phaser(3);

        FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
        FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
        FileSearch documents = new FileSearch("C:\\Documents And Settings",
                "log", phaser);

        Thread systemThread = new Thread(system, "System");
        systemThread.start();

        Thread appsThread = new Thread(apps, "Apps");
        appsThread.start();

        Thread documentsThread = new Thread(documents, "Documents");
        documentsThread.start();

        try {
            systemThread.join();
            appsThread.join();
            documentsThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Terminated: " + phaser.isTerminated());
    }
}


Output:

Apps: Starting.

System: Starting.

Documents: Starting.

Documents: Phase 1: 0 results.

Documents: Phase 1: End.

Apps: Phase 1: 4 results.

System: Phase 1: 40 results.

Apps: Phase 2: 0 results.

Apps: Phase 2: End.

System: Phase 2: 7 results.

System: C:\Windows\inf\setupapi.app.log

System: C:\Windows\inf\setupapi.dev.log

System: C:\Windows\ServiceProfiles\NetworkService\AppData\Local\Temp\MpCmdRun.log

System: C:\Windows\setupact.log

System: C:\Windows\Temp\MpCmdRun.log

System: C:\Windows\Temp\vminst.log

System: C:\Windows\WindowsUpdate.log

System: Work completed.

Terminated: true

Example 2

import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserTest2 {

    public static void main(String[] args) {

        MyPhaser phaser = new MyPhaser();

        // Register one party per student before any thread starts.
        Student students[] = new Student[5];
        for (int i = 0; i < students.length; i++) {
            students[i] = new Student(phaser);
            phaser.register();
        }

        Thread threads[] = new Thread[students.length];
        for (int i = 0; i < students.length; i++) {
            threads[i] = new Thread(students[i], "Student " + i);
            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Main: The phaser has finished: %s.\n",
                phaser.isTerminated());
    }
}

class MyPhaser extends Phaser {

    // Runs at every phase change; returning true terminates the phaser.
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        switch (phase) {
            case 0:
                return studentsArrived();
            case 1:
                return finishFirstExercise();
            case 2:
                return finishSecondExercise();
            case 3:
                return finishExam();
            default:
                return true;
        }
    }

    private boolean studentsArrived() {
        System.out.printf("Phaser: The exam are going to start. The students are ready.\n");
        System.out.printf("Phaser: We have %d students.\n",
                getRegisteredParties());
        return false;
    }

    private boolean finishFirstExercise() {
        System.out.printf("Phaser: All the students have finished the first exercise.\n");
        System.out.printf("Phaser: It's time for the second one.\n");
        return false;
    }

    private boolean finishSecondExercise() {
        System.out.printf("Phaser: All the students have finished the second exercise.\n");
        System.out.printf("Phaser: It's time for the third one.\n");
        return false;
    }

    private boolean finishExam() {
        System.out.printf("Phaser: All the students have finished the exam.\n");
        System.out.printf("Phaser: Thank you for your time.\n");
        return true;
    }
}

class Student implements Runnable {

    private Phaser phaser;

    public Student(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {

        System.out.printf("%s: Has arrived to do the exam. %s\n", Thread
                .currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Is going to do the first exercise. %s\n",
                Thread.currentThread().getName(), new Date());
        doExercise1();
        System.out.printf("%s: Has done the first exercise. %s\n", Thread
                .currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Is going to do the second exercise.%s\n",
                Thread.currentThread().getName(), new Date());
        doExercise2();
        System.out.printf("%s: Has done the second exercise. %s\n", Thread
                .currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Is going to do the third exercise. %s\n",
                Thread.currentThread().getName(), new Date());
        doExercise3();
        System.out.printf("%s: Has finished the exam. %s\n", Thread
                .currentThread().getName(), new Date());
        phaser.arriveAndAwaitAdvance();
    }

    // Each exercise is simulated by sleeping for 0 to 9 seconds.
    private void doExercise1() {
        try {
            long duration = (long) (Math.random() * 10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doExercise2() {
        try {
            long duration = (long) (Math.random() * 10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doExercise3() {
        try {
            long duration = (long) (Math.random() * 10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


Output:

Student 2: Has arrived to do the exam. Sun Sep 21 16:52:37 CST 2014

Student 4: Has arrived to do the exam. Sun Sep 21 16:52:37 CST 2014

Student 0: Has arrived to do the exam. Sun Sep 21 16:52:37 CST 2014

Student 3: Has arrived to do the exam. Sun Sep 21 16:52:37 CST 2014

Student 1: Has arrived to do the exam. Sun Sep 21 16:52:37 CST 2014

Phaser: The exam are going to start. The students are ready.

Phaser: We have 5 students.

Student 3: Is going to do the first exercise. Sun Sep 21 16:52:37 CST 2014

Student 4: Is going to do the first exercise. Sun Sep 21 16:52:37 CST 2014

Student 0: Is going to do the first exercise. Sun Sep 21 16:52:37 CST 2014

Student 2: Is going to do the first exercise. Sun Sep 21 16:52:37 CST 2014

Student 1: Is going to do the first exercise. Sun Sep 21 16:52:37 CST 2014

Student 4: Has done the first exercise. Sun Sep 21 16:52:39 CST 2014

Student 0: Has done the first exercise. Sun Sep 21 16:52:39 CST 2014

Student 3: Has done the first exercise. Sun Sep 21 16:52:43 CST 2014

Student 1: Has done the first exercise. Sun Sep 21 16:52:43 CST 2014

Student 2: Has done the first exercise. Sun Sep 21 16:52:46 CST 2014

Phaser: All the students have finished the first exercise.

Phaser: It's time for the second one.

Student 0: Is going to do the second exercise.Sun Sep 21 16:52:46 CST 2014

Student 4: Is going to do the second exercise.Sun Sep 21 16:52:46 CST 2014

Student 2: Is going to do the second exercise.Sun Sep 21 16:52:46 CST 2014

Student 1: Is going to do the second exercise.Sun Sep 21 16:52:46 CST 2014

Student 3: Is going to do the second exercise.Sun Sep 21 16:52:46 CST 2014

Student 4: Has done the second exercise. Sun Sep 21 16:52:51 CST 2014

Student 0: Has done the second exercise. Sun Sep 21 16:52:52 CST 2014

Student 1: Has done the second exercise. Sun Sep 21 16:52:52 CST 2014

Student 2: Has done the second exercise. Sun Sep 21 16:52:55 CST 2014

Student 3: Has done the second exercise. Sun Sep 21 16:52:55 CST 2014

Phaser: All the students have finished the second exercise.

Phaser: It's time for the third one.

Student 0: Is going to do the third exercise. Sun Sep 21 16:52:55 CST 2014

Student 2: Is going to do the third exercise. Sun Sep 21 16:52:55 CST 2014

Student 1: Is going to do the third exercise. Sun Sep 21 16:52:55 CST 2014

Student 3: Is going to do the third exercise. Sun Sep 21 16:52:55 CST 2014

Student 4: Is going to do the third exercise. Sun Sep 21 16:52:55 CST 2014

Student 0: Has finished the exam. Sun Sep 21 16:52:58 CST 2014

Student 4: Has finished the exam. Sun Sep 21 16:52:58 CST 2014

Student 2: Has finished the exam. Sun Sep 21 16:53:02 CST 2014

Student 3: Has finished the exam. Sun Sep 21 16:53:02 CST 2014

Student 1: Has finished the exam. Sun Sep 21 16:53:03 CST 2014

Phaser: All the students have finished the exam.

Phaser: Thank you for your time.

Main: The phaser has finished: true.

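5. Exchanger

The Exchanger class defines a synchronization point between two threads. When both threads reach that point, they swap data structures: the first thread's data structure goes to the second thread, and the second thread's goes to the first.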
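
As a warm-up for the producer/consumer example, the swap itself fits in a few lines. This is a sketch only: ExchangeDemo, swap and the helper methods are hypothetical names, and the lambdas assume Java 8 or later; exchange() is the method of java.util.concurrent.Exchanger.

```java
import java.util.concurrent.Exchanger;

class ExchangeDemo {

    // Runs two threads that meet at the exchanger; returns what each one received.
    static String[] swap(String first, String second) {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];
        Thread a = new Thread(() -> received[0] = exchangeOrNull(exchanger, first));
        Thread b = new Thread(() -> received[1] = exchangeOrNull(exchanger, second));
        a.start();
        b.start();
        joinQuietly(a);
        joinQuietly(b);
        return received;
    }

    // Hands over `value` and returns the partner's object, or null if interrupted.
    private static String exchangeOrNull(Exchanger<String> exchanger, String value) {
        try {
            return exchanger.exchange(value); // blocks until the partner thread arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String[] received = swap("from A", "from B");
        System.out.println("A received: " + received[0]);
        System.out.println("B received: " + received[1]);
    }
}
```

Each thread ends up holding the object the other one passed in, which is how the producer below hands a full buffer to the consumer and gets an empty one back.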

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class ExchangerTest {

    public static void main(String[] args) {

        List<String> buffer1 = new ArrayList<String>();
        List<String> buffer2 = new ArrayList<String>();

        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        Producer producer = new Producer(buffer1, exchanger);
        Consumer consumer = new Consumer(buffer2, exchanger);

        Thread threadProducer = new Thread(producer);
        Thread threadConsumer = new Thread(consumer);
        threadProducer.start();
        threadConsumer.start();
    }
}

class Producer implements Runnable {

    private List<String> buffer;
    private final Exchanger<List<String>> exchanger;

    public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Producer: Cycle %d\n", cycle);

            // Fill the buffer with 10 events.
            for (int j = 0; j < 10; j++) {
                String message = "Event " + ((i * 10) + j);
                System.out.printf("Producer: %s\n", message);
                buffer.add(message);
            }

            try {
                // Hand over the full buffer and receive the consumer's empty one.
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Producer: " + buffer.size());
            cycle++;
        }
    }
}

class Consumer implements Runnable {

    private List<String> buffer;
    private final Exchanger<List<String>> exchanger;

    public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Consumer: Cycle %d\n", cycle);
            try {
                // Hand over the empty buffer and receive the producer's full one.
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Without an exchange the buffer is still empty, so stop consuming.
                return;
            }
            for (int j = 0; j < 10; j++) {
                String message = buffer.get(0);
                System.out.println("Consumer: " + message);
                buffer.remove(0);
            }
            cycle++;
        }
    }
}