组播实现失败检测(心跳)
2011-06-08 21:41
351 查看
原理概述
在分布式服务中,存在多个节点,而我们不能保证这些节点时刻都能正常运转,所以我们需要实现一种机制来检测新解冻的加入和正在运行节点的崩溃。为了检测节点崩溃,一种常用的方法是心跳(heart beat)。即,我们通过定时地发送消息到其他节点,来告诉其他人自己还活着,而 一段时间没有接收到原本存在的节点的心跳消息,我们就认为这个节点崩溃了。通过这种方式,我们实现了失败检测。
多播是的优势网络数据包的数量减至最低。组播技术是基于UDP,比TCP更加轻量级。在一下实现里,节点每1秒发送一次心跳信息,若3秒没有收到信息,则认为节点崩溃。
实现细节
1.组播实现
参考了 http://ajava.org/code/Protocol/16181.html
上面的实现算是比较清楚了,我只是分别将server和client写成了线程,一台主机上既要有发送消息的服务,也要有接收消息的服务,也就是说,所有机器上运行的程序全都是一样的,并没有特定的server节点。
要提一下的是组播IP地址,IP组播地址,或称为主机组地址,由D类IP地址标记。D类IP地址的最高四位为“1110”,起范围从224.0.0.0到
239.255.255.255。
接收步骤:
1.设置监听udp的端口,设置并加入组播组。
2.开始接收消息,receive()方法阻塞。
发送步骤:
1.生成要发送的数据
2.设置好要发送的组和端口,发送。
注意在同一个组里,组IP和接收端口都是相同的。
2.定时器
3秒进行一次扫描,将所有存在本地的节点信息扫描一遍。若在3秒内有收到心跳消息,则仅仅将其标记,若在3秒内没有收到消息,即标记还在,则认为该节点崩溃,从本地中删除该节点。
3.Map和序列化
在我的实现里,我用一个Map来保存组播消息和心跳标记这样一组键值对。心跳标记为boolean型,组播消息是自定义类MultiMsg。
因为该类要能够在网络中传输,所以要将其类型转换为byte[],这个时候我们要用到序列化(Serializable),将一个类变为可序列化只要加上implements Serializable即可。然后我们通过一些输出转换,就可以将该类转换为byte[],这个比较简单,可以直接看我的实现。
然而一般的序列化的类并不能满足map的操作,要将一个自定义类加入map中,则必须为该类实现equals和hashcode函数,以覆盖map中原有的函数。
实现代码
1.Send
2.Listen
3.MultiMsg
在分布式服务中,存在多个节点,而我们不能保证这些节点时刻都能正常运转,所以我们需要实现一种机制来检测新解冻的加入和正在运行节点的崩溃。为了检测节点崩溃,一种常用的方法是心跳(heart beat)。即,我们通过定时地发送消息到其他节点,来告诉其他人自己还活着,而 一段时间没有接收到原本存在的节点的心跳消息,我们就认为这个节点崩溃了。通过这种方式,我们实现了失败检测。
多播是的优势网络数据包的数量减至最低。组播技术是基于UDP,比TCP更加轻量级。在一下实现里,节点每1秒发送一次心跳信息,若3秒没有收到信息,则认为节点崩溃。
实现细节
1.组播实现
参考了 http://ajava.org/code/Protocol/16181.html
上面的实现算是比较清楚了,我只是分别将server和client写成了线程,一台主机上既要有发送消息的服务,也要有接收消息的服务,也就是说,所有机器上运行的程序全都是一样的,并没有特定的server节点。
要提一下的是组播IP地址,IP组播地址,或称为主机组地址,由D类IP地址标记。D类IP地址的最高四位为“1110”,起范围从224.0.0.0到
239.255.255.255。
接收步骤:
1.设置监听udp的端口,设置并加入组播组。
2.开始接收消息,receive()方法阻塞。
发送步骤:
1.生成要发送的数据
2.设置好要发送的组和端口,发送。
注意在同一个组里,组IP和接收端口都是相同的。
2.定时器
3秒进行一次扫描,将所有存在本地的节点信息扫描一遍。若在3秒内有收到心跳消息,则仅仅将其标记,若在3秒内没有收到消息,即标记还在,则认为该节点崩溃,从本地中删除该节点。
3.Map和序列化
在我的实现里,我用一个Map来保存组播消息和心跳标记这样一组键值对。心跳标记为boolean型,组播消息是自定义类MultiMsg。
因为该类要能够在网络中传输,所以要将其类型转换为byte[],这个时候我们要用到序列化(Serializable),将一个类变为可序列化只要加上implements Serializable即可。然后我们通过一些输出转换,就可以将该类转换为byte[],这个比较简单,可以直接看我的实现。
然而一般的序列化的类并不能满足map的操作,要将一个自定义类加入map中,则必须为该类实现equals和hashcode函数,以覆盖map中原有的函数。
实现代码
1.Send
import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.net.UnknownHostException; public class Send extends Thread{ String ip; int port; String attr; public Send(String ip, int port, String attr){ this.ip = ip; this.port = port; this.attr = attr; } public void run(){ System.out.println("client start......."); MulticastSocket multicastSocket = null; try { multicastSocket = new MulticastSocket(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } InetAddress group = null; try { group = InetAddress.getByName(ip); } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } InetAddress inet = null; try { inet = InetAddress.getLocalHost(); } catch (UnknownHostException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } MultiMsg msg = new MultiMsg(inet.getHostAddress(),port,attr); ByteArrayOutputStream byteStream = new ByteArrayOutputStream(500); ObjectOutputStream os; byte[] data = null; try { os = new ObjectOutputStream(new BufferedOutputStream(byteStream)); os.writeObject(msg); os.flush(); data = byteStream.toByteArray(); os.close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } DatagramPacket packet = new DatagramPacket(data, data.length, group, port); while (true) { try { multicastSocket.send(packet); Thread.sleep(1000); } catch (IOException ex) { ex.printStackTrace(); System.exit(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
2.Listen
import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Timer; public class Listen extends Thread{ String ip; int port; static Map<MultiMsg, Boolean> member ; public Listen(String ip, int port){ this.ip = ip; this.port = port; Listen.member = new HashMap<MultiMsg, Boolean>(); } public void run(){ System.out.println("server start......."); MulticastSocket multicastSocket = null; try { multicastSocket = new MulticastSocket(port); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } InetAddress group = null; try { group = InetAddress.getByName(ip); } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { multicastSocket.joinGroup(group); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } byte[] data = new byte[500]; DatagramPacket packet = new DatagramPacket(data, data.length); Timer timer = new Timer(); timer.schedule(new Detector(), 0, 3000); while (true) { try { multicastSocket.receive(packet); ByteArrayInputStream byteStream = new ByteArrayInputStream(data); ObjectInputStream os = new ObjectInputStream(byteStream); MultiMsg msg = null; try { msg = (MultiMsg)os.readObject(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } member.put(msg,true); } catch (IOException ex) { System.exit(1); } } } private static class Detector extends java.util.TimerTask{ @Override public void run() { // TODO Auto-generated method stub Iterator iter = member.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); MultiMsg key = (MultiMsg)entry.getKey(); Boolean val = (Boolean)entry.getValue(); if(val == false){ iter.remove(); continue; } member.put(key, false); } } } }
3.MultiMsg
import java.io.Serializable; public class MultiMsg implements Serializable{ /** * */ private static final long serialVersionUID = 4285977738049056295L; private String ip; private int port; private String attr; public MultiMsg(String ip , int port , String attr){ this.ip = ip; this.port = port; this.attr = attr; } public String Getattr(){ return this.attr; } public boolean equals(Object obj){ if(this == obj){ return true; } if (!(obj instanceof MultiMsg)) { return false; } MultiMsg msg = (MultiMsg)obj; if (this.ip.equals(msg.ip) && this.port == msg.port){ return true; }else{ return false; } } public int hashCode() { return this.ip.hashCode() * this.port; } }
相关文章推荐
- Qt 实现脉搏检测-1-心跳曲线部分
- Netty实现服务端客户端长连接通讯、心跳检测及自定义报文发送(一)
- netty的心跳检测实现
- Netty 4.0 实现心跳检测和断线重连
- Netty实践(四):心跳检测实现
- 关于Android软件检测心跳的实现原理
- swoole实现Timer定时器、心跳检测及Task进阶实例:mysql连接池
- 基于MINA实现server端心跳检测(KeepAliveFilter)
- Netty实现心跳检测
- Netty 4.0 实现心跳检测和断线重连
- netty的心跳检测实现
- Netty实现服务端客户端长连接通讯及心跳检测
- Perl脚本实现检测主机心跳信号功能
- netty实现tcp长连接和心跳检测
- Qt 实现脉搏检测-1-心跳曲线部分
- 基于MINA实现server端心跳检测(KeepAliveFilter)
- Netty 4.0 实现心跳检测和断线重连
- Netty 4.0 实现心跳检测和断线重连
- 基于MINA实现server端心跳检测(KeepAliveFilter)
- 用Rails的Ajax实现注册时的用户名检测