您的位置:首页 > 其它

netty 对象序列化传输示例

2015-06-01 23:53 106 查看
package object.server.impl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class SubReqServer {
public void start(int port) {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup);
bootstrap.channel(NioServerSocketChannel.class);
// 配置 NioServerSocketChannel 的 tcp 参数, BACKLOG 的大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
/*
* 使用 weakCachingConcurrentResolver 创建线程安全的 WeakReferenceMap
* ,对类加载器进行缓存
* ,它支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄露,为了房子异常码流和解码错位导致的内存溢出
* ,这里将当个对象序列化之后的字节数组长度设置为1M
*/
ObjectDecoder objectDecoder = new ObjectDecoder(1024 * 1024,
ClassResolvers.weakCachingConcurrentResolver(this
.getClass().getClassLoader()));

ch.pipeline().addLast(objectDecoder);
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubReqHandler());
}
});
// 绑定端口,随后调用它的同步阻塞方法 sync 等等绑定操作成功,完成之后 Netty 会返回一个 ChannelFuture
// 它的功能类似于的 Future,主要用于异步操作的通知回调.
ChannelFuture channelFuture;
try {
channelFuture = bootstrap.bind(port).sync();
// 等待服务端监听端口关闭,调用 sync 方法进行阻塞,等待服务端链路关闭之后 main 函数才退出.
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
SubReqServer server = new SubReqServer();
server.start(9091);
}
}


package object.server.impl;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side handler: prints every decoded inbound message and answers it
 * with a freshly created {@link SubscriptResp}.
 */
public class SubReqHandler extends ChannelHandlerAdapter {

    /**
     * Prints the failure and closes the channel.
     *
     * <p>The exception is fully handled here, so it is not forwarded to the
     * rest of the pipeline (this mirrors the client-side handler).
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Prints the received message and writes back a new {@link SubscriptResp}
     * whose fields are left unset.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println(msg);
        SubscriptResp sub = new SubscriptResp();
        ctx.writeAndFlush(sub);
    }

    /** Flushes any pending outbound data once the current read batch is done. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}


Client

package object.client.impl;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Netty client that connects to the subscription server and exchanges
 * Java-serialized objects with it through {@link SubReqClientHandler}.
 */
public class SubReqClient {

    /**
     * Upper bound, in bytes, on a single serialized inbound object (1 MiB).
     * The previous value {@code 1024 >> 2} evaluated to only 256 bytes, far
     * below the 1 MiB the server-side decoder accepts, so any response with
     * non-trivial field contents would have been rejected by the decoder.
     */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    /**
     * Connects to {@code host:port} and blocks the calling thread until the
     * connection is closed or the thread is interrupted. The event loop group
     * is shut down before this method returns.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public void connect(String host, int port) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                /*
                 * cacheDisabled turns off caching in the class resolver. This
                 * is common in OSGi-style modular deployments: a bundle can be
                 * hot-upgraded, which replaces its class loader, so cached
                 * lookups could go stale at any time.
                 */
                ch.pipeline().addLast(
                        new ObjectDecoder(MAX_OBJECT_SIZE, ClassResolvers
                                .cacheDisabled(getClass().getClassLoader())));
                ch.pipeline().addLast(new ObjectEncoder());
                ch.pipeline().addLast(new SubReqClientHandler());
            }
        });

        try {
            // connect() is asynchronous; sync() blocks until it has completed.
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // Block until the client channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }

    /** Connects the demo client to localhost:9091. */
    public static void main(String[] args) {
        new SubReqClient().connect("localhost", 9091);
    }
}


ClientHandler

package object.client.impl;

import object.server.impl.SubScriptReq;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Client-side handler: sends one {@link SubScriptReq} as soon as the channel
 * becomes active and prints whatever the server sends back.
 */
public class SubReqClientHandler extends ChannelHandlerAdapter {

    /** Sends a single request with {@code subReq} set to 999 once connected. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final SubScriptReq request = new SubScriptReq();
        request.setSubReq(999);
        ctx.writeAndFlush(request);
    }

    /** Prints each decoded inbound message to standard output. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
    }

    /** Flushes pending outbound data after the current read batch completes. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}


POJO

package object.server.impl;

import java.io.Serializable;

/**
 * Serializable response object sent from the server to the client. All fields
 * are optional and default to {@code null}.
 */
public class SubscriptResp implements Serializable {

    private static final long serialVersionUID = 4923081103118853877L;

    private Integer subScriptID;
    private String respCode;
    private String desc;

    /** @return the subscription id, or {@code null} if unset */
    public Integer getSubScriptID() {
        return this.subScriptID;
    }

    /** @param subScriptID the subscription id to store */
    public void setSubScriptID(Integer subScriptID) {
        this.subScriptID = subScriptID;
    }

    /** @return the response code, or {@code null} if unset */
    public String getRespCode() {
        return this.respCode;
    }

    /** @param respCode the response code to store */
    public void setRespCode(String respCode) {
        this.respCode = respCode;
    }

    /** @return the description, or {@code null} if unset */
    public String getDesc() {
        return this.desc;
    }

    /** @param desc the description to store */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /** Renders all three fields; unset fields appear as {@code null}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SubscriptResp [subScriptID=");
        text.append(subScriptID);
        text.append(", respCode=").append(respCode);
        text.append(", desc=").append(desc);
        text.append("]");
        return text.toString();
    }

}


POJO (request)

package object.server.impl;

import java.io.Serializable;

/**
 * Serializable request object sent from the client to the server. All fields
 * are optional and default to {@code null}.
 */
public class SubScriptReq implements Serializable {

    private static final long serialVersionUID = 4686274228090335845L;

    private Integer subReq;
    private String userName;
    private String productName;
    private String address;

    /** @return the request id, or {@code null} if unset */
    public Integer getSubReq() {
        return this.subReq;
    }

    /** @param subReq the request id to store */
    public void setSubReq(Integer subReq) {
        this.subReq = subReq;
    }

    /** @return the user name, or {@code null} if unset */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the user name to store */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the product name, or {@code null} if unset */
    public String getProductName() {
        return this.productName;
    }

    /** @param productName the product name to store */
    public void setProductName(String productName) {
        this.productName = productName;
    }

    /** @return the address, or {@code null} if unset */
    public String getAddress() {
        return this.address;
    }

    /** @param address the address to store */
    public void setAddress(String address) {
        this.address = address;
    }

    /** Renders all four fields; unset fields appear as {@code null}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SubScriptReq [subReq=");
        text.append(subReq);
        text.append(", userName=").append(userName);
        text.append(", productName=").append(productName);
        text.append(", address=").append(address);
        text.append("]");
        return text.toString();
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: