跟着实例学习ZooKeeper的用法: Barrier
2015-06-01 20:19
483 查看
转自:http://ifeve.com/zookeeper-barrier/
分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。
比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。
首先你需要设置栅栏,它将阻塞在它上面等待的线程:
然后需要阻塞的线程调用“方法等待放行条件:
当条件满足时,移除栅栏,所有等待的线程将继续执行:
异常处理 DistributedBarrier 会监控连接状态,当连接断掉时
看一个例子:
这个例子创建了
如果你开始不设置栅栏,所有的线程就不会阻塞住。
双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是
DistributedBarrier 会监控连接状态,当连接断掉时
例子代码:
分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。
比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。
栅栏Barrier
DistributedBarrier类实现了栅栏的功能。 它的构造函数如下:
public DistributedBarrier(CuratorFramework client, String barrierPath) Parameters: client - client barrierPath - path to use as the barrier
首先你需要设置栅栏,它将阻塞在它上面等待的线程:
setBarrier();
然后需要阻塞的线程调用“方法等待放行条件:
public void waitOnBarrier()
当条件满足时,移除栅栏,所有等待的线程将继续执行:
removeBarrier();
异常处理 DistributedBarrier 会监控连接状态,当连接断掉时
waitOnBarrier()方法会抛出异常。
看一个例子:
package com.colobu.zkrecipe.barrier;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
public class DistributedBarrierExample {
private static final int QTY = 5;
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
controlBarrier.setBarrier();
for (int i = 0; i < QTY; ++i) {
final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " waits on Barrier");
barrier.waitOnBarrier();
System.out.println("Client #" + index + " begins");
return null;
}
};
service.submit(task);
}
Thread.sleep(10000);
System.out.println("all Barrier instances should wait the condition");
controlBarrier.removeBarrier();
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
}
}
}
这个例子创建了
controlBarrier来设置栅栏和移除栅栏。 我们创建了5个线程,在此Barrier上等待。 最后移除栅栏后所有的线程才继续执行。
如果你开始不设置栅栏,所有的线程就不会阻塞住。
双栅栏Double Barrier
双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是DistributedDoubleBarrier。 构造函数为:
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty) Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until all members have entered. When leave() is called, it blocks until all members have left. Parameters: client - the client barrierPath - path to use memberQty - the number of members in the barrier
memberQty是成员数量,当
enter方法被调用时,成员被阻塞,直到所有的成员都调用了
enter。 当
leave方法被调用时,它也阻塞调用线程, 知道所有的成员都调用了
leave。 就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。
DistributedBarrier 会监控连接状态,当连接断掉时
enter()和
leave方法会抛出异常。
例子代码:
package com.colobu.zkrecipe.barrier; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; public class DistributedBarrierExample { private static final int QTY = 5; private static final String PATH = "/examples/barrier"; public static void main(String[] args) throws Exception { try (TestingServer server = new TestingServer()) { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); client.start(); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY); final int index = i; Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { Thread.sleep((long) (3 * Math.random())); System.out.println("Client #" + index + " enters"); barrier.enter(); System.out.println("Client #" + index + " begins"); Thread.sleep((long) (3000 * Math.random())); barrier.leave(); System.out.println("Client #" + index + " left"); return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); } } }
相关文章推荐
- linux dns 服务器搭建及配置
- CSAPP 深入理解计算机系统 Buflab实验,缓冲区溢出攻击实验(4)
- Fibonacci Numbers(斐波那契数列 前四位 后四位)
- 【Android】仿微信通讯录中的右侧字母表控件
- 黑马程序员——java基础----集合框架知识点总结(一)
- 关于List操作排序工具,collection.sort的原理
- 阅读程序 多重继承
- java web tomcat,myeclipse, mysql
- 我想过不打扰,却发现做不到。
- 集合框架(四)如何使用以及何时使用HashSet、LinkedHashSet或者TreeSet来存储元素
- Jsp初探2015-06-01
- html5语义化标签(二)
- 创建了对嵌入的互操作程序集间接引用,无法嵌入互操作类型
- 启动代码和Bootloader区别
- 使用预处理语句实现数据查询的方法
- 在PowerPoint插入可计算和排序表格ppt模板素材
- Android属性动画完全解析(上),初识属性动画的基本用法 .
- 九度OJ-题目1510:替换空格
- HTML语义化标签(一)
- 织梦ajax登陆