您的位置:首页 > 其它

netty使用jboss的Marshalling编解码(对象序列化)

2018-01-04 22:47 197 查看
首选映入netty-all-5.0.0.Alpha2.jar  jboss-marshalling-serial-1.3.0.CR9.jar  jboss-marshalling-1.3.0.CR9.jar 包

/**

 * 客户端

 */

public class Client {

//内部类的懒加载
private static class SingletonHolder {
static final Client instance = new Client();
}

public static Client getInstance(){
return SingletonHolder.instance;
}

private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf ;

private Client(){
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); //使用jboss的marshalling做对象序列化 
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //使用jboss的marshalling做对象反序列化
//超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
sc.pipeline().addLast(new ReadTimeoutHandler(5));  //设置回话超时时间5s
sc.pipeline().addLast(new ClientHandler());        //处理器
}
   });
}

public void connect(){
try {
this.cf = b.connect("127.0.0.1", 8765).sync(); //建立连接
System.out.println("远程服务器已经连接, 可以进行数据交换..");

} catch (Exception e) {
e.printStackTrace();
}
}

public ChannelFuture getChannelFuture(){ 

if(this.cf == null){ //程序启动时连接
this.connect();
}
if(!this.cf.channel().isActive()){ //如果连接断开时再次建立连接
this.connect();
}

return this.cf;
}

public static void main(String[] args) throws Exception{
final Client c = Client.getInstance();
//c.connect();

ChannelFuture cf = c.getChannelFuture();
for(int i = 1; i <= 3; i++ ){
Request request = new Request();
request.setId("" + i);
request.setName("pro" + i);
request.setRequestMessage("数据信息" + i);
cf.channel().writeAndFlush(request);//发送消息
TimeUnit.SECONDS.sleep(4);
}

cf.channel().closeFuture().sync();//当连接未断开时阻塞

new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("进入子线程...");
ChannelFuture cf = c.getChannelFuture();//重新建立连接
System.out.println(cf.channel().isActive());
System.out.println(cf.channel().isOpen());

//再次发送数据
Request request = new Request();
request.setId("" + 4);
request.setName("pro" + 4);
request.setRequestMessage("数据信息" + 4);
cf.channel().writeAndFlush(request);

cf.channel().closeFuture().sync();
System.out.println("子线程结束.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

System.out.println("断开连接,主线程结束..");

}

}

----------------------------------------------------------------

//客户端处理器

public class ClientHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response)msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());

} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}

-----------------------------------------------------------------------

//需要传送的对象

public class Request implements Serializable{

private static final long  SerialVersionUID = 1L;

private String id ;
private String name ;
private String requestMessage ;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestM
bd69
essage(String requestMessage) {
this.requestMessage = requestMessage;
}

}

---------------------------------------------------------------

//服务端

public class Server {

public static void main(String[] args) throws Exception{

EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//设置日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
sc.pipeline().addLast(new ServerHandler());
}
});

ChannelFuture cf = b.bind(8765).sync();

cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();

}

}

-----------------------------------------------------

//服务端处理器

public class ServerHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request)msg;
System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
Response response = new Response();
response.setId(request.getId());
response.setName("response" + request.getId());
response.setResponseMessage("响应内容" + request.getId());
ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}

--------------------------------------------------

//响应对象,序列化和反序列化时对象两端的对象包名类型必须一致

public class Response implements Serializable{

private static final long serialVersionUID = 1L;

private String id;
private String name;
private String responseMessage;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: