您的位置:首页 > 其它

dubbo 集群容错模式源码学习---ForkingCluster

2018-10-30 19:30 393 查看

ForkingCluster 模式

思想:服务端多个节点并行响应(并发任务)某个服务的请求,将执行完的结果放在一个阻塞队列中(FIFO)。第一个完成的任务的结果进入队列,将会马上返回,不会等待所有任务执行完成, 只要有一个任务完成,结果放到队列中,队列中弹出第一个(最快完成响应的那个节点)结果, 返回服务请求方。如果n 个并发任务都出错了,抛异常,将最后一个异常放到队列中。服务请求返回就是抛异常。通常用于实时性高的读操作,相对耗费资源。最大并行数要可配置。

public class ForkCluster {

private final ExecutorService executor = Executors.newCachedThreadPool();

@Test
public void run() {
String response = forkCluster();
System.out.println(response);
}

/**
* 思想:多个节点并行响应(并发任务)某个服务的请求,将执行完的结果放在一个阻塞队列中(FIFO)。
* 不会等待所有任务执行完成, 只要有一个任务完成,结果放到队列中,队列中弹出第一个(最快完成响应的那个节点)结果, 返回服务请求方。
* 如果n 个并发任务都出错了,抛异常,将最后一个异常放到队列中。服务请求返回就是抛异常。
*
* 通常用于实时性高的读操作,相对耗费资源。最大并行数要可配置。
*
*
*/
public String forkCluster() {
Node node1 = new Node(1);
Node node2 = new Node(2);
Node node3 = new Node(3);
Node node4 = new Node(4);
List<Node> cluster = new ArrayList<>();
cluster.add(node1);
cluster.add(node2);
cluster.add(node3);
cluster.add(node4);
BlockingQueue<Object> responses = new LinkedBlockingQueue<>();
final AtomicInteger count = new AtomicInteger(); // 统计异常节点的个数.
for (Node node : cluster) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
String response = node.doSomething();
responses.offer(response);
}catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= cluster.size())
responses.offer(e);
}
}
});
}
try {
Object resp = responses.poll(1000, TimeUnit.MILLISECONDS);
if (resp instanceof Throwable) {
throw new RuntimeException("Cannot perform cluster fork. ");
}
return (String)resp;
} catch (InterruptedException e) {
throw new RuntimeException("Cannot get cluster fork response.");
}
}

class Node {
private int id;

Node(int id) {
this.id = id;
}

String doSomething() {
return "Node " + id + "第一个完成响应";
};
}

}

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: