您的位置:首页 > 其它

组播实现失败检测(心跳)

2011-06-08 21:41 351 查看
原理概述  

   

      在分布式服务中,存在多个节点,而我们不能保证这些节点时刻都能正常运转,所以我们需要实现一种机制来检测新解冻的加入和正在运行节点的崩溃。为了检测节点崩溃,一种常用的方法是心跳(heart beat)。即,我们通过定时地发送消息到其他节点,来告诉其他人自己还活着,而   一段时间没有接收到原本存在的节点的心跳消息,我们就认为这个节点崩溃了。通过这种方式,我们实现了失败检测。

      多播是的优势网络数据包的数量减至最低。组播技术是基于UDP,比TCP更加轻量级。在一下实现里,节点每1秒发送一次心跳信息,若3秒没有收到信息,则认为节点崩溃。

 

 

实现细节

1.组播实现

参考了 http://ajava.org/code/Protocol/16181.html
上面的实现算是比较清楚了,我只是分别将server和client写成了线程,一台主机上既要有发送消息的服务,也要有接收消息的服务,也就是说,所有机器上运行的程序全都是一样的,并没有特定的server节点。

要提一下的是组播IP地址,IP组播地址,或称为主机组地址,由D类IP地址标记。D类IP地址的最高四位为“1110”,起范围从224.0.0.0到
239.255.255.255。

接收步骤:

1.设置监听udp的端口,设置并加入组播组。

2.开始接收消息,receive()方法阻塞。

发送步骤:

1.生成要发送的数据

2.设置好要发送的组和端口,发送。

注意在同一个组里,组IP和接收端口都是相同的。

 

 

2.定时器

3秒进行一次扫描,将所有存在本地的节点信息扫描一遍。若在3秒内有收到心跳消息,则仅仅将其标记,若在3秒内没有收到消息,即标记还在,则认为该节点崩溃,从本地中删除该节点。

 

 

3.Map和序列化

在我的实现里,我用一个Map来保存组播消息和心跳标记这样一组键值对。心跳标记为boolean型,组播消息是自定义类MultiMsg。

因为该类要能够在网络中传输,所以要将其类型转换为byte[],这个时候我们要用到序列化(Serializable),将一个类变为可序列化只要加上implements Serializable即可。然后我们通过一些输出转换,就可以将该类转换为byte[],这个比较简单,可以直接看我的实现。

然而一般的序列化的类并不能满足map的操作,要将一个自定义类加入map中,则必须为该类实现equals和hashcode函数,以覆盖map中原有的函数。

 

 

实现代码

1.Send

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
public class Send extends Thread{
String ip;
int port;
String attr;

public Send(String ip, int port, String attr){
this.ip = ip;
this.port = port;
this.attr = attr;
}

public void run(){
System.out.println("client start.......");
MulticastSocket multicastSocket = null;
try {
multicastSocket = new MulticastSocket();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
InetAddress group = null;
try {
group = InetAddress.getByName(ip);
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
InetAddress inet = null;
try {
inet = InetAddress.getLocalHost();
} catch (UnknownHostException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
MultiMsg msg = new MultiMsg(inet.getHostAddress(),port,attr);
ByteArrayOutputStream byteStream = new ByteArrayOutputStream(500);
ObjectOutputStream os;
byte[] data = null;
try {
os = new ObjectOutputStream(new BufferedOutputStream(byteStream));
os.writeObject(msg);
os.flush();
data = byteStream.toByteArray();
os.close();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
DatagramPacket packet = new DatagramPacket(data, data.length, group, port);
while (true) {
try {
multicastSocket.send(packet);
Thread.sleep(1000);
} catch (IOException ex) {
ex.printStackTrace();
System.exit(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}


 

2.Listen

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
public class Listen extends Thread{

String ip;
int port;
static Map<MultiMsg, Boolean> member ;

public Listen(String ip, int port){
this.ip = ip;
this.port = port;
Listen.member = new HashMap<MultiMsg, Boolean>();
}

public void run(){
System.out.println("server start.......");
MulticastSocket multicastSocket = null;
try {
multicastSocket = new MulticastSocket(port);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
InetAddress group = null;
try {
group = InetAddress.getByName(ip);
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
multicastSocket.joinGroup(group);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
byte[] data = new byte[500];
DatagramPacket packet = new DatagramPacket(data, data.length);
Timer timer = new Timer();
timer.schedule(new Detector(), 0, 3000);
while (true) {
try {
multicastSocket.receive(packet);
ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
ObjectInputStream os = new ObjectInputStream(byteStream);
MultiMsg msg = null;
try {
msg = (MultiMsg)os.readObject();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
member.put(msg,true);
} catch (IOException ex) {
System.exit(1);
}
}
}

private static class Detector extends java.util.TimerTask{

@Override
public void run() {
// TODO Auto-generated method stub
Iterator iter = member.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
MultiMsg key = (MultiMsg)entry.getKey();
Boolean val = (Boolean)entry.getValue();
if(val == false){
iter.remove();
continue;
}
member.put(key, false);
}
}
}
}


 

3.MultiMsg

import java.io.Serializable;
public class MultiMsg implements Serializable{
/**
*
*/
private static final long serialVersionUID = 4285977738049056295L;
private String ip;
private int port;
private String attr;

public MultiMsg(String ip , int port , String attr){
this.ip = ip;
this.port = port;
this.attr = attr;
}

public String Getattr(){
return this.attr;
}

public boolean equals(Object obj){
if(this == obj){
return true;
}
if (!(obj instanceof MultiMsg)) {
return false;
}
MultiMsg msg = (MultiMsg)obj;
if (this.ip.equals(msg.ip) && this.port == msg.port){
return true;
}else{
return false;
}
}

public int hashCode()
{
return this.ip.hashCode() * this.port;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息