控制并发线程数的Semaphore和线程之间的数据交换Exchanger
2017-05-06 11:28
671 查看
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
可以这样来形容,比如:xx马路要限制车流量,只允许同时又一百辆车在这条路上行驶,其他的都必须等在路口,所以前一百辆会看到绿灯,可以开进马路,后面的车会看到红灯,不能开进该马路。但是如果如果前一百辆中有5辆驶离了马路,那么后面就允许有5辆驶入马路。这个例子中的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
1、应用场景
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景。比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。
代码中,虽然有30个线程在执行,但只允许10个并发执行。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。
除此之外,还提供一些其他的方法,具体如下。
int availablePermits():返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads():是否有线程正在等待获取许可证。
void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。
线程之间的数据交换Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。它用于线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这2个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当2个线程都到达同步点时,这2个线程就可以交换数据,将本线程生产出来的数据传递给对方。
Exchanger的应用场景
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两个人的数据,并使用交叉规则得出2个交配结果。Exchanger也可用于校对工作,比如,我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看是否录入一致。如:
如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
可以这样来形容,比如:xx马路要限制车流量,只允许同时又一百辆车在这条路上行驶,其他的都必须等在路口,所以前一百辆会看到绿灯,可以开进马路,后面的车会看到红灯,不能开进该马路。但是如果如果前一百辆中有5辆驶离了马路,那么后面就允许有5辆驶入马路。这个例子中的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
1、应用场景
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景。比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for(int i=0;i<THREAD_COUNT;i++){ threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire();//获取一个许可证 System.out.println("save data..."); System.out.println("..."+s.availablePermits()+"..."+s.getQueueLength()+"..."+s.hasQueuedThreads()); s.release();//归还一个许可证 } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
代码中,虽然有30个线程在执行,但只允许10个并发执行。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。
除此之外,还提供一些其他的方法,具体如下。
int availablePermits():返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads():是否有线程正在等待获取许可证。
void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。
线程之间的数据交换Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。它用于线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这2个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当2个线程都到达同步点时,这2个线程就可以交换数据,将本线程生产出来的数据传递给对方。
Exchanger的应用场景
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两个人的数据,并使用交叉规则得出2个交配结果。Exchanger也可用于校对工作,比如,我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看是否录入一致。如:
public class ExchangerTest { private static Exchanger<String> exchanger = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { String A = "数据A"; try { exchanger.exchange(A); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.execute(new Runnable() { @Override public void run() { String B = "数据B"; try { String A = exchanger.exchange("B"); System.out.println("A和B的数据是否一致:"+A.equals(B)+";A录入的是:"+A); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.shutdown(); } }
如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。
相关文章推荐
- 黑马程序员-Condition条件对象、Semaphore、CyclicBarrier、倒计时门栓 CountDownLatch、Exchanger(实现两个线程之间数据交换
- jdk5 Exchanger 线程之间数据交换
- Java线程总结(十):并发包------两个线程交换数据Exchanger
- Exchanger两个线程之间的数据交换
- exchanger 线程之间交换数据
- Java多线程之~~~使用Exchanger在线程之间交换数据[这个结合多线程并行会有解决很多问题]生产者消费者模型
- Java多线程与并发库高级应用之线程数据交换Exchanger
- Java多线程/并发25、Exchanger线程数据交换
- Java并发工具类(四):线程间交换数据的Exchanger
- Java多线程之~~~使用Exchanger在线程之间交换数据
- Java并发工具类之线程间数据交换工具Exchanger
- jdk5 Exchanger 线程之间数据交换
- 并发工具类(四)线程间的交换数据 Exchanger
- Exchanger两个线程之间交换数据
- Java多线程与并发库高级应用之线程数据交换Exchanger
- 控制并发线程数的Semaphore
- Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger
- java多线程并发——Exchanger 两个任务之间交换对象
- 控制并发线程数的Semaphore
- 并发任务之间的数据交换