您的位置:首页 > 编程语言 > Java开发

Java并发编程学习笔记(3)――线程同步进阶

2016-03-26 22:28 369 查看
Semaphore

Semaphore是一个控制访问多个共享资源的计数器。当计数器值大于0,代表还有可用资源,线程可以继续访问和使用资源;当计数器的值等于0,代表暂无可用资源,线程必须等待资源的释放。

一个典型的例子就是,有多台打印机,当新的打印任务来时,将检测是否还有可用的打印机,若有则使用,并将可用打印机数量减1;若无则等待;当使用完毕后,释放打印机,将可用打印机数量加1。

使用示例

public class SemaphoreTest {
private final Semaphore semaphore = new Semaphore(4);
public void print() {
try {
semaphore.acquire();
//do something
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
public class SemaphoreTest {
private final Semaphore semaphore = new Semaphore(4);
public void print() {
try {
semaphore.acquire();
//do something
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
同Lock一样,Semaphore也可开启公平机制,Semaphore(int permits, boolean fair)。
acquireUninterruptibly(),与acquire()的区别是:线程中断不会抛出异常
tryAcquire(),尝试获取semaphore。如果成功,返回true。如果不成功,返回false值,并不会被阻塞和等待semaphore的释放。

CountDownLatch

CountDownLatch,用于等待多个并发事件完成,相当于一个倒数计数器,初始化时传入一个整数参数,调用方法countDown()减少计数,当计数为0时,解除await()的休眠。

使用示例

public class CountDownLatchTest implements Runnable{
private final CountDownLatch controller = new CountDownLatch(4);
public void assemble() {
System.out.println("come one");
controller.countDown();
}
@Override
public void run() {
try {
System.out.println("wait " + controller.getCount());
controller.await();
System.out.println("get all");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CountDownLatchTest implements Runnable{
private final CountDownLatch controller = new CountDownLatch(4);
public void assemble() {
System.out.println("come one");
controller.countDown();
}
@Override
public void run() {
try {
System.out.println("wait " + controller.getCount());
controller.await();
System.out.println("get all");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CountDownLatch的计数器一旦初始化就无法更改。

CyclicBarrier

CyclicBarrier,用于在同一个点同步任务,与CountDownLatch类似,但CyclicBarrier可以在计数器等于0时执行初始化时传递的Runnable对象。

使用示例

public class CyclicBarrierTest implements Runnable {
private final CyclicBarrier barrier = new CyclicBarrier(4, new After());
@Override
public void run() {
try {
System.out.println("wait");
barrier.await();
System.out.println("pass");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

}
}
class After implements Runnable {
@Override
public void run() {
System.out.println("tear down");
}
}
public class CyclicBarrierTest implements Runnable {
private final CyclicBarrier barrier = new CyclicBarrier(4, new After());
@Override
public void run() {
try {
System.out.println("wait");
barrier.await();
System.out.println("pass");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

}
}
class After implements Runnable {
@Override
public void run() {
System.out.println("tear down");
}
}
CyclicBarrier的计数器可以重置,reset()。

Phaser

Phaser,用于阶段性并发任务的同步。CyclicBarrier可以在一个点同步任务,当需要在运行过程中多个点同步线程,则可以用Phaser来处理。

使用示例

public class PhaserTest implements Runnable {
private Phaser phaser;
public PhaserTest(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
}
}
class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
PhaserTest t1 = new PhaserTest(phaser);
PhaserTest t2 = new PhaserTest(phaser);
Thread n1 = new Thread(t1);
Thread n2 = new Thread(t2);
n1.start();
n2.start();
try {
n1.join();
n2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
public class PhaserTest implements Runnable {
private Phaser phaser;
public PhaserTest(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
}
}
class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
PhaserTest t1 = new PhaserTest(phaser);
PhaserTest t2 = new PhaserTest(phaser);
Thread n1 = new Thread(t1);
Thread n2 = new Thread(t2);
n1.start();
n2.start();
try {
n1.join();
n2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
若没有Phaser来阶段性的同步,输出的结果可能是“1 1 2 2”、“1 2 1 2”。但加入Phaser来阶段性的同步后,输出结果一定是“1 1 2 2”。

Phaser提供每次phaser改变阶段都会执行的方法,onAdvance(),可以Override该方法来实现不同的业务逻辑。

使用示例

Phaser phaser = new Phaser(2){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("p:"+phase);
return super.onAdvance(phase, registeredParties);
}
};
Phaser phaser = new Phaser(2){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("p:"+phase);
return super.onAdvance(phase, registeredParties);
}
};
再次运行上述程序,结果则变为“1 1 p0 2 2 p1 end”。

Exchanger

Exchanger,用于并发任务间交换数据,但只适用于1对1的交换。

使用示例

public class ExchangerTest implements Runnable {
private final Exchanger<String> exchanger;
private String value;
public ExchangerTest(Exchanger<String> exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Before: " + value);
try {
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " After: " + value);
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
ExchangerTest e1 = new ExchangerTest(exchanger, "11111");
ExchangerTest e2 = new ExchangerTest(exchanger, "22222");
Thread t1 = new Thread(e1, "t1");
Thread t2 = new Thread(e2, "t2");
t1.start();
t2.start();
}
}
public class ExchangerTest implements Runnable {
private final Exchanger<String> exchanger;
private String value;
public ExchangerTest(Exchanger<String> exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Before: " + value);
try {
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " After: " + value);
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
ExchangerTest e1 = new ExchangerTest(exchanger, "11111");
ExchangerTest e2 = new ExchangerTest(exchanger, "22222");
Thread t1 = new Thread(e1, "t1");
Thread t2 = new Thread(e2, "t2");
t1.start();
t2.start();
}
}
运行结果

t1 Before: 11111
t2 Before: 22222
t2 After: 11111
t1 After: 22222
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: