您的位置:首页 > 其它

Netty实现心跳检测

2018-02-16 11:59 531 查看
使用Socket通信一般经常用来处理多个服务器之间的心跳检测,一般来讲去维护服务器集群,肯定有一台或几台服务器主机Master,还应该有n台Slave。Master常常需要知道自己下面从服务器的各方面情况,进行实时监控,这在分布式架构里叫做心跳检测或心跳监控,可以使用Netty完成这个任务。

使用Sigar监控Slave的CPU、内存、磁盘使用情况,需要引入下图中的2个jar包



由于本机是Windows 7 的64位系统,需要将sigar-amd64-winnt.dll放置到C:\Program Files\Java\jdk1.8.0_121\bin中



Client.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

public static void main(String[] args) throws Exception{

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//Netty中使用Marshalling
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClienHeartBeattHandler());
}
});

ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}


ClienHeartBeattHandler.java
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import org.hyperic.sigar.Swap;

public class ClienHeartBeattHandler extends ChannelHandlerAdapter {

private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private ScheduledFuture<?> heartBeat;

/*channel激活所执行的方法*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String ip = InetAddress.getLocalHost().getHostAddress();
//在实际的项目中可能需要根据具体需求使用特定的认证信息,并加密处理认证信息,保证网络传输认证信息的安全
ctx.writeAndFlush(ip);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if(msg instanceof String){
String ret = (String)msg;
if("server authentication success".equals(ret)){
// 握手成功,主动发送心跳消息。初始化时等待0秒,每隔2秒执行一次发送心跳信息的任务
/*Parameters:
command: the task to execute
initialDelay: the time to delay first execution
delay: the delay between the termination of one execution and the commencement of the next
unit: the time unit of the initialDelay and delay parameters*/
this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS);
System.out.println("Client:接收到Server端返回信息"+msg);
}
else {
System.out.println(msg);
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}

//内部类,心跳信息的类
private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;

public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void run() {
try {
RequestInfo info = new RequestInfo();
InetAddress addr=InetAddress.getLocalHost();
info.setIp(addr.getHostAddress());

Sigar sigar = new Sigar();
//cpu info
CpuPerc cpuPerc = sigar.getCpuPerc();
HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
cpuPercMap.put("combined", cpuPerc.getCombined());
cpuPercMap.put("user", cpuPerc.getUser());
cpuPercMap.put("sys", cpuPerc.getSys());
cpuPercMap.put("wait", cpuPerc.getWait());
cpuPercMap.put("idle", cpuPerc.getIdle());
//memory info
Mem mem = sigar.getMem();
HashMap<String, Object> memoryMap = new HashMap<String, Object>();
memoryMap.put("total", mem.getTotal() / 1024L);
memoryMap.put("used", mem.getUsed() / 1024L);
memoryMap.put("free", mem.getFree() / 1024L);
info.setCpuPercMap(cpuPercMap);
info.setMemoryMap(memoryMap);
ctx.writeAndFlush(info);

} catch (Exception e) {
e.printStackTrace();
}
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}

}
}


MarshallingCodeFactory.java
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public final class MarshallingCodeFactory {

/*Jboss Marshalling解码器*/
public static MarshallingDecoder buildMarshallingDecoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//指定单个消息序列化后的最大长度1M
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}

/*Jboss Marshalling编码器*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}


RequestInfo.java
import java.io.Serializable;
import java.util.HashMap;

public class RequestInfo implements Serializable {

private String ip ;
private HashMap<String, Object> cpuPercMap ;
private HashMap<String, Object> memoryMap;

public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public HashMap<String, Object> getCpuPercMap() {
return cpuPercMap;
}
public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}
public HashMap<String, Object> getMemoryMap() {
return memoryMap;
}
public void setMemoryMap(HashMap<String, Object> memoryMap) {
this.memoryMap = memoryMap;
}

}


Server.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {

public static void main(String[] args) throws Exception{

EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//设置日志
//.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
//Netty中使用Marshalling
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHeartBeatHandler());
}
});

ChannelFuture cf = b.bind(8765).sync();

cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}


ServerHeartBeatHandler.java
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;

public class ServerHeartBeatHandler extends ChannelHandlerAdapter {

/*Server端需要对IP进行判断*/
private boolean auth(ChannelHandlerContext ctx, Object msg){
System.out.println("接收到的内容:"+msg);
if(null!=msg&&msg.equals("192.168.0.105")){
ctx.writeAndFlush("server authentication success");
return true;
} else {
//短连接
ctx.writeAndFlush("server authentication fail!")
.addListener(ChannelFutureListener.CLOSE);
return false;
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
auth(ctx, msg);
} else if (msg instanceof RequestInfo) {

RequestInfo info = (RequestInfo) msg;//因为使用了Marshalling序列化框架,所以可以直接转换为RequestInfo对象
System.out.println("--------------------------------------------");
System.out.println("当前主机ip为: " + info.getIp());
System.out.println("当前主机cpu情况: ");
HashMap<String, Object> cpu = info.getCpuPercMap();
System.out.println("总使用率: " + cpu.get("combined"));
System.out.println("用户使用率: " + cpu.get("user"));
System.out.println("系统使用率: " + cpu.get("sys"));
System.out.println("等待率: " + cpu.get("wait"));
System.out.println("空闲率: " + cpu.get("idle"));

System.out.println("当前主机memory情况: ");
HashMap<String, Object> memory = info.getMemoryMap();
System.out.println("内存总量: " + memory.get("total"));
System.out.println("当前内存使用量: " + memory.get("used"));
System.out.println("当前内存剩余量: " + memory.get("free"));
System.out.println("--------------------------------------------");

ctx.writeAndFlush("RequestInfo has received!");
} else {
ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
}
}

}
先启动Server再启动Client,Eclipse的console输出



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: