结合Netty与Protobuf小栗子
2015-11-18 16:30
405 查看
结合Netty与Protobuf
基于上一篇生成protobuf的例子,接下来看看怎么与netty结合做个demo~
如何基于protobuf生成Java类请参考上一篇:http://blog.csdn.net/drizzt0878/article/details/49890121
1. 编写服务端响应的protobuf文件
package com.nettydemo.protobuf; option java_package = "com.nettydemo.protobuf"; option java_outer_classname = "MsgSendResponseProto"; message MsgSendResponse { required string msgID = 1; required string result = 2; }
2. 编译并生成MsgSendResponseProto.java
3. 实现的需求
客户端 –> sendMsg –> 服务端服务端(接收消息后) –> sendMsgResponse –> 客户端(接收响应消息)
4. 代码实现
服务端代码实现:(MsgSendResponseProto.java , MsgSendServer.java , MsgSendServerHandler.java)
public class MsgSendServer { private static final Logger logger = Logger.getLogger(MsgSendServer.class); public static void main(String[] args) throws Exception { ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); // 解码用 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 构造函数传递要解码成的类型 pipeline.addLast("protobufDecoder", new ProtobufDecoder(MsgSendProto.MsgSend.getDefaultInstance())); // 编码用 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); // 业务逻辑处理 pipeline.addLast("handler", new MsgSendServerHandler()); return pipeline; } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8080)); logger.info("server start up..."); } }
public class MsgSendServerHandler extends SimpleChannelHandler { private static final Logger logger = Logger.getLogger(MsgSendServerHandler.class); @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { logger.info(e.toString()); } super.handleUpstream(ctx, e); } @Override public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) { // 收到客户端发送的消息,已经解码成了MsgSendProto类型 final MsgSendProto.MsgSend msgSendProto = (MsgSendProto.MsgSend) e.getMessage(); System.out.println("receive msg from client: " + msgSendProto); // 返回成功接收信息给客户端 MsgSendResponseProto.MsgSendResponse.Builder builder = MsgSendResponseProto.MsgSendResponse.newBuilder(); builder.setMsgID(msgSendProto.getMsgID()); builder.setResult("1"); e.getChannel().write(builder.build()); } }
客户端代码实现:( MsgSendProto.java , MsgSendClient.java , MsgSendClientHandler.java )
public class MsgSendClient { public static void main(String[] args) { ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); // 解码用 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 构造函数传递要解码成的类型 pipeline.addLast("protobufDecoder", new ProtobufDecoder( MsgSendResponseProto.MsgSendResponse.getDefaultInstance())); // 编码用 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); // 业务逻辑用 pipeline.addLast("handler", new MsgSendClientHandler()); return pipeline; } }); ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)); // Wait until the connection is made successfully. Channel channel = connectFuture.awaitUninterruptibly().getChannel(); MsgSendClientHandler handler = channel.getPipeline().get(MsgSendClientHandler.class); // send msg handler.sendMsg2Server(); } }
public class MsgSendClientHandler extends SimpleChannelUpstreamHandler { private volatile Channel channel; private static final Logger logger = Logger.getLogger(MsgSendClientHandler.class); @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { channel = e.getChannel(); super.channelOpen(ctx, e); } @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { logger.info(e.toString()); } super.handleUpstream(ctx, e); } @Override public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) { // 收到服务端返回的消息,已经解码成了MsgSendResponse类型 final MsgSendResponseProto.MsgSendResponse response = (MsgSendResponseProto.MsgSendResponse) e .getMessage(); System.out.println("receive response from server: msgID=" + response.getMsgID() + ", resultCode=" + response.getResult()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { logger.warn("Unexpected exception from downstream.", e.getCause()); e.getChannel().close(); } public void sendMsg2Server() { // 先进行序列化 MsgSendProto.MsgSend.Builder builder = MsgSendProto.MsgSend.newBuilder(); builder.setMsgID("20151101"); builder.setPhone("13100000000"); builder.setDoneTime(System.currentTimeMillis()); builder.setEnterpriseID(1); builder.setEnterpriseName("enterprise"); builder.setResult(1); builder.setChannelID(1); builder.setCarrier(1); builder.setBusyType(1); builder.setSize(5); builder.setUnit(1); channel.write(builder.build()); } }
运行结果
客户端
服务端
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树
- [原创]java局域网聊天系统