Java并发编程学习笔记(3)――线程同步进阶
2016-03-26 22:28
369 查看
Semaphore
Semaphore是一个控制访问多个共享资源的计数器。当计数器值大于0,代表还有可用资源,线程可以继续访问和使用资源;当计数器的值等于0,代表暂无可用资源,线程必须等待资源的释放。
一个典型的例子就是,有多台打印机,当新的打印任务来时,将检测是否还有可用的打印机,若有则使用,并将可用打印机数量减1;若无则等待;当使用完毕后,释放打印机,将可用打印机数量加1。
使用示例
public class SemaphoreTest {
private final Semaphore semaphore = new Semaphore(4);
public void print() {
try {
semaphore.acquire();
//do something
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
public class SemaphoreTest {
private final Semaphore semaphore = new Semaphore(4);
public void print() {
try {
semaphore.acquire();
//do something
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
同Lock一样,Semaphore也可开启公平机制,Semaphore(int permits, boolean fair)。
acquireUninterruptibly(),与acquire()的区别是:线程中断不会抛出异常
tryAcquire(),尝试获取semaphore。如果成功,返回true。如果不成功,返回false值,并不会被阻塞和等待semaphore的释放。
CountDownLatch
CountDownLatch,用于等待多个并发事件完成,相当于一个倒数计数器,初始化时传入一个整数参数,调用方法countDown()减少计数,当计数为0时,解除await()的休眠。
使用示例
public class CountDownLatchTest implements Runnable{
private final CountDownLatch controller = new CountDownLatch(4);
public void assemble() {
System.out.println("come one");
controller.countDown();
}
@Override
public void run() {
try {
System.out.println("wait " + controller.getCount());
controller.await();
System.out.println("get all");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CountDownLatchTest implements Runnable{
private final CountDownLatch controller = new CountDownLatch(4);
public void assemble() {
System.out.println("come one");
controller.countDown();
}
@Override
public void run() {
try {
System.out.println("wait " + controller.getCount());
controller.await();
System.out.println("get all");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CountDownLatch的计数器一旦初始化就无法更改。
CyclicBarrier
CyclicBarrier,用于在同一个点同步任务,与CountDownLatch类似,但CyclicBarrier可以在计数器等于0时执行初始化时传递的Runnable对象。
使用示例
public class CyclicBarrierTest implements Runnable {
private final CyclicBarrier barrier = new CyclicBarrier(4, new After());
@Override
public void run() {
try {
System.out.println("wait");
barrier.await();
System.out.println("pass");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class After implements Runnable {
@Override
public void run() {
System.out.println("tear down");
}
}
public class CyclicBarrierTest implements Runnable {
private final CyclicBarrier barrier = new CyclicBarrier(4, new After());
@Override
public void run() {
try {
System.out.println("wait");
barrier.await();
System.out.println("pass");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class After implements Runnable {
@Override
public void run() {
System.out.println("tear down");
}
}
CyclicBarrier的计数器可以重置,reset()。
Phaser
Phaser,用于阶段性并发任务的同步。CyclicBarrier可以在一个点同步任务,当需要在运行过程中多个点同步线程,则可以用Phaser来处理。
使用示例
public class PhaserTest implements Runnable {
private Phaser phaser;
public PhaserTest(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
}
}
class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
PhaserTest t1 = new PhaserTest(phaser);
PhaserTest t2 = new PhaserTest(phaser);
Thread n1 = new Thread(t1);
Thread n2 = new Thread(t2);
n1.start();
n2.start();
try {
n1.join();
n2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
public class PhaserTest implements Runnable {
private Phaser phaser;
public PhaserTest(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
}
}
class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
PhaserTest t1 = new PhaserTest(phaser);
PhaserTest t2 = new PhaserTest(phaser);
Thread n1 = new Thread(t1);
Thread n2 = new Thread(t2);
n1.start();
n2.start();
try {
n1.join();
n2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
若没有Phaser来阶段性的同步,输出的结果可能是“1 1 2 2”、“1 2 1 2”。但加入Phaser来阶段性的同步后,输出结果一定是“1 1 2 2”。
Phaser提供每次phaser改变阶段都会执行的方法,onAdvance(),可以Override该方法来实现不同的业务逻辑。
使用示例
Phaser phaser = new Phaser(2){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("p:"+phase);
return super.onAdvance(phase, registeredParties);
}
};
Phaser phaser = new Phaser(2){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("p:"+phase);
return super.onAdvance(phase, registeredParties);
}
};
再次运行上述程序,结果则变为“1 1 p0 2 2 p1 end”。
Exchanger
Exchanger,用于并发任务间交换数据,但只适用于1对1的交换。
使用示例
public class ExchangerTest implements Runnable {
private final Exchanger<String> exchanger;
private String value;
public ExchangerTest(Exchanger<String> exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Before: " + value);
try {
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " After: " + value);
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
ExchangerTest e1 = new ExchangerTest(exchanger, "11111");
ExchangerTest e2 = new ExchangerTest(exchanger, "22222");
Thread t1 = new Thread(e1, "t1");
Thread t2 = new Thread(e2, "t2");
t1.start();
t2.start();
}
}
public class ExchangerTest implements Runnable {
private final Exchanger<String> exchanger;
private String value;
public ExchangerTest(Exchanger<String> exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Before: " + value);
try {
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " After: " + value);
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
ExchangerTest e1 = new ExchangerTest(exchanger, "11111");
ExchangerTest e2 = new ExchangerTest(exchanger, "22222");
Thread t1 = new Thread(e1, "t1");
Thread t2 = new Thread(e2, "t2");
t1.start();
t2.start();
}
}
运行结果
t1 Before: 11111
t2 Before: 22222
t2 After: 11111
t1 After: 22222
Semaphore是一个控制访问多个共享资源的计数器。当计数器值大于0,代表还有可用资源,线程可以继续访问和使用资源;当计数器的值等于0,代表暂无可用资源,线程必须等待资源的释放。
一个典型的例子就是,有多台打印机,当新的打印任务来时,将检测是否还有可用的打印机,若有则使用,并将可用打印机数量减1;若无则等待;当使用完毕后,释放打印机,将可用打印机数量加1。
使用示例
public class SemaphoreTest {
private final Semaphore semaphore = new Semaphore(4);
public void print() {
try {
semaphore.acquire();
//do something
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
public class SemaphoreTest {
private final Semaphore semaphore = new Semaphore(4);
public void print() {
try {
semaphore.acquire();
//do something
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
同Lock一样,Semaphore也可开启公平机制,Semaphore(int permits, boolean fair)。
acquireUninterruptibly(),与acquire()的区别是:线程中断不会抛出异常
tryAcquire(),尝试获取semaphore。如果成功,返回true。如果不成功,返回false值,并不会被阻塞和等待semaphore的释放。
CountDownLatch
CountDownLatch,用于等待多个并发事件完成,相当于一个倒数计数器,初始化时传入一个整数参数,调用方法countDown()减少计数,当计数为0时,解除await()的休眠。
使用示例
public class CountDownLatchTest implements Runnable{
private final CountDownLatch controller = new CountDownLatch(4);
public void assemble() {
System.out.println("come one");
controller.countDown();
}
@Override
public void run() {
try {
System.out.println("wait " + controller.getCount());
controller.await();
System.out.println("get all");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CountDownLatchTest implements Runnable{
private final CountDownLatch controller = new CountDownLatch(4);
public void assemble() {
System.out.println("come one");
controller.countDown();
}
@Override
public void run() {
try {
System.out.println("wait " + controller.getCount());
controller.await();
System.out.println("get all");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CountDownLatch的计数器一旦初始化就无法更改。
CyclicBarrier
CyclicBarrier,用于在同一个点同步任务,与CountDownLatch类似,但CyclicBarrier可以在计数器等于0时执行初始化时传递的Runnable对象。
使用示例
public class CyclicBarrierTest implements Runnable {
private final CyclicBarrier barrier = new CyclicBarrier(4, new After());
@Override
public void run() {
try {
System.out.println("wait");
barrier.await();
System.out.println("pass");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class After implements Runnable {
@Override
public void run() {
System.out.println("tear down");
}
}
public class CyclicBarrierTest implements Runnable {
private final CyclicBarrier barrier = new CyclicBarrier(4, new After());
@Override
public void run() {
try {
System.out.println("wait");
barrier.await();
System.out.println("pass");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class After implements Runnable {
@Override
public void run() {
System.out.println("tear down");
}
}
CyclicBarrier的计数器可以重置,reset()。
Phaser
Phaser,用于阶段性并发任务的同步。CyclicBarrier可以在一个点同步任务,当需要在运行过程中多个点同步线程,则可以用Phaser来处理。
使用示例
public class PhaserTest implements Runnable {
private Phaser phaser;
public PhaserTest(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
}
}
class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
PhaserTest t1 = new PhaserTest(phaser);
PhaserTest t2 = new PhaserTest(phaser);
Thread n1 = new Thread(t1);
Thread n2 = new Thread(t2);
n1.start();
n2.start();
try {
n1.join();
n2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
public class PhaserTest implements Runnable {
private Phaser phaser;
public PhaserTest(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();
try {
Long duration = (long)(Math.random()*10000);
Thread.sleep(duration);
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
}
}
class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
PhaserTest t1 = new PhaserTest(phaser);
PhaserTest t2 = new PhaserTest(phaser);
Thread n1 = new Thread(t1);
Thread n2 = new Thread(t2);
n1.start();
n2.start();
try {
n1.join();
n2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
若没有Phaser来阶段性的同步,输出的结果可能是“1 1 2 2”、“1 2 1 2”。但加入Phaser来阶段性的同步后,输出结果一定是“1 1 2 2”。
Phaser提供每次phaser改变阶段都会执行的方法,onAdvance(),可以Override该方法来实现不同的业务逻辑。
使用示例
Phaser phaser = new Phaser(2){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("p:"+phase);
return super.onAdvance(phase, registeredParties);
}
};
Phaser phaser = new Phaser(2){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("p:"+phase);
return super.onAdvance(phase, registeredParties);
}
};
再次运行上述程序,结果则变为“1 1 p0 2 2 p1 end”。
Exchanger
Exchanger,用于并发任务间交换数据,但只适用于1对1的交换。
使用示例
public class ExchangerTest implements Runnable {
private final Exchanger<String> exchanger;
private String value;
public ExchangerTest(Exchanger<String> exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Before: " + value);
try {
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " After: " + value);
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
ExchangerTest e1 = new ExchangerTest(exchanger, "11111");
ExchangerTest e2 = new ExchangerTest(exchanger, "22222");
Thread t1 = new Thread(e1, "t1");
Thread t2 = new Thread(e2, "t2");
t1.start();
t2.start();
}
}
public class ExchangerTest implements Runnable {
private final Exchanger<String> exchanger;
private String value;
public ExchangerTest(Exchanger<String> exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Before: " + value);
try {
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " After: " + value);
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
ExchangerTest e1 = new ExchangerTest(exchanger, "11111");
ExchangerTest e2 = new ExchangerTest(exchanger, "22222");
Thread t1 = new Thread(e1, "t1");
Thread t2 = new Thread(e2, "t2");
t1.start();
t2.start();
}
}
运行结果
t1 Before: 11111
t2 Before: 22222
t2 After: 11111
t1 After: 22222
相关文章推荐
- java反射详解
- java四种内部类详解
- java中的左右移
- java中的nextLine
- eclipse+安卓开发环境搭建
- 我的java学习之路
- JAVA多线程——线程的中断
- 共同学习Java源码--常用数据类型--String(三)
- [Think In Java]基础拾遗3 - 容器、I/O、NIO、序列化
- java-cef系列视频第二集:搭建开发环境
- Java Nio 十一、Java NIO:非堵塞服务器
- 深入了解Spring4整合Hibernate4时的No Session异常的原理与解决方案
- 第67课:Spark SQL下采用Java和Scala实现Join的案例综合实战(巩固前面学习的Spark SQL知识)
- JAVA方法声明中参数的那三个小数点
- 20145311 《Java程序设计》第4周学习总结
- struts2类型转化器
- 第一章 第一个spring boot程序
- JAVA大数处理(BigInteger,BigDecimal)
- spring
- java 多线程