您的位置:首页 > 编程语言 > Java开发

结合Netty与Protobuf小栗子

2015-11-18 16:30 405 查看

结合Netty与Protobuf

基于上一篇生成protobuf的例子,接下来看看怎么与netty结合做个demo~

如何基于protobuf生成Java类请参考上一篇:

http://blog.csdn.net/drizzt0878/article/details/49890121

1. 编写服务端响应的protobuf文件

package com.nettydemo.protobuf;
option java_package = "com.nettydemo.protobuf";
option java_outer_classname = "MsgSendResponseProto";

message MsgSendResponse {
required string msgID = 1;
required string result = 2;
}


2. 编译并生成MsgSendResponseProto.java

3. 实现的需求

客户端 –> sendMsg –> 服务端

服务端(接收消息后) –> sendMsgResponse –> 客户端(接收响应消息)

4. 代码实现

服务端代码实现:(MsgSendResponseProto.java , MsgSendServer.java , MsgSendServerHandler.java)

public class MsgSendServer {

private static final Logger logger = Logger.getLogger(MsgSendServer.class);

public static void main(String[] args) throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 解码用
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 构造函数传递要解码成的类型
pipeline.addLast("protobufDecoder",
new ProtobufDecoder(MsgSendProto.MsgSend.getDefaultInstance()));

// 编码用
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("protobufEncoder", new ProtobufEncoder());

// 业务逻辑处理
pipeline.addLast("handler", new MsgSendServerHandler());
return pipeline;
}

});

bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
bootstrap.bind(new InetSocketAddress(8080));

logger.info("server start up...");
}
}


public class MsgSendServerHandler extends SimpleChannelHandler {

private static final Logger logger = Logger.getLogger(MsgSendServerHandler.class);

@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
// 收到客户端发送的消息,已经解码成了MsgSendProto类型
final MsgSendProto.MsgSend msgSendProto = (MsgSendProto.MsgSend) e.getMessage();
System.out.println("receive msg from client: " + msgSendProto);

// 返回成功接收信息给客户端
MsgSendResponseProto.MsgSendResponse.Builder builder = MsgSendResponseProto.MsgSendResponse.newBuilder();
builder.setMsgID(msgSendProto.getMsgID());
builder.setResult("1");
e.getChannel().write(builder.build());
}
}


客户端代码实现:( MsgSendProto.java , MsgSendClient.java , MsgSendClientHandler.java )

public class MsgSendClient {

public static void main(String[] args) {

ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

// 解码用
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 构造函数传递要解码成的类型
pipeline.addLast("protobufDecoder", new ProtobufDecoder(
MsgSendResponseProto.MsgSendResponse.getDefaultInstance()));

// 编码用
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("protobufEncoder", new ProtobufEncoder());

// 业务逻辑用
pipeline.addLast("handler", new MsgSendClientHandler());
return pipeline;
}

});

ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080));
// Wait until the connection is made successfully.
Channel channel = connectFuture.awaitUninterruptibly().getChannel();
MsgSendClientHandler handler = channel.getPipeline().get(MsgSendClientHandler.class);

// send msg
handler.sendMsg2Server();
}
}


public class MsgSendClientHandler extends SimpleChannelUpstreamHandler {

private volatile Channel channel;

private static final Logger logger = Logger.getLogger(MsgSendClientHandler.class);

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
channel = e.getChannel();
super.channelOpen(ctx, e);
}

@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
// 收到服务端返回的消息,已经解码成了MsgSendResponse类型
final MsgSendResponseProto.MsgSendResponse response = (MsgSendResponseProto.MsgSendResponse) e
.getMessage();
System.out.println("receive response from server: msgID=" + response.getMsgID() + ", resultCode="
+ response.getResult());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
logger.warn("Unexpected exception from downstream.", e.getCause());
e.getChannel().close();
}

public void sendMsg2Server() {
// 先进行序列化
MsgSendProto.MsgSend.Builder builder = MsgSendProto.MsgSend.newBuilder();
builder.setMsgID("20151101");
builder.setPhone("13100000000");
builder.setDoneTime(System.currentTimeMillis());
builder.setEnterpriseID(1);
builder.setEnterpriseName("enterprise");
builder.setResult(1);
builder.setChannelID(1);
builder.setCarrier(1);
builder.setBusyType(1);
builder.setSize(5);
builder.setUnit(1);
channel.write(builder.build());
}
}


运行结果

客户端



服务端

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java netty