您的位置:首页 > 其它

基于Netty的异步Rpc调用的小框架

2016-05-06 19:01 197 查看

基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手。

 

客户端与服务端通信的类

package cc.ymsoft.Framework;

import java.io.Serializable;

@SuppressWarnings("serial")
public class MethodAndArgs implements Serializable{
private String methodName;//调用的方法名称
private Class<?>[] types;//参数类型
private Object[] objects;//参数列表
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getTypes() {
return types;
}
public void setTypes(Class<?>[] types) {
this.types = types;
}
public Object[] getObjects() {
return objects;
}
public void setObjects(Object[] objects) {
this.objects = objects;
}
public MethodAndArgs() {
super();
// TODO Auto-generated constructor stub
}
public MethodAndArgs(String methodName, Class<?>[] types, Object[] objects) {

this.methodName = methodName;
this.types = types;
this.objects = objects;
}

}

 

 框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类)

/**
* @author xulang
*/
package cc.ymsoft.Framework;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* 服务端处理
* @author hadoop
*
*/
class TcpServerHandler extends ChannelInboundHandlerAdapter {

private Object obj;
private Object response;

public TcpServerHandler(Object obj) {
super();
this.obj = obj;

}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// TODO Auto-generated method stub
MethodAndArgs methodAndArgs=(MethodAndArgs) msg;
Method method=obj.getClass().getMethod(methodAndArgs.getMethodName(), methodAndArgs.getTypes());
ctx.writeAndFlush(method.invoke(obj, methodAndArgs.getObjects()));
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
System.out.println("client die");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
System.out.println("channelActive>>>>>>>>");
ctx.writeAndFlush("调用异常");
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("服务器异常");
}
}
/**
* 客户端处理
* @author hadoop
*
*/
class TcpClientHander extends ChannelInboundHandlerAdapter {
private Object response;

public Object getResponse() {
return response;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response=msg;
System.out.println("client接收到服务器返回的消息:" + msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is general");
}
}

public class RpcFramework {

/**
* 服务注册
* @param obj 需要注册的服务对象
* @param port 端口
* @param ip 地址
* @throws InterruptedException
*/
public static void regist(final Object obj,int port,String ip) throws InterruptedException {
int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2;

int BIZTHREADSIZE = 100;
EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
if (obj == null)
throw new IllegalArgumentException("对象不能为null");
if (port <= 0 || port > 65535)
throw new IllegalArgumentException("错误的端口" + port);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
// TODO Auto-generated method stub
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
//  pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
// pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new TcpServerHandler(obj));
}
});
ChannelFuture f = bootstrap.bind(ip, port).sync();
f.channel().closeFuture().sync();
System.out.println("TCP服务器已启动");
}
@SuppressWarnings("unchecked")
public static <T>T getObj(Class<T> interfaceClass,final String host,final int port) {
if (interfaceClass == null)
throw new IllegalArgumentException("接口类型不能为空");
if (!interfaceClass.isInterface())
throw new IllegalArgumentException("类名" + interfaceClass.getName() + "必须是接口");
if (host == null || host.length() == 0)
throw new IllegalArgumentException("目标主机不能为空");
if (port <= 0 || port > 65535)
throw new IllegalArgumentException("端口错误:" + port);

return (T)	Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

MethodAndArgs mArgs=new MethodAndArgs(method.getName(), method.getParameterTypes(), args);
final TcpClientHander tcpClientHander=new TcpClientHander();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group);
//     b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
b.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);

b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",tcpClientHander);
}
});

ChannelFuture f = b.connect(host, port).sync();

f.channel().writeAndFlush(mArgs).sync();
f.channel().closeFuture().sync();

} catch (Exception e) {

} finally {

group.shutdownGracefully();
}
return tcpClientHander.getResponse();

}
});
}
}

  测试

接口

package cc.ymsoft.test;

interface HelloService {
String SayHello(String name);
}

 接口实现类

package cc.ymsoft.test;

public class HelloImp implements HelloService {

@Override
public String SayHello(String name) {
// TODO Auto-generated method stub

return "你好:"+name;
}

}

 客户端

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloInvoke {

public static void main(String[] args) throws Exception {
final HelloService helloService = RpcFramework.getObj(HelloService.class, "127.0.0.1", 1717);

System.out.println(helloService.SayHello("XL"));

}

}

 服务端

 

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloPro {
public static void main(String[] args) throws Exception {
HelloService hello=new HelloImp();
RpcFramework.regist(hello, 1717, "127.0.0.1");
}

}

 

完整代码在github https://github.com/xulang/NettyRpc

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: